The following 50 code examples, extracted from open-source Python projects, illustrate how to use scipy.io.wavfile.write().
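As a quick orientation before the project examples (a minimal sketch, not taken from any of the projects below): wavfile.write(filename, rate, data) writes a NumPy array to a WAV file, and the array's dtype (e.g. np.int16 for 16-bit PCM, np.float32 for floating point) determines the sample format. The output filename and sine-tone parameters here are arbitrary illustration values.

import numpy as np
from scipy.io import wavfile

fs = 44100                                   # sample rate in Hz (illustrative value)
t = np.linspace(0., 1., fs, endpoint=False)  # one second of time samples
tone = 0.5 * np.sin(2 * np.pi * 440.0 * t)   # 440 Hz sine, amplitude 0.5

# Scale to the int16 range; the dtype of the array selects the WAV sample format.
pcm = (tone * np.iinfo(np.int16).max).astype(np.int16)
wavfile.write("tone_440hz.wav", fs, pcm)     # hypothetical output filename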
def wavWrite(y, fs, nbits, audioFile):
    """ Write samples to WAV file
    Args:
        samples: (ndarray / 2D ndarray) (floating point) sample vector
                 mono: DIM: nSamples
                 stereo: DIM: nSamples x nChannels
        fs:      (int) Sample rate in Hz
        nBits:   (int) Number of bits
        fnWAV:   (string) WAV file name to write
    """
    if nbits == 8:
        intsamples = (y + 1.0) * AudioIO.normFact['int' + str(nbits)]
        fX = np.int8(intsamples)
    elif nbits == 16:
        intsamples = y * AudioIO.normFact['int' + str(nbits)]
        fX = np.int16(intsamples)
    elif nbits > 16:
        fX = y

    write(audioFile, fs, fX)
def sound(x, fs):
    """ Plays a wave file using the pyglet library. But first, it has to be written.
        Termination of the playback is being performed by any keyboard input and Enter.
    Args:
        x:  (array) Floating point samples
        fs: (int) The sampling rate
    """
    import pyglet as pg
    global player
    # Call the writing function
    AudioIO.wavWrite(x, fs, 16, 'testPlayback.wav')
    # Initialize playback engine
    player = pg.media.Player()
    # Initialize the object with the audio file
    playback = pg.media.load('testPlayback.wav')
    # Set it to player
    player.queue(playback)
    # Sound call
    player.play()
    # Killed by "keyboard"
    kill = raw_input()
    if kill or kill == '':
        AudioIO.stop()
    # Remove the dummy wave write
    os.remove('testPlayback.wav')
def download_bundle(bundle_name, target_dir, force_reload=False):
    """Downloads a Magenta bundle to target directory.

    Args:
        bundle_name: A string Magenta bundle name to download.
        target_dir: A string local directory in which to write the bundle.
        force_reload: A boolean that when True, reloads the bundle even if present.
    """
    bundle_target = os.path.join(target_dir, bundle_name)
    if not os.path.exists(bundle_target) or force_reload:
        response = urllib.request.urlopen(
            'http://download.magenta.tensorflow.org/models/%s' % bundle_name)
        data = response.read()
        local_file = open(bundle_target, 'wb')
        local_file.write(data)
        local_file.close()
def run_phase_reconstruction_example():
    fs, d = fetch_sample_speech_tapestry()
    # actually gives however many components you say! So double what .m file
    # says
    fftsize = 512
    step = 64
    X_s = np.abs(stft(d, fftsize=fftsize, step=step, real=False,
                      compute_onesided=False))
    X_t = iterate_invert_spectrogram(X_s, fftsize, step, verbose=True)

    """
    import matplotlib.pyplot as plt
    plt.specgram(d, cmap="gray")
    plt.savefig("1.png")
    plt.close()
    plt.imshow(X_s, cmap="gray")
    plt.savefig("2.png")
    plt.close()
    """

    wavfile.write("phase_original.wav", fs, soundsc(d))
    wavfile.write("phase_reconstruction.wav", fs, soundsc(X_t))
def run_fft_dct_example():
    random_state = np.random.RandomState(1999)

    fs, d = fetch_sample_speech_fruit()
    n_fft = 64
    X = d[0]
    X_stft = stft(X, n_fft)
    X_rr = complex_to_real_view(X_stft)
    X_dct = fftpack.dct(X_rr, axis=-1, norm='ortho')
    X_dct_sub = X_dct[1:] - X_dct[:-1]
    std = X_dct_sub.std(axis=0, keepdims=True)
    X_dct_sub += .01 * std * random_state.randn(
        X_dct_sub.shape[0], X_dct_sub.shape[1])
    X_dct_unsub = np.cumsum(X_dct_sub, axis=0)
    X_idct = fftpack.idct(X_dct_unsub, axis=-1, norm='ortho')
    X_irr = real_to_complex_view(X_idct)
    X_r = istft(X_irr, n_fft)[:len(X)]
    SNR = 20 * np.log10(np.linalg.norm(X - X_r) / np.linalg.norm(X))
    print(SNR)
    wavfile.write("fftdct_orig.wav", fs, soundsc(X))
    wavfile.write("fftdct_rec.wav", fs, soundsc(X_r))
def run_ltsd_example():
    fs, d = fetch_sample_speech_tapestry()
    winsize = 1024
    d = d.astype("float32") / 2 ** 15
    d -= d.mean()

    pad = 3 * fs
    noise_pwr = np.percentile(d, 1) ** 2
    noise_pwr = max(1E-9, noise_pwr)
    d = np.concatenate((np.zeros((pad,)) + noise_pwr * np.random.randn(pad), d))
    _, vad_segments = ltsd_vad(d, fs, winsize=winsize)
    v_up = np.where(vad_segments == True)[0]
    s = v_up[0]
    st = v_up[-1] + int(.5 * fs)
    d = d[s:st]

    bname = "tapestry.wav".split(".")[0]
    wavfile.write("%s_out.wav" % bname, fs, soundsc(d))
def urlretrieve(url, filename, reporthook=None, data=None):
    '''
    This function is adapted from: https://github.com/fchollet/keras
    Original work Copyright (c) 2014-2015 keras contributors
    '''
    def chunk_read(response, chunk_size=8192, reporthook=None):
        total_size = response.info().get('Content-Length').strip()
        total_size = int(total_size)
        count = 0
        while 1:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            count += 1
            if reporthook:
                reporthook(count, chunk_size, total_size)
            yield chunk

    response = urlopen(url, data)
    with open(filename, 'wb') as fd:
        for chunk in chunk_read(response, reporthook=reporthook):
            fd.write(chunk)
def generate_and_save_samples(sample_fn, length, count, dir, rate, levels):
    def save_samples(data):
        data = (data * np.reshape(np.arange(levels) / (levels - 1),
                                  [levels, 1, 1])).sum(axis=1, keepdims=True)
        value = np.iinfo(np.int16).max
        audio = (utils.inverse_mulaw(data * 2 - 1) * value).astype(np.int16)
        for idx, sample in enumerate(audio):
            filename = os.path.join(dir, 'sample_{}.wav'.format(idx))
            wavfile.write(filename, rate, np.squeeze(sample))

    samples = chainer.Variable(
        chainer.cuda.cupy.zeros([count, levels, 1, length], dtype='float32'))
    one_hot_ref = chainer.cuda.cupy.eye(levels).astype('float32')

    with tqdm.tqdm(total=length) as bar:
        for i in range(length):
            probs = F.softmax(sample_fn(samples))[:, :, 0, 0, i]
            samples.data[:, :, 0, i] = one_hot_ref[utils.sample_from(probs.data.get())]
            bar.update()

    samples.to_cpu()
    save_samples(samples.data)
def save_audio_file(filename, quantized_signal, quantization_steps=256,
                    format="16bit_pcm", sampling_rate=48000):
    quantized_signal = quantized_signal.astype(float)
    normalized_signal = (quantized_signal / quantization_steps - 0.5) * 2.0
    # inv mu-law companding transformation (ITU-T, 1988)
    mu = quantization_steps - 1
    signals_1d = np.sign(normalized_signal) * ((1 + mu) ** np.absolute(normalized_signal)) / mu
    if format == "16bit_pcm":
        max = 1 << 15
        type = np.int16
    elif format == "32bit_pcm":
        max = 1 << 31
        type = np.int32
    elif format == "8bit_pcm":
        max = 1 << 8 - 1
        type = np.uint8
    signals_1d *= max
    audio = signals_1d.reshape((-1, 1)).astype(type)
    audio = np.repeat(audio, 2, axis=1)
    wavfile.write(filename, sampling_rate, audio)

# convert signal to 1xW image
def test():
    return
    num_test = 100
    x_cpu = sampler0.memories_test[0:num_test]
    x = Variable(torch.FloatTensor(x_cpu))
    y = generator(x).data.cpu().numpy()

    chunk_size = y.shape[2]
    output_stft = np.zeros([num_test, chunk_size], dtype=np.complex128)
    for i in range(num_test):
        output_flat = y[i]
        output_complex = output_flat[0] + 1j * output_flat[1]
        output_stft[i] = output_complex

    output_wav = stft.istft(output_stft / spectrogram.spec_norm)
    wavfile.write("test.wav", 8000, output_wav.reshape(-1))
def test(): print("testing...") fake = generator(fixed_noise).data.cpu().numpy() print(fake.shape) fake = fake.reshape(-1, sampler.num_memory_channels, sampler.num_history, sampler.memory_size) print(fake.shape) fake_real = fake[0:,0:1,0:sampler.num_history,0:sampler.memory_size] fake_imag = fake[0:,1:2,0:sampler.num_history,0:sampler.memory_size] fake = fake_real + 1j * fake_imag print(fake.shape) fake = fake.reshape(-1, sampler.memory_size) print(fake.shape) output_wav = stft.istft(fake / spectrogram.spec_norm) print(output_wav.shape) wavfile.write("test.wav", 8000, output_wav.reshape(-1))
def test(): model.load("music.tflearn") X = fft_sequences[10].reshape([1, num_history, fft_stored_size]) X = np.random.uniform(low=-0.1, high=0.1, size=[1, num_history, fft_stored_size]) num_test = 10 output = np.zeros([num_test, num_history, fft_stored_size]) for test_i in range(num_test): Y = np.array(model.predict(X)[0]) output[test_i] = Y.reshape([num_history, fft_stored_size]) X = Y.reshape([1, num_history, fft_stored_size]) X[np.where(np.square(X) < 0.0001)] = 0.0 #X *= 1.5 #raw test #output[test_i] = fft_next[test_i] #print(output[test_i]) wav = convert(output) print("wav: " + str(wav.shape)) wavfile.write("test.wav", 8000, wav)
def main():
    # Select filename
    filename = 'bach10sec.wav'

    # Test loudest_band with bach, send only one of the two channels
    music, frame_rate, nframes, nchannels = read_wave(filename, False)

    # Plot FFT of input signal
    #plot_fft(music[:,0], frame_rate, -5000, 5000)

    (low, high, loudest) = loudest_band(music[:, 0], frame_rate, 75)
    print("low: ", low, " Hz")
    print("high: ", high, " Hz")

    # Plot FFT of output signal
    #plot_fft(loudest, frame_rate, -5000, 5000)

    # Write file
    loudest = loudest / np.max(loudest)   # normalize to amplitude one
    loudest = loudest.astype(np.float32)  # convert to 32-bit float
    wavfile.write(filename[0:-4] + '_filtered.wav', frame_rate, loudest)
def main():
    # Select filename
    filename = 'scary.wav'

    # Test loudest_band with bach, send only one of the two channels
    music, frame_rate, nframes, nchannels = read_wave(filename, False)

    # Plot FFT of input signal
    plot_fft(music[:, 0], frame_rate, -5000, 5000)

    (low, high, loudest) = loudest_band(music[:, 0], frame_rate, 1000)
    print("low: ", low, " Hz")
    print("high: ", high, " Hz")

    # Plot FFT of output signal
    plot_fft(loudest, frame_rate, -5000, 5000)

    # Write file
    loudest = loudest / np.max(loudest)   # normalize to amplitude one
    loudest = loudest.astype(np.float32)  # convert to 32-bit float
    wavfile.write(filename[0:-4] + '_filtered.wav', frame_rate, loudest)
def midiwrap():
    """
    Wrapper to midi read and midi write
    """
    try:
        sys.path.insert(1, get_resource_dir(""))
        from midi.utils import midiread, midiwrite
        sys.path.pop(1)
    except ImportError:
        logger.info("Need GPL licensed midi utils, downloading...",
                    "http://www.iro.umontreal.ca/~lisa/deep/midi.zip")
        url = "http://www.iro.umontreal.ca/~lisa/deep/midi.zip"
        partial_path = get_resource_dir("")
        full_path = os.path.join(partial_path, "midi.zip")
        if not os.path.exists(full_path):
            download(url, full_path)
        zip_ref = zipfile.ZipFile(full_path, 'r')
        zip_ref.extractall(partial_path)
        zip_ref.close()
        sys.path.insert(1, get_resource_dir(""))
        from midi.utils import midiread, midiwrite
        sys.path.pop(1)
    return midiread, midiwrite
def split_file_to_wav(data, sampling_rate, n_channel, max_length, base_file_name):
    """
    Splits data into smaller subfiles of same length.

    Files use the following convention for naming:
        BASEFILENAME + START_SAMPLE_NUMBER + .wav
    where START_SAMPLE_NUMBER is the location of the first sample in the
    original data set.

    returns: a list of start, stop, label, where start is the first sample,
             stop is the last and label is the .wav file name.
    """
    if max_length == np.inf:
        max_length = len(data)
    n_segments = len(data) // max_length
    intervals = [{"start": start_sample,
                  "stop": start_sample + max_length,
                  "label": "{}_{:013d}.wav".format(base_file_name, start_sample)}
                 for start_sample in np.arange(n_segments) * max_length]
    for x in intervals:
        subdata = data[x["start"]:x["start"] + max_length, n_channel]
        wavfile.write(x["label"], sampling_rate, subdata - int(np.mean(subdata)))
    return intervals
def save_samples(inputFile, sample_rate, channel_count, bit_rate, audio_data):
    (root, ext) = inputFile.split('.')
    if (ext == 'wav'):
        if (audio_data.dtype == numpy.int16):
            # assume already scaled
            wavfile.write(inputFile, sample_rate, audio_data)
        else:
            print "data type is ", audio_data.dtype, " converting to int16 for .wav"
            sar = numpy.array(audio_data, dtype=numpy.int16)
            wavfile.write(inputFile, sample_rate, sar)
    else:
        encode_mp3(inputFile, sample_rate, channel_count, bit_rate, audio_data)
def encode(self, pcm_data, fn):
    sample_count = len(pcm_data) / 2
    output_buff_len = int(1.25 * sample_count + 7200)
    output_buff = (ctypes.c_char * output_buff_len)()
    self.dll.lame_encode_buffer.argtypes = [
        ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.c_int,
        ctypes.POINTER(ctypes.c_char), ctypes.c_int]
    output_size = self.dll.lame_encode_buffer(
        self.lame, pcm_data, 0, sample_count, output_buff, output_buff_len)
    if (output_size):
        fn.write(output_buff[0:output_size])
def write(self, filename):
    """
    Write the data to an audio file (only wav is supported).

    Parameters
    ----------
    filename : string
        Path to the wav file.

    """
    wavfile.write(filename, self.sample_rate, self)
def save_batch(batch_audio, batch_save_paths):
    for audio, name in zip(batch_audio, batch_save_paths):
        tf.logging.info("Saving: %s" % name)
        wavfile.write(name, 16000, audio)
def colab_play(array_of_floats, sample_rate, ephemeral=True, autoplay=False):
    """Creates an HTML5 audio widget to play a sound in Colab.

    This function should only be called from a Colab notebook.

    Args:
        array_of_floats: A 1D or 2D array-like container of float sound samples.
            Values outside of the range [-1, 1] will be clipped.
        sample_rate: Sample rate in samples per second.
        ephemeral: If set to True, the widget will be ephemeral, and disappear
            on reload (and it won't be counted against realtime document size).
        autoplay: If True, automatically start playing the sound when the
            widget is rendered.
    """
    from google.colab.output import _js_builder as js  # pylint: disable=g-import-not-at-top,protected-access

    normalizer = float(np.iinfo(np.int16).max)
    array_of_ints = np.array(
        np.asarray(array_of_floats) * normalizer, dtype=np.int16)
    memfile = BytesIO()
    wavfile.write(memfile, sample_rate, array_of_ints)
    html = """<audio controls {autoplay}>
                <source controls src="data:audio/wav;base64,{base64_wavfile}"
                        type="audio/wav" />
                Your browser does not support the audio element.
              </audio>"""
    html = html.format(
        autoplay='autoplay' if autoplay else '',
        base64_wavfile=base64.encodestring(memfile.getvalue()))
    memfile.close()
    global _play_id
    _play_id += 1
    if ephemeral:
        element = 'id_%s' % _play_id
        display.display(display.HTML('<div id="%s"> </div>' % element))
        js.Js('document', mode=js.EVAL).getElementById(element).innerHTML = html
    else:
        display.display(display.HTML(html))
def task(fpath, new_fpath):
    fs, signal = wavfile.read(fpath)
    signal_out = remove_silence(fs, signal)
    wavfile.write(new_fpath, fs, signal_out)
    return fpath
def writeWav(fn, fs, data):
    data = data * 1.5 / np.max(np.abs(data))
    wavfile.write(fn, fs, data)
def download(url, server_fname, local_fname=None, progress_update_percentage=5,
             bypass_certificate_check=False):
    """
    An internet download utility modified from
    http://stackoverflow.com/questions/22676/
    how-do-i-download-a-file-over-http-using-python/22776#22776
    """
    if bypass_certificate_check:
        import ssl
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        u = urllib.urlopen(url, context=ctx)
    else:
        u = urllib.urlopen(url)
    if local_fname is None:
        local_fname = server_fname
    full_path = local_fname
    meta = u.info()
    with open(full_path, 'wb') as f:
        try:
            file_size = int(meta.get("Content-Length"))
        except TypeError:
            print("WARNING: Cannot get file size, displaying bytes instead!")
            file_size = 100
        print("Downloading: %s Bytes: %s" % (server_fname, file_size))
        file_size_dl = 0
        block_sz = int(1E7)
        p = 0
        while True:
            buffer = u.read(block_sz)
            if not buffer:
                break
            file_size_dl += len(buffer)
            f.write(buffer)
            if (file_size_dl * 100. / file_size) > p:
                status = r"%10d [%3.2f%%]" % (file_size_dl,
                                              file_size_dl * 100. / file_size)
                print(status)
                p += progress_update_percentage
def run_cqt_example():
    try:
        fs, d = fetch_sample_file("/Users/User/cqt_resources/kempff1.wav")
    except ValueError:
        print("WARNING: Using sample music instead but kempff1.wav is the example")
        fs, d = fetch_sample_music()
    X = d[:44100]
    X_cq, c_dc, c_nyq, multiscale, shift, window_lens = cqt(X, fs)
    X_r = icqt(X_cq, c_dc, c_nyq, multiscale, shift, window_lens)
    SNR = 20 * np.log10(np.linalg.norm(X - X_r) / np.linalg.norm(X))
    wavfile.write("cqt_original.wav", fs, soundsc(X))
    wavfile.write("cqt_reconstruction.wav", fs, soundsc(X_r))
def run_world_example():
    fs, d = fetch_sample_speech_tapestry()
    d = d.astype("float32") / 2 ** 15
    temporal_positions_h, f0_h, vuv_h, f0_candidates_h = harvest(d, fs)
    temporal_positions_ct, spectrogram_ct, fs_ct = cheaptrick(d, fs,
        temporal_positions_h, f0_h, vuv_h)
    temporal_positions_d4c, f0_d4c, vuv_d4c, aper_d4c, coarse_aper_d4c = d4c(d, fs,
        temporal_positions_h, f0_h, vuv_h)

    #y = world_synthesis(f0_d4c, vuv_d4c, aper_d4c, spectrogram_ct, fs_ct)
    y = world_synthesis(f0_d4c, vuv_d4c, coarse_aper_d4c, spectrogram_ct, fs_ct)
    wavfile.write("out.wav", fs, soundsc(y))
def run_world_mgc_example():
    # run on chromebook
    # enc 839.71
    # synth 48.79
    fs, d = fetch_sample_speech_tapestry()
    d = d.astype("float32") / 2 ** 15

    # hardcoded for 16k from
    # https://github.com/CSTR-Edinburgh/merlin/blob/master/misc/scripts/vocoder/world/extract_features_for_merlin.sh
    mgc_alpha = 0.58
    #mgc_order = 59
    mgc_order = 59
    # this is actually just mcep
    mgc_gamma = 0.0

    def enc():
        temporal_positions_h, f0_h, vuv_h, f0_candidates_h = harvest(d, fs)
        temporal_positions_ct, spectrogram_ct, fs_ct = cheaptrick(d, fs,
            temporal_positions_h, f0_h, vuv_h)
        temporal_positions_d4c, f0_d4c, vuv_d4c, aper_d4c, coarse_aper_d4c = d4c(d, fs,
            temporal_positions_h, f0_h, vuv_h)
        mgc_arr = sp2mgc(spectrogram_ct, mgc_order, mgc_alpha, mgc_gamma, verbose=True)
        return mgc_arr, spectrogram_ct, f0_d4c, vuv_d4c, coarse_aper_d4c

    start = time.time()
    mgc_arr, spectrogram_ct, f0_d4c, vuv_d4c, coarse_aper_d4c = enc()
    enc_done = time.time()

    sp_r = mgc2sp(mgc_arr, mgc_alpha, mgc_gamma, fs=fs, verbose=True)
    synth_done = time.time()

    print("enc time: {}".format(enc_done - start))
    print("synth time: {}".format(synth_done - enc_done))

    y = world_synthesis(f0_d4c, vuv_d4c, coarse_aper_d4c, sp_r, fs)
    #y = world_synthesis(f0_d4c, vuv_d4c, aper_d4c, sp_r, fs)
    wavfile.write("out_mgc.wav", fs, soundsc(y))
def run_world_dct_example():
    # on chromebook
    # enc 114.229
    # synth 5.165
    fs, d = fetch_sample_speech_tapestry()
    d = d.astype("float32") / 2 ** 15

    def enc():
        temporal_positions_h, f0_h, vuv_h, f0_candidates_h = harvest(d, fs)
        temporal_positions_ct, spectrogram_ct, fs_ct = cheaptrick(d, fs,
            temporal_positions_h, f0_h, vuv_h)
        temporal_positions_d4c, f0_d4c, vuv_d4c, aper_d4c, coarse_aper_d4c = d4c(d, fs,
            temporal_positions_h, f0_h, vuv_h)
        return spectrogram_ct, f0_d4c, vuv_d4c, coarse_aper_d4c

    start = time.time()
    spectrogram_ct, f0_d4c, vuv_d4c, coarse_aper_d4c = enc()
    dct_buf = fftpack.dct(spectrogram_ct)
    n_fft = 512
    n_dct = 20
    dct_buf = dct_buf[:, :n_dct]
    idct_buf = np.zeros((dct_buf.shape[0], n_fft + 1))
    idct_buf[:, :n_dct] = dct_buf
    ispectrogram_ct = fftpack.idct(idct_buf)
    enc_done = time.time()

    y = world_synthesis(f0_d4c, vuv_d4c, coarse_aper_d4c, spectrogram_ct, fs)
    synth_done = time.time()

    print("enc time: {}".format(enc_done - start))
    print("synth time: {}".format(synth_done - enc_done))
    #y = world_synthesis(f0_d4c, vuv_d4c, aper_d4c, sp_r, fs)
    wavfile.write("out_dct.wav", fs, soundsc(y))

#run_world_mgc_example()
#run_world_base_example()
def extract_for_one(wavDataDir, lineList, filename, FILE_EXT_WAV):
    filename_wav = os.path.join(wavDataDir, filename + FILE_EXT_WAV)
    filename_wav_silence_removed = os.path.join(wavDataDir + '_silence_removed',
                                                'temp' + FILE_EXT_WAV)

    ##-- remove the silence from audio
    sr = 44100
    audio = ess.MonoLoader(filename=filename_wav, downmix='left', sampleRate=sr)()
    audio_remove_silence = removeSilence(audio, sr, lineList)
    wavfile.write(filename_wav_silence_removed, sr, audio_remove_silence)

    ##-- process the silence removed audio
    loader = essentia.streaming.EqloudLoader(filename=filename_wav_silence_removed)
    fEx = FeatureExtractor(frameSize=2048, hopSize=1024,
                           sampleRate=loader.paramValue('sampleRate'))
    p = essentia.Pool()

    loader.audio >> fEx.signal
    for desc, output in fEx.outputs.items():
        output >> (p, desc)
    essentia.run(loader)

    # convert pitch from hz to cents
    for i in range(len(p['pitch_instantaneous_pitch'])):
        p['pitch_instantaneous_pitch'][i] = hz2cents(p['pitch_instantaneous_pitch'][i])

    stats = ['mean', 'var', 'dmean', 'dvar']
    statsPool = essentia.standard.PoolAggregator(defaultStats=stats)(p)

    return statsPool
def main(args):
    assert args.datafile is not None
    print 'datafile', args.datafile
    data = np.genfromtxt(args.datafile, delimiter=' ')
    next_hundred_below = (data.shape[0] / 100) * 100
    data = data[:next_hundred_below]
    print "    data", data.shape

    # save to ppydata style pickled dict
    ppydata = {}
    ppydata['x'] = np.roll(data, 25, axis=0).reshape(data.shape + (1, ))
    ppydata['y'] = data.reshape(data.shape + (1, ))
    for k, v in ppydata.items():
        print '    ppydata.%s = %s' % (k, v.shape)
    pickle.dump(ppydata, open(args.datafile + '.pickle', 'wb'))

    # save as wav
    from scipy.io import wavfile
    # print "data.dtype", data.dtype
    data /= np.max(np.abs(data))
    data *= 32767
    data = np.vstack((data for i in range(10)))
    wavfile.write(args.datafile + '.wav', 44100, data.astype(np.int16))

    plt.plot(data[:, 0], data[:, 1], 'k-o', alpha=0.2)
    plt.show()