我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用scipy.signal()。
def applyFilter(data, b, a, padding=100, bidir=True): """Apply a linear filter with coefficients a, b. Optionally pad the data before filtering and/or run the filter in both directions.""" try: import scipy.signal except ImportError: raise Exception("applyFilter() requires the package scipy.signal.") d1 = data.view(np.ndarray) if padding > 0: d1 = np.hstack([d1[:padding], d1, d1[-padding:]]) if bidir: d1 = scipy.signal.lfilter(b, a, scipy.signal.lfilter(b, a, d1)[::-1])[::-1] else: d1 = scipy.signal.lfilter(b, a, d1) if padding > 0: d1 = d1[padding:-padding] if (hasattr(data, 'implements') and data.implements('MetaArray')): return MetaArray(d1, info=data.infoCopy()) else: return d1
def besselFilter(data, cutoff, order=1, dt=None, btype='low', bidir=True): """return data passed through bessel filter""" try: import scipy.signal except ImportError: raise Exception("besselFilter() requires the package scipy.signal.") if dt is None: try: tvals = data.xvals('Time') dt = (tvals[-1]-tvals[0]) / (len(tvals)-1) except: dt = 1.0 b,a = scipy.signal.bessel(order, cutoff * dt, btype=btype) return applyFilter(data, b, a, bidir=bidir) #base = data.mean() #d1 = scipy.signal.lfilter(b, a, data.view(ndarray)-base) + base #if (hasattr(data, 'implements') and data.implements('MetaArray')): #return MetaArray(d1, info=data.infoCopy()) #return d1
def butterworthFilter(data, wPass, wStop=None, gPass=2.0, gStop=20.0, order=1, dt=None, btype='low', bidir=True): """return data passed through bessel filter""" try: import scipy.signal except ImportError: raise Exception("butterworthFilter() requires the package scipy.signal.") if dt is None: try: tvals = data.xvals('Time') dt = (tvals[-1]-tvals[0]) / (len(tvals)-1) except: dt = 1.0 if wStop is None: wStop = wPass * 2.0 ord, Wn = scipy.signal.buttord(wPass*dt*2., wStop*dt*2., gPass, gStop) #print "butterworth ord %f Wn %f c %f sc %f" % (ord, Wn, cutoff, stopCutoff) b,a = scipy.signal.butter(ord, Wn, btype=btype) return applyFilter(data, b, a, bidir=bidir)
def differentiator(n, Hz=1): """ Return linear phase impulse response for a length *n* filter that approximates the differential operator. The sampling frequency is *Hz*. The filter length *n* must be even. The remez function returns type 3 (for *n* odd) and 4 (for *n* even) linear phase filters. However, type 3 linear phase filters are 0 at $\omega = 0$ and $\omega = \pi$. """ if n % 2 == 1: raise ValueError('the filter length n must be even') return scipy.signal.remez(n, [0, Hz / 2], [1], Hz=Hz, type='differentiator') * Hz * 2 * NP.pi
def __init__(self, n, axis, order=2, mode='valid'): """ Construct gradient operator for signal of dimension *n* for dimension *axis*. Use a filter kernel of length *order* (must be even). Use convolution type *mode*. """ self.n = n self.ndim = len(self.n) self.axis = axis if axis < 0 or axis >= self.ndim: raise ValueError('0 <= axis (= {0}) < ndim = {1}'.format(axis, self.ndim)) self.d = differentiator(order) h_list = [] for i in reversed(range(self.ndim)): if i == axis: h_list.append(self.d) else: h_list.append(NP.array([1])) super(GradientFilter, self).__init__(n, h_list, mode=mode)
def reSample( df , dt = None , xAxis = None , n = None , kind = 'linear') : """ re-sample the signal """ if type(df) == pd.Series : df = pd.DataFrame(df) f = interp1d( df.index, np.transpose(df.values) , kind=kind, axis=-1, copy=True, bounds_error=True, assume_sorted=True) if dt : end = int(+(df.index[-1] - df.index[0] ) / dt) * dt + df.index[0] xAxis = np.linspace( df.index[0] , end , 1+int(+(end - df.index[0] ) / dt) ) elif n : xAxis = np.linspace( df.index[0] , df.index[-1] , n ) elif xAxis == None : raise(Exception("reSample : either dt or xAxis should be provided" )) #For rounding issue, ensure that xAxis is within ts.xAxis #xAxis[ np.where( xAxis > np.max(df.index[:]) ) ] = df.index[ np.where( xAxis > np.max(df.index[:]) ) ] return pd.DataFrame( data = np.transpose(f(xAxis)), index = xAxis , columns = map( lambda x : "reSample("+ x +")" , df.columns ) )
def getPSD( df , dw = 0.05, roverlap = 0.5, window='hanning', detrend='constant') : """ Compute the power spectral density """ if type(df) == pd.Series : df = pd.DataFrame(df) nfft = int ( (2*pi / dw) / dx(df) ) nperseg = 2**int(log(nfft)/log(2)) noverlap = nperseg * roverlap """ Return the PSD of a time signal """ try : from scipy.signal import welch except : raise Exception("Welch function not found, please install scipy > 0.12") data = [] for iSig in range(df.shape[1]) : test = welch( df.values[:,iSig] , fs = 1. / dx(df) , window=window, nperseg=nperseg, noverlap=noverlap, nfft=nfft, detrend=detrend, return_onesided=True, scaling='density') data.append( test[1] / (2*pi) ) xAxis = test[0][:] * 2*pi return pd.DataFrame( data = np.transpose(data), index = xAxis , columns = [ "psd("+ str(x) +")" for x in df.columns ] )
def derivFFT(df, n=1 ) : """ Deriv a signal trought FFT, warning, edge can be a bit noisy... indexList : channel to derive n : order of derivation """ deriv = [] for iSig in range(df.shape[1]) : fft = np.fft.fft( df.values[:,iSig] ) #FFT freq = np.fft.fftfreq( df.shape[0] , dx(df) ) from copy import deepcopy fft0 = deepcopy(fft) if n>0 : fft *= (1j * 2*pi* freq[:])**n #Derivation in frequency domain else : fft[-n:] *= (1j * 2*pi* freq[-n:])**n fft[0:-n] = 0. tts = np.real(np.fft.ifft(fft)) tts -= tts[0] deriv.append( tts ) #Inverse FFT return pd.DataFrame( data = np.transpose(deriv), index = df.index , columns = [ "DerivFFT("+ x +")" for x in df.columns ] )
def load_segment(segment_path, old_segment_format=True, normalize_signal=False, resample_frequency=None): """ Convienience function for loading segments :param segment_path: Path to the segment file to load. :param old_segment_format: If True, the old format will be used. If False, the format backed by a pandas dataframe will be used. :param normalize_signal: If True, the signal will be normalized using the subject median and median absolute deviation. :param resample_frequency: If this is set to a number, the signal will be resampled to that frequency. :return: A Segment or DFSegment object with the data from the segment in *segment_path*. """ if normalize_signal: return load_and_standardize(segment_path, old_segment_format=old_segment_format) else: if old_segment_format: segment = Segment(segment_path) else: segment = DFSegment.from_mat_file(segment_path) if resample_frequency is not None: segment.resample_frequency(resample_frequency, inplace=True) return segment
def highpass_cnt(data, low_cut_hz, fs, filt_order=3, axis=0): """ Highpass signal applying **causal** butterworth filter of given order. Parameters ---------- data: 2d-array Time x channels low_cut_hz: float fs: float filt_order: int Returns ------- highpassed_data: 2d-array Data after applying highpass filter. """ if (low_cut_hz is None) or (low_cut_hz == 0): log.info("Not doing any highpass, since low 0 or None") return data.copy() b, a = scipy.signal.butter(filt_order, low_cut_hz / (fs / 2.0), btype='highpass') assert filter_is_stable(a) data_highpassed = scipy.signal.lfilter(b, a, data, axis=axis) return data_highpassed
def _damp_flux(flux, taperwidth): """Apply damping to the sides of the input flux. The damping applies to a width of size taperwidth on each side of the domain. Currently, uses a Tukey window. Inputs: flux input signal on which to damp sides (array) taperwidth width over which to apply damping at each boundary, as a fraction of domain size (scalar) Outputs: dampedFlux signal with edges damped (array) """ alpha = taperwidth * 2 # alpha sets the width of the damping for the entire signal, accounting for both left and right boundaries tukeyWindow = scipy.signal.tukey(len(flux), alpha) dampedFlux = flux * tukeyWindow return dampedFlux
def get_transfer_function(self, in_name, out_name, in_type='v', atol=0.0): # type: (str, str, str, float) -> TransferFunctionContinuous """Compute the transfer function between the two given nodes. Parameters ---------- in_name : str the input voltage/current node name. out_name : Union[str, List[str]] the output voltage node name. in_type : str set to 'v' for input voltage sources. Otherwise, current sources. atol : float absolute tolerance for checking zeros in the numerator. Used to filter out scipy warnings. Returns ------- system : TransferFunctionContinuous the scipy transfer function object. See scipy.signal package on how to use this object. """ num, den = self.get_num_den(in_name, out_name, in_type=in_type, atol=atol) return TransferFunctionContinuous(num, den)
def test_scipy(pyi_builder): pyi_builder.test_source( """ from distutils.version import LooseVersion # Test top-level SciPy importability. import scipy from scipy import * # Test hooked SciPy modules. import scipy.io.matlab import scipy.sparse.csgraph # Test problematic SciPy modules. import scipy.linalg import scipy.signal # SciPy >= 0.16 privatized the previously public "scipy.lib" package as # "scipy._lib". Since this package is problematic, test its # importability regardless of SciPy version. if LooseVersion(scipy.__version__) >= LooseVersion('0.16.0'): import scipy._lib else: import scipy.lib """)
def butter_low(dt_list, val, lowpass=1.0): import scipy.signal #val_mask = np.ma.getmaskarray(val) #dt is 300 s, 5 min dt_diff = np.diff(dt_list) if isinstance(dt_diff[0], float): dt_diff *= 86400. else: dt_diff = np.array([dt.total_seconds() for dt in dt_diff]) dt = malib.fast_median(dt_diff) #f is 0.00333 Hz #288 samples/day fs = 1./dt nyq = fs/2. order = 3 f_max = (1./(86400*lowpass)) / nyq b, a = scipy.signal.butter(order, f_max, btype='lowpass') #b, a = sp.signal.butter(order, (f_min, f_max), btype='bandstop') w, h = scipy.signal.freqz(b, a, worN=2000) # w_f = (nyq/np.pi)*w # w_f_days = 1/w_f/86400. #plt.plot(w_f_days, np.abs(h)) val_f = scipy.signal.filtfilt(b, a, val) return val_f
def is_periodic(aud_sample, SAMPLING_RATE = 8000): ''' :param aud_sample: Numpy 1D array rep of audio sample :param SAMPLING_RATE: Used to focus on human speech freq range :return: True if periodic, False if aperiodic ''' threshold = 20 # Use auto-correlation to find if there is enough periodicity in [50-400] Hz range values = signal.correlate(aud_sample, aud_sample, mode='full') # [50-400 Hz] corresponds to [2.5-20] ms OR [20-160] samples for 8 KHz sampling rate l_idx = int(SAMPLING_RATE*2.5/1000) r_idx = int(SAMPLING_RATE*20/1000) values = values[len(values)/2:] subset_values = values[l_idx:r_idx] if np.argmax(subset_values) < threshold: return False else: return True
def test_despreader_peaks(pos): """Test correlation peaks with signal at different positions.""" estimator = soa_estimator.SoaEstimator(TEMPLATE, None, BLOCK_LEN, len(TEMPLATE)) block = gen_block(pos) fft = np.fft.fft(block) corr = estimator.despread(fft) corr_abs = np.abs(corr) assert len(corr) == BLOCK_LEN - len(TEMPLATE) + 1 if pos <= BLOCK_LEN - len(TEMPLATE): peak_idx = np.argmax(corr_abs) non_peak = np.delete(corr_abs, peak_idx) assert peak_idx == pos assert corr_abs[peak_idx] >= PEAK_MAG - 0.1 np.testing.assert_array_less(non_peak, MAX_SIDEBAND + 0.1) else: np.testing.assert_array_less(corr_abs, MAX_SIDEBAND + 0.1)
def DCT4(samples): """ Method to create DCT4 transformation using DCT3 Arguments : samples : (1D Array) Input samples to be transformed Returns : y : (1D Array) Transformed output samples """ # Initialize samplesup=np.zeros(2*N, dtype = np.float32) # Upsample signal: samplesup[1::2]=samples y=spfft.dct(samplesup,type=3,norm='ortho')*np.sqrt(2)#/2 return y[0:N] #The DST4 transform:
def DST4(samples): """ Method to create DST4 transformation using DST3 Arguments : samples : (1D Array) Input samples to be transformed Returns : y : (1D Array) Transformed output samples """ # Initialize samplesup=np.zeros(2*N, dtype = np.float32) # Upsample signal # Reverse order to obtain DST4 out of DCT4: #samplesup[1::2]=np.flipud(samples) samplesup[0::2] = samples y = spfft.dst(samplesup,type=3,norm='ortho')*np.sqrt(2)#/2 # Flip sign of every 2nd subband to obtain DST4 out of DCT4 #y=(y[0:N])*(((-1)*np.ones(N, dtype = np.float32))**range(N)) return y[0: N]
def x2polyphase(x, N): """ Method to convert input signal x (a row vector) into a polphase row vector of blocks with length N. Arguments : x : (1D Array) Input samples N : (int) Number of subbands Returns : y : (3D Array) Polyphase representation of the input signal Author : Gerald Schuller ('shl'), Dec/2/15 """ #Number of blocks in the signal: L=int(np.floor(len(x)/N)) xp=np.zeros((1,N,L), dtype = np.float32) for m in range(L): xp[0,:,m] = x[m*N: (m*N+N)] return xp
def bandstop(data, fs, start, stop, order = 3): """ Apply bandstop filter on data. :param data: 3D video data :param fs: sampling frequency :param order: the order of butter filter used. :param start: lower frequency where band starts :param stop: higher frequency where band ends :return: the filtered 3d data set. """ nyq = 0.5 * fs high = stop / nyq low = start / nyq order = order b, a = scipy.signal.butter(order, [low, high], btype='bandstop') # zi = scipy.signal.lfiltic(b, a, y=[0.]) dataf = scipy.signal.lfilter(b, a, data) return dataf
def updateUi(self): if self.previewCheckBox.checkState(): # Draw preview of filtered signal currentlySelected = self.stackedWidget.currentWidget() function = currentlySelected.returnFunction() input_data = self.centralWidget.getData() previewElement = ChainElement(name="Preview") previewElement.function = function previewElement.input_ = input_data previewElement.update() output_data = previewElement.output data = (*output_data, input_data[1]) if self.previewPlot: # PlotWidget created already self.previewPlot.redraw(data) else: # PlotWidget not yet created self.previewPlot = PlotWidget(data) self.previewDlg = QDialog() self.previewDlg.setWindowTitle("Signal Preview") grid = QGridLayout() grid.addWidget(self.previewPlot) self.previewDlg.setLayout(grid) self.previewDlg.show()
def freq_shift_ramp(x, hza, dt): N_orig = len(x) N_padded = 2**nextpow2(N_orig) t = numpy.arange(0, N_padded) f_shift = numpy.linspace(hza[0], hza[1], len(x)) f_shift = numpy.append(f_shift, hza[1]*numpy.ones(N_padded-len(x))) pc1 = f_shift[:-1] - f_shift[1:] phase_correction = numpy.add.accumulate( t * dt * numpy.append(numpy.zeros(1), 2*numpy.pi*pc1)) lo = numpy.exp(1j*(2*numpy.pi*dt*f_shift*t + phase_correction)) x0 = numpy.append(x, numpy.zeros(N_padded-N_orig, x.dtype)) h = scipy.signal.hilbert(x0)*lo ret = h[:N_orig].real return ret # avoid most of the round-up-to-power-of-two penalty by # doing log-n shifts. discontinuity at boundaries, # but that's OK for JT65 2048-sample symbols.
def __init__(self, from_rate, to_rate): self.from_rate = from_rate self.to_rate = to_rate if self.from_rate > self.to_rate: # prepare a filter to precede resampling. self.filter = butter_lowpass(0.45 * self.to_rate, from_rate, 7) self.zi = scipy.signal.lfiltic(self.filter[0], self.filter[1], [0]) # total number of input and output samples, # so we can insert/delete to keep long-term # rates correct. self.nin = 0 self.nout = 0 # how much will output be delayed? # in units of output samples.
def cwt_time(data, wavelet, widths, dt, axis): # wavelets can be complex so output is complex output = np.zeros((len(widths),) + data.shape, dtype=np.complex) # compute in time slices = [None for _ in data.shape] slices[axis] = slice(None) for ind, width in enumerate(widths): # number of points needed to capture wavelet M = 10 * width / dt # times to use, centred at zero t = np.arange((-M + 1) / 2., (M + 1) / 2.) * dt # sample wavelet and normalise norm = (dt / width) ** .5 wavelet_data = norm * wavelet(t, width) output[ind, :] = scipy.signal.fftconvolve(data, wavelet_data[slices], mode='same') return output
def coi(self): """The Cone of Influence is the region near the edges of the input signal in which edge effects may be important. Return a tuple (T, S) that describes the edge of the cone of influence as a single line in (time, scale). """ Tmin = self.time.min() Tmax = self.time.max() Tmid = Tmin + (Tmax - Tmin) / 2 s = np.logspace(np.log10(self.scales.min()), np.log10(self.scales.max()), 100) c1 = Tmin + self.wavelet.coi(s) c2 = Tmax - self.wavelet.coi(s) C = np.hstack((c1[np.where(c1 < Tmid)], c2[np.where(c2 > Tmid)])) S = np.hstack((s[np.where(c1 < Tmid)], s[np.where(c2 > Tmid)])) # sort w.r.t time iC = C.argsort() sC = C[iC] sS = S[iC] return sC, sS
def mean_csd(perievent_lfp1, perievent_lfp2, window, fs): """Computes the mean Cross-Spectral Density (CSD) between perievent slices Parameters ---------- perievent_lfp1 : nept.AnalogSignal perievent_lfp2 : nept.AnalogSignal window : int fs : int Returns ------- freq : np.array power : np.array """ freq, power = scipy.signal.csd(perievent_lfp1.data.T, perievent_lfp2.data.T, fs=fs, nperseg=window, nfft=int(window*2)) return freq, np.mean(power, axis=0)
def mean_coherence(perievent_lfp1, perievent_lfp2, window, fs): """Computes the mean coherence between perievent slices Parameters ---------- perievent_lfp1 : nept.AnalogSignal perievent_lfp2 : nept.AnalogSignal window : int fs : int Returns ------- freq : np.array coherence : np.array """ freq, coherence = scipy.signal.coherence( perievent_lfp1.data.T, perievent_lfp2.data.T, fs=fs, nperseg=window, nfft=int(window*2)) return freq, np.mean(coherence, axis=0)
def add_delta_deltas(filterbanks, name=None): """Compute time first and second-order derivative channels. Args: filterbanks: float32 tensor with shape [batch_size, len, num_bins, 1] name: scope name Returns: float32 tensor with shape [batch_size, len, num_bins, 3] """ delta_filter = np.array([2, 1, 0, -1, -2]) delta_delta_filter = scipy.signal.convolve(delta_filter, delta_filter, "full") delta_filter_stack = np.array( [[0] * 4 + [1] + [0] * 4, [0] * 2 + list(delta_filter) + [0] * 2, list(delta_delta_filter)], dtype=np.float32).T[:, None, None, :] delta_filter_stack /= np.sqrt( np.sum(delta_filter_stack**2, axis=0, keepdims=True)) filterbanks = tf.nn.conv2d( filterbanks, delta_filter_stack, [1, 1, 1, 1], "SAME", data_format="NHWC", name=name) return filterbanks
def read_wave_file(filename): """ Read a wave file from disk # Arguments filename : the name of the wave file # Returns (fs, x) : (sampling frequency, signal) """ if (not os.path.isfile(filename)): raise ValueError("File does not exist") s = wave.open(filename, 'rb') if (s.getnchannels() != 1): raise ValueError("Wave file should be mono") # if (s.getframerate() != 22050): # raise ValueError("Sampling rate of wave file should be 16000") strsig = s.readframes(s.getnframes()) x = np.fromstring(strsig, np.short) fs = s.getframerate() s.close() x = x/32768.0 return fs, x
def read_wave_file_not_normalized(filename): """ Read a wave file from disk # Arguments filename : the name of the wave file # Returns (fs, x) : (sampling frequency, signal) """ if (not os.path.isfile(filename)): raise ValueError("File does not exist") s = wave.open(filename, 'rb') if (s.getnchannels() != 1): raise ValueError("Wave file should be mono") # if (s.getframerate() != 22050): # raise ValueError("Sampling rate of wave file should be 16000") strsig = s.readframes(s.getnframes()) x = np.fromstring(strsig, np.short) fs = s.getframerate() s.close() return fs, x
def frequency_phase_rotation(values, angle, deg=False): window_size = 64 noverlap = 32 window = signal.hann(window_size, sym=False) if not signal.check_COLA(window, len(window), noverlap): raise Exception('check_COLA failed.') f, t, Zxx = signal.stft(values, window=window, nperseg=window_size, noverlap=noverlap ) Zxx_rotated = np.zeros(Zxx.shape, dtype=np.complex) for freq_idx in range(Zxx.shape[0]): # Loop over all frequencies Zxx_rotated[freq_idx] = phase_rotation(Zxx[freq_idx], angle, deg) t, x = signal.istft(Zxx_rotated, window=window, nperseg=window_size, noverlap=noverlap ) return t, x
def adaptiveDetrend(data, x=None, threshold=3.0): """Return the signal with baseline removed. Discards outliers from baseline measurement.""" try: import scipy.signal except ImportError: raise Exception("adaptiveDetrend() requires the package scipy.signal.") if x is None: x = data.xvals(0) d = data.view(np.ndarray) d2 = scipy.signal.detrend(d) stdev = d2.std() mask = abs(d2) < stdev*threshold #d3 = where(mask, 0, d2) #d4 = d2 - lowPass(d3, cutoffs[1], dt=dt) lr = scipy.stats.linregress(x[mask], d[mask]) base = lr[1] + lr[0]*x d4 = d - base if (hasattr(data, 'implements') and data.implements('MetaArray')): return MetaArray(d4, info=data.infoCopy()) return d4
def checker(input_var, desire_size): ''' check if debug = 1 ''' if input_var is None: print('input_variable does not exist!') if desire_size is None: print('desire_size does not exist!') dd = numpy.size(desire_size) dims = numpy.shape(input_var) # print('dd=',dd,'dims=',dims) if numpy.isnan(numpy.sum(input_var[:])): print('input has NaN') if numpy.ndim(input_var) < dd: print('input signal has too few dimensions') if dd > 1: if dims[0:dd] != desire_size[0:dd]: print(dims[0:dd]) print(desire_size) print('input signal has wrong size1') elif dd == 1: if dims[0] != desire_size: print(dims[0]) print(desire_size) print('input signal has wrong size2') if numpy.mod(numpy.prod(dims), numpy.prod(desire_size)) != 0: print('input signal shape is not multiples of desired size!')
def __init__(self, n, axis, order=3, mode='valid'): """ Construct a second-order gradient operator for signal of dimension *n* for dimension *axis*. Use a filter kernel of length *order* (must be odd). Use convolution type *mode*. """ # assert that the filter length is odd assert(order % 2 == 1) self.n = n self.ndim = len(self.n) self.axis = axis if axis < 0 or axis >= self.ndim: raise ValueError('0 <= axis (= {0}) < ndim = {1}'.format(axis, self.ndim)) self.d = differentiator(int(order/2) + 1) self.d2 = NP.convolve(self.d, self.d) self.mode = mode h_list = [] m = [] for i in reversed(range(self.ndim)): if i == axis: h_list.append(self.d2) else: h_list.append(NP.array([1])) m.append(len(h_list[-1])) self.m = m if mode == 'circ': n_prime = array(n) - m + 1 super(Gradient2Filter, self).__init__(n_prime, h_list, mode=mode) else: super(Gradient2Filter, self).__init__(n, h_list, mode=mode)
def rampDf(df , rStart, rEnd) : """ Ramp the signal between rStart and rEnd (in place) """ a = ramp_v( df.index[:] , rStart, rEnd ) for c in df.columns : df[c][:] *= a[:] return df
def fillCos( A = 1.0, T = 10. , tMin = 0. , tMax = 50. , n = 200) : """ for testing purpose, fill signal with a cosine """ xAxis = np.linspace( tMin , tMax , n ) data = np.zeros( (n,2) ) data[:,0] = A * np.cos(2*pi*xAxis/T) data[:,1] = A * np.sin(2*pi*xAxis/T) return pd.DataFrame( data=data , index = xAxis , columns = [ "Cos" , "Sin" ] )
def slidingFFT( se, T , n = 1 , tStart = None , preSample = False , nHarmo = 5 , kind = abs , phase = None) : """ Harmonic analysis on a sliding windows se : Series to analyse T : Period tStart : start _xAxis n : size of the sliding windows in period. reSample : reSample the signal so that a period correspond to a integer number of time step nHarmo : number of harmonics to return kind : module, real, imaginary part, as a function (abs, np.imag, np.real ...) phase : phase shift (for instance to extract in-phase with cos or sin) """ if (type(se) == pd.DataFrame) : if len(se.columns) == 1 : se = se.iloc[:,0] nWin = int(0.5 + n*T / dx(se) ) #ReSample to get round number of time step per period if preSample : new = reSample( se, dt = n*T / (nWin) ) else : new = se signal = new.values[:] #Allocate results res = np.zeros( (new.shape[0] , nHarmo ) ) for iWin in range(new.shape[0] - nWin) : sig = signal[ iWin : iWin+nWin ] #windows fft = np.fft.fft( sig ) #FTT if phase !=None : #Phase shift fft *= np.exp( 1j* ( 2*pi*(iWin*1./nWin) + phase )) fftp = kind( fft ) #Take module, real or imaginary part spectre = 2*fftp/(nWin) #Scale for ih in range(nHarmo): res[iWin, ih] = spectre[ih*n] if ih == 0 : res[iWin, ih] /= 2.0 #if ih == 0 : res[iWin, ih] = 2.0 return pd.DataFrame( data = res , index = new.index , columns = map( lambda x : "Harmo {:} ({:})".format(x , se.name ) , range(nHarmo) ) )
def deriv(df, n=1) : """ Deriv a signal through finite difference """ from signalTreatment import der , der2 deriv = [] if n == 1 : for iSig in range(df.shape[1]) : deriv.append( der( df.values[:,iSig] , df.index ) ) elif n == 2 : for iSig in range(df.shape[1]) : deriv.append( der2( df.values[:,iSig] , df.index[:] ) ) return pd.DataFrame( data = np.transpose(deriv), index = df.index , columns = [ "Deriv("+ x +")" for x in df.columns ] )
def testPSD(display = True) : """ Read a signal, compute PSD and compare standard deviation to m0 """ df = read( r'../../testData/motion.dat' ) RsSig = np.std( df.values[:,0] ) * 4 print ("Rs from sigma " , RsSig) psd = getPSD(df ) RsM0 = (np.sum(psd.values[:,0])* dx(psd) ) **0.5 * 4.004 print ("Rs from m0 ",RsM0) psd = psd[0.1 : 2.0] if display : df.plot( ) psd.plot() plt.show()
def GetSn(y, range_ff=[0.25, 0.5], method='mean'): """ Estimate noise power through the power spectral density over the range of large frequencies Parameters ---------- y : array, shape (T,) One dimensional array containing the fluorescence intensities with one entry per time-bin. range_ff : (1,2) array, nonnegative, max value <= 0.5 range of frequency (x Nyquist rate) over which the spectrum is averaged method : string, optional, default 'mean' method of averaging: Mean, median, exponentiated mean of logvalues Returns ------- sn : noise standard deviation """ ff, Pxx = scipy.signal.welch(y) ind1 = ff > range_ff[0] ind2 = ff < range_ff[1] ind = np.logical_and(ind1, ind2) Pxx_ind = Pxx[ind] sn = { 'mean': lambda Pxx_ind: np.sqrt(np.mean(Pxx_ind / 2)), 'median': lambda Pxx_ind: np.sqrt(np.median(Pxx_ind / 2)), 'logmexp': lambda Pxx_ind: np.sqrt(np.exp(np.mean(np.log(Pxx_ind / 2)))) }[method](Pxx_ind) return sn
def discount_cumsum(x, discount): # See https://docs.scipy.org/doc/scipy/reference/tutorial/signal.html#difference-equation-filtering # Here, we have y[t] - discount*y[t+1] = x[t] # or rev(y)[t] - discount*rev(y)[t-1] = rev(x)[t] return scipy.signal.lfilter([1], [1, float(-discount)], x[::-1], axis=0)[::-1]
def filter_records_fir(coeffs, num_taps, # ignored decim_factor, recs, record_length, # ignored (uses shape of recs) num_records, # ignored (uses shape of recs) result): # split out a, b coefficients b = coeffs[:len(coeffs)//2] a = coeffs[len(coeffs)//2:] filtered_signal = scipy.signal.lfilter(b, a, recs) result[:] = np.copy(filtered_signal[:, ::decim_factor], order="C")
def filter_records_iir(coeffs, order, # ignored recs, record_length, # ignored (uses shape of recs) num_records, # ignored (uses shape of recs) result): # split out a, b coefficients b = coeffs[:len(coeffs)//2] a = coeffs[len(coeffs)//2:] result[:] = scipy.signal.lfilter(b, a, recs)