Python numpy 模块,blackman() 实例源码
我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用numpy.blackman()。
def run_mgc_example():
import matplotlib.pyplot as plt
fs, x = wavfile.read("test16k.wav")
pos = 3000
fftlen = 1024
win = np.blackman(fftlen) / np.sqrt(np.sum(np.blackman(fftlen) ** 2))
xw = x[pos:pos + fftlen] * win
sp = 20 * np.log10(np.abs(np.fft.rfft(xw)))
mgc_order = 20
mgc_alpha = 0.41
mgc_gamma = -0.35
mgc_arr = win2mgc(xw, order=mgc_order, alpha=mgc_alpha, gamma=mgc_gamma, verbose=True)
xwsp = 20 * np.log10(np.abs(np.fft.rfft(xw)))
sp = mgc2sp(mgc_arr, mgc_alpha, mgc_gamma, fftlen)
plt.plot(xwsp)
plt.plot(20. / np.log(10) * np.real(sp), "r")
plt.xlim(1, len(xwsp))
plt.show()
def run_mgc_example():
import matplotlib.pyplot as plt
fs, x = wavfile.read("test16k.wav")
pos = 3000
fftlen = 1024
win = np.blackman(fftlen) / np.sqrt(np.sum(np.blackman(fftlen) ** 2))
xw = x[pos:pos + fftlen] * win
sp = 20 * np.log10(np.abs(np.fft.rfft(xw)))
mgc_order = 20
mgc_alpha = 0.41
mgc_gamma = -0.35
mgc_arr = win2mgc(xw, order=mgc_order, alpha=mgc_alpha, gamma=mgc_gamma, verbose=True)
xwsp = 20 * np.log10(np.abs(np.fft.rfft(xw)))
sp = mgc2sp(mgc_arr, mgc_alpha, mgc_gamma, fftlen)
plt.plot(xwsp)
plt.plot(20. / np.log(10) * np.real(sp), "r")
plt.xlim(1, len(xwsp))
plt.show()
def transform(self, X_indices, y_indices):
X_indices, y_indices = super(IndexBatchIterator, self).transform(X_indices, y_indices)
[count] = X_indices.shape
# Use preallocated space
X = self.Xbuf[:count]
Y = self.Ybuf[:count]
window = np.blackman(SAMPLE_SIZE//DOWNSAMPLE)[None,:]
for i, ndx in enumerate(X_indices):
if ndx == -1:
ndx = np.random.randint(len(self.source.events))
augmented = self.augmented[:,ndx:ndx+SAMPLE_SIZE]
X[i] = augmented[:,::-1][:,::DOWNSAMPLE]
if y_indices is not None:
Y[i] = self.source.events[ndx]
Y = None if (y_indices is None) else Y
return X, Y
def HeterodynWithF0Track(x, tf0, f0, sr=1,
nwind=1024, nhop=512,
windfunc=np.blackman):
'''
Calculates the amplitude near frequency f0 in x
(f0 is time-varying, values given at tf0
nwind should be at least 3 periods if the signal
is periodic.
'''
valid_idx = np.logical_not(np.isnan(f0))
tx = np.arange(len(x))/float(sr)
f0s = np.interp(tx, tf0[valid_idx], f0[valid_idx])
phs = np.cumsum(2*np.pi*f0s/sr)
sinsig = np.exp(1j*phs)
hamp, t = FuncWind(np.sum, x*sinsig, power=1, sr=sr,
nwind=nwind, nhop=nhop,
windfunc=windfunc)
return np.array(hamp)*2, np.array(t)
def SpecCentWind(x, sr=1, nwind=1024, nhop=512, windfunc=np.blackman):
'''
Calculates the SpectralCentroid of x, in frames of
length nwind, and in steps of nhop. windfunc is used as
windowing function
nwind should be at least 3 periods if the signal is periodic.
'''
ff = np.arange(nwind/2)/float(nwind)*sr
def SCvec(xw):
xf = np.fft.fft(xw)
xf2 = xf[:nwind/2]
return sum(np.abs(xf2)*ff)/sum(np.abs(xf2))
amp, t = FuncWind(SCvec, x, power=0, sr=sr,
nwind=nwind, nhop=nhop)
return np.array(amp), np.array(t)
def design_windowed_sinc_lpf(fc, bw):
N = Filter.get_filter_length_from_bandwidth(bw)
# Compute sinc filter impulse response
h = np.sinc(2 * fc * (np.arange(N) - (N - 1) / 2.))
# We use blackman window function
w = np.blackman(N)
# Multiply sinc filter with window function
h = h * w
# Normalize to get unity gain
h_unity = h / np.sum(h)
return h_unity
def iFFT(Y, output_length=None, window=False):
""" Inverse real-valued Fourier Transform
Parameters
----------
Y : array_like
Frequency domain data [Nsignals x Nbins]
output_length : int, optional
Lenght of returned time-domain signal (Default: 2 x len(Y) + 1)
win : boolean, optional
Weights the resulting time-domain signal with a Hann
Returns
-------
y : array_like
Reconstructed time-domain signal
"""
Y = _np.atleast_2d(Y)
y = _np.fft.irfft(Y, n=output_length)
if window:
if window not in {'hann', 'hamming', 'blackman', 'kaiser'}:
raise ValueError('Selected window must be one of hann, hamming, blackman or kaiser')
no_of_signals, no_of_samples = y.shape
if window == 'hann':
window_array = _np.hanning(no_of_samples)
elif window == 'hamming':
window_array = _np.hamming(no_of_samples)
elif window == 'blackman':
window_array = _np.blackman(no_of_samples)
elif window == 'kaiser':
window_array = _np.kaiser(no_of_samples, 3)
y = window_array * y
return y
def blackman(M):
"""Returns the Blackman window.
The Blackman window is defined as
.. math::
w(n) = 0.42 - 0.5 \\cos\\left(\\frac{2\\pi{n}}{M-1}\\right)
+ 0.08 \\cos\\left(\\frac{4\\pi{n}}{M-1}\\right)
\\qquad 0 \\leq n \\leq M-1
Args:
M (:class:`~int`):
Number of points in the output window. If zero or less, an empty
array is returned.
Returns:
~cupy.ndarray: Output ndarray.
.. seealso:: :func:`numpy.blackman`
"""
if M < 1:
return from_data.array([])
if M == 1:
return basic.ones(1, float)
n = ranges.arange(0, M)
return 0.42 - 0.5 * trigonometric.cos(2.0 * numpy.pi * n / (M - 1))\
+ 0.08 * trigonometric.cos(4.0 * numpy.pi * n / (M - 1))
def smooth(x,window_len=11,window='hanning'):
if x.ndim != 1:
raise ValueError, "smooth only accepts 1 dimension arrays."
if x.size < window_len:
return x
# raise ValueError, "Input vector needs to be bigger than window size."
if window_len<3:
return x
if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
raise ValueError, "Window is one of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'"
s=numpy.r_[x[window_len-1:0:-1],x,x[-1:-window_len:-1]]
if window == 'flat': #moving average
w=numpy.ones(window_len,'d')
else:
w=eval('numpy.'+window+'(window_len)')
y=numpy.convolve(w/w.sum(),s,mode='valid')
y = y[(window_len/2-1) : -(window_len/2)-1]
return y
def FuncWind(func, x, sr=1, nwind=1024, nhop=512, power=1,
windfunc=np.blackman):
'''
Applies a function window by window to a time series
'''
nsam = len(x)
ist = 0
iend = ist+nwind
t = []
ret = []
wind = windfunc(nwind)
if power > 0:
wsumpow = sum(wind**power)
else:
wsumpow = 1.
while (iend < nsam):
thisx = x[ist:iend]
xw = thisx*wind
ret.append(func(xw)/wsumpow)
t.append(float(ist+iend)/2.0/float(sr))
ist = ist+nhop
iend = ist+nwind
return np.array(ret), np.array(t)
def RMSWind(x, sr=1, nwind=1024, nhop=512, windfunc=np.blackman):
'''
Calculates the RMS amplitude amplitude of x, in frames of
length nwind, and in steps of nhop. windfunc is used as
windowing function.
nwind should be at least 3 periods if the signal is periodic.
'''
nsam = len(x)
ist = 0
iend = ist+nwind
t = []
ret = []
wind = windfunc(nwind)
wsum2 = np.sum(wind**2)
while (iend < nsam):
thisx = x[ist:iend]
xw = thisx*wind
ret.append(np.sum(xw*xw/wsum2))
t.append(float(ist+iend)/2.0/float(sr))
ist = ist+nhop
iend = ist+nwind
return np.sqrt(np.array(ret)), np.array(t)
def Heterodyn(x, f, sr=1, nwind=1024, nhop=512,
windfunc=np.blackman):
'''
Calculates the amplitude near frequency f in x
nwind should be at least 3 periods if the signal is periodic.
'''
sinsig = np.exp(2j*np.pi*np.arange(len(x))*f/float(sr))
hamp, t = FuncWind(np.sum, x*sinsig, power=1, sr=sr,
nwind=nwind, nhop=nhop,
windfunc=windfunc)
return np.array(hamp)*2, np.array(t)
def SpecFlux(x, sr=1, nwind=1024, nhop=512, minf=0,
maxf=np.inf, windfunc=np.blackman):
'''
Calculates the spectral flux in sunud
'''
nsam = len(x)
# first window
ist = 0
iend = ist+nwind
t = []
res = []
wind = windfunc(nwind)
minbin = int(minf/sr*nwind)
maxbinf = (float(maxf)/sr*nwind)
if maxbinf > nwind:
maxbin = nwind
else:
maxbin = int(maxbinf)
while (iend < nsam-nhop):
thisx = x[ist:iend]
nextx = x[ist+nhop:iend+nhop]
ff = np.abs(np.fft.fft(thisx*wind))
fl = np.abs(np.fft.fft(nextx*wind))
res.append(np.sqrt(sum((ff[minbin:maxbin]-fl[minbin:maxbin])**2)))
t.append(float(ist+iend+nhop)/2.0/float(sr))
ist = ist+nhop
iend = ist+nwind
return np.array(res), np.array(t)
def _plot_multiple_spectrum(signals, fs, labels, colors):
"""
plot the signals spectrum
"""
s = Spectrum(block_length=min(2048, signals[0].size), fs=fs,
wfunc=np.blackman)
for sig in signals:
s.periodogram(sig, hold=True)
s.plot(labels=labels, colors=colors, fscale='lin')
def _design(self):
"""Designs the FIR filter"""
# the length of the filter
order = self._get_order()
half_order = (order - 1) // 2
w = np.blackman(order)
t = np.linspace(-half_order, half_order, order)
phase = (2.0 * np.pi * self.fc / self.fs) * t
car = np.cos(phase)
fir = w * car
# the filter must be symmetric, in order to be zero-phase
assert np.all(np.abs(fir - fir[::-1]) < 1e-15)
# remove the constant component by forcing fir.sum() = 0
if self.zero_mean:
fir -= fir.sum() / order
gain = np.sum(fir * car)
self.fir = fir * (1.0 / gain)
# add the imaginary part to have a complex wavelet
if self.extract_complex:
car_imag = np.sin(phase)
fir_imag = w * car_imag
self.fir_imag = fir_imag * (1.0 / gain)
return self
def get_normalized_templates(template_streams):
templates = []
for ts in template_streams:
t = data_conversion.stream2array(ts)
t = t.astype(np.float32)
t -= np.mean(t, axis=1, keepdims=True)
template_max = np.amax(np.abs(t))
t /= template_max
t *= np.blackman(ts[0].stats.npts)
templates.append(t)
return templates
def spectrum_process(data, sfreq, cfreq, toffset, modulus, integration, bins, log_scale, zscale, detrend, title, clr):
""" Break spectrum by modulus and display each block. Integration here acts
as a pure average on the spectral data.
"""
if detrend:
dfn = matplotlib.mlab.detrend_mean
else:
dfn = matplotlib.mlab.detrend_none
win = numpy.blackman(bins)
if modulus:
block = 0
block_size = integration * modulus
block_toffset = toffset
while block < len(data) / block_size:
vblock = data[block * block_size:block * block_size + modulus]
pblock, freq = matplotlib.mlab.psd(
vblock, NFFT=bins, Fs=sfreq, detrend=dfn, window=win, scale_by_freq=False)
# complete integration
for idx in range(1, integration):
vblock = data[block * block_size + idx *
modulus:block * block_size + idx * modulus + modulus]
pblock_n, freq = matplotlib.mlab.psd(
vblock, NFFT=bins, Fs=sfreq, detrend=dfn, window=matplotlib.mlab.window_hanning, scale_by_freq=False)
pblock += pblock_n
pblock /= integration
yield spectrum_plot(pblock, freq, cfreq, block_toffset,
log_scale, zscale, title, clr)
block += 1
block_toffset += block_size / sfreq
else:
pdata, freq = matplotlib.mlab.psd(
data, NFFT=bins, Fs=sfreq, detrend=dfn, window=win, scale_by_freq=False)
yield spectrum_plot(pdata, freq, cfreq, toffset,
log_scale, zscale, title, clr)
def smooth(x,window_len=11,window='hanning'):
"""
Smooth the data using a window with requested size.
Copied from http://wiki.scipy.org/Cookbook/SignalSmooth
This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the begining and end part of the output signal.
:param x: the input signal
:param window_len: the dimension of the smoothing window; should be an odd integer
:param window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman' flat window will produce a moving average smoothing.
:returns: the smoothed signal
Example
>>> t=linspace(-2,2,0.1)
>>> x=sin(t)+randn(len(t))*0.1
>>> y=smooth(x)
.. seealso:: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve, scipy.signal.lfilter
.. note:: length(output) != length(input), to correct this: return y[(window_len/2-1):-(window_len/2)] instead of just y.
.. todo:: the window parameter could be the window itself if an array instead of a string
"""
if x.ndim != 1:
raise ValueError("smooth only accepts 1 dimension arrays.")
if x.size < window_len:
raise ValueError("Input vector needs to be bigger than window size.")
if window_len<3:
return x
if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
raise ValueError("Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")
s=numpy.r_[x[window_len-1:0:-1],x,x[-1:-window_len:-1]]
#print(len(s))
if window == 'flat': #moving average
w=numpy.ones(window_len,'d')
else:
w=eval('numpy.'+window+'(window_len)')
y=numpy.convolve(w/w.sum(),s,mode='valid')
return y
def smooth(x,window_len=11,window='flat'):
"""smooth the data using a window with requested size.
This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the beginning and end part of the output signal.
:param x: the input signal
:param window_len: the dimension of the smoothing window; should be an odd integer
:param window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
flat window will produce a moving average smoothing.
:return: the smoothed signal
example::
t=linspace(-2,2,0.1)
x=sin(t)+randn(len(t))*0.1
y=smooth(x)
:see also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve,
scipy.signal.lfilter
TODO: the window parameter could be the window itself if an array instead of a string
"""
if x.ndim != 1:
raise ValueError("smooth only accepts 1 dimension arrays.")
if x.size < window_len:
raise ValueError("Input vector needs to be bigger than window size.")
if window_len < 3:
return x
if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
raise ValueError("Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")
s=numpy.r_[2*x[0]-x[window_len:1:-1],x,2*x[-1]-x[-1:-window_len:-1]]
#print(len(s))
if window == 'flat': #moving average
w = numpy.ones(window_len,'d')
else:
w = eval('numpy.' + window + '(window_len)')
y = numpy.convolve(w/w.sum(), s, mode='same')
return y[window_len-1:-window_len+1]
def smooth(x,window_len=11,window='flat'):
"""smooth the data using a window with requested size.
This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the beginning and end part of the output signal.
:param x: the input signal
:param window_len: the dimension of the smoothing window; should be an odd integer
:param window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
flat window will produce a moving average smoothing.
:return: the smoothed signal
example::
t=linspace(-2,2,0.1)
x=sin(t)+randn(len(t))*0.1
y=smooth(x)
:see also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve,
scipy.signal.lfilter
TODO: the window parameter could be the window itself if an array instead of a string
"""
if x.ndim != 1:
raise ValueError("smooth only accepts 1 dimension arrays.")
if x.size < window_len:
raise ValueError("Input vector needs to be bigger than window size.")
if window_len < 3:
return x
if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
raise ValueError("Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")
s=numpy.r_[2*x[0]-x[window_len:1:-1],x,2*x[-1]-x[-1:-window_len:-1]]
#print(len(s))
if window == 'flat': #moving average
w = numpy.ones(window_len,'d')
else:
w = eval('numpy.' + window + '(window_len)')
y = numpy.convolve(w/w.sum(), s, mode='same')
return y[window_len-1:-window_len+1]
def smooth(x,window_len=11,window='flat'):
"""smooth the data using a window with requested size.
This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the beginning and end part of the output signal.
:param x: the input signal
:param window_len: the dimension of the smoothing window; should be an odd integer
:param window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
flat window will produce a moving average smoothing.
:return: the smoothed signal
example::
t=linspace(-2,2,0.1)
x=sin(t)+randn(len(t))*0.1
y=smooth(x)
:see also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve,
scipy.signal.lfilter
TODO: the window parameter could be the window itself if an array instead of a string
"""
if x.ndim != 1:
raise ValueError("smooth only accepts 1 dimension arrays.")
if x.size < window_len:
raise ValueError("Input vector needs to be bigger than window size.")
if window_len < 3:
return x
if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
raise ValueError("Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")
s=numpy.r_[2*x[0]-x[window_len:1:-1],x,2*x[-1]-x[-1:-window_len:-1]]
#print(len(s))
if window == 'flat': #moving average
w = numpy.ones(window_len,'d')
else:
w = eval('numpy.' + window + '(window_len)')
y = numpy.convolve(w/w.sum(), s, mode='same')
return y[window_len-1:-window_len+1]
def smooth(x,window_len=11,window='flat'):
"""smooth the data using a window with requested size.
This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the beginning and end part of the output signal.
:param x: the input signal
:param window_len: the dimension of the smoothing window; should be an odd integer
:param window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
flat window will produce a moving average smoothing.
:return: the smoothed signal
example::
t=linspace(-2,2,0.1)
x=sin(t)+randn(len(t))*0.1
y=smooth(x)
:see also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve,
scipy.signal.lfilter
TODO: the window parameter could be the window itself if an array instead of a string
"""
if x.ndim != 1:
raise ValueError("smooth only accepts 1 dimension arrays.")
if x.size < window_len:
raise ValueError("Input vector needs to be bigger than window size.")
if window_len < 3:
return x
if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
raise ValueError("Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")
s=numpy.r_[2*x[0]-x[window_len:1:-1],x,2*x[-1]-x[-1:-window_len:-1]]
#print(len(s))
if window == 'flat': #moving average
w = numpy.ones(window_len,'d')
else:
w = eval('numpy.' + window + '(window_len)')
y = numpy.convolve(w/w.sum(), s, mode='same')
return y[window_len-1:-window_len+1]
def smooth(x,window_len=11,window='flat'):
"""smooth the data using a window with requested size.
This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the beginning and end part of the output signal.
:param x: the input signal
:param window_len: the dimension of the smoothing window; should be an odd integer
:param window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
flat window will produce a moving average smoothing.
:return: the smoothed signal
example::
t=linspace(-2,2,0.1)
x=sin(t)+randn(len(t))*0.1
y=smooth(x)
:see also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve,
scipy.signal.lfilter
TODO: the window parameter could be the window itself if an array instead of a string
"""
if x.ndim != 1:
raise ValueError("smooth only accepts 1 dimension arrays.")
if x.size < window_len:
raise ValueError("Input vector needs to be bigger than window size.")
if window_len < 3:
return x
if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
raise ValueError("Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")
s=numpy.r_[2*x[0]-x[window_len:1:-1],x,2*x[-1]-x[-1:-window_len:-1]]
#print(len(s))
if window == 'flat': #moving average
w = numpy.ones(window_len,'d')
else:
w = eval('numpy.' + window + '(window_len)')
y = numpy.convolve(w/w.sum(), s, mode='same')
return y[window_len-1:-window_len+1]
def smooth(x,window_len=11,window='flat'):
"""smooth the data using a window with requested size.
This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the beginning and end part of the output signal.
:param x: the input signal
:param window_len: the dimension of the smoothing window; should be an odd integer
:param window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
flat window will produce a moving average smoothing.
:return: the smoothed signal
example::
t=linspace(-2,2,0.1)
x=sin(t)+randn(len(t))*0.1
y=smooth(x)
:see also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve,
scipy.signal.lfilter
TODO: the window parameter could be the window itself if an array instead of a string
"""
if x.ndim != 1:
raise ValueError("smooth only accepts 1 dimension arrays.")
if x.size < window_len:
raise ValueError("Input vector needs to be bigger than window size.")
if window_len < 3:
return x
if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
raise ValueError("Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")
s=numpy.r_[2*x[0]-x[window_len:1:-1],x,2*x[-1]-x[-1:-window_len:-1]]
#print(len(s))
if window == 'flat': #moving average
w = numpy.ones(window_len,'d')
else:
w = eval('numpy.' + window + '(window_len)')
y = numpy.convolve(w/w.sum(), s, mode='same')
return y[window_len-1:-window_len+1]
def compute_moving_window_int(sample: np.ndarray,
fs: float,
blackman_win_length: int,
filter_length: int = 257,
delta: float = .02) -> np.ndarray:
"""
:param sample: ecg sample array
:param fs: sampling frequency
:param blackman_win_length: length of the blackman window on which to compute the moving window integration
:param filter_length: length of the FIR bandpass filter on which filtering is done on ecg sample array
:param delta: to compute the weights of each band in FIR filter
:return: the Moving window integration of the sample array
"""
# I believe these constants can be kept in a file
# filter edges
filter_edges = [0, 4.5 * 2 / fs, 5 * 2 / fs, 20 * 2 / fs, 20.5 * 2 / fs, 1]
# gains at filter band edges
gains = [0, 0, 1, 1, 0, 0]
# weights
weights = [500 / delta, 1 / delta, 500 / delta]
# length of the FIR filter
# FIR filter coefficients for bandpass filtering
filter_coeff = signal.firls(filter_length, filter_edges, gains, weights)
# bandpass filtered signal
bandpass_signal = signal.convolve(sample, filter_coeff, 'same')
bandpass_signal /= np.percentile(bandpass_signal, 90)
# derivative array
derivative_array = (np.array([-1.0, -2.0, 0, 2.0, 1.0])) * (1 / 8)
# derivative signal (differentiation of the bandpass)
derivative_signal = signal.convolve(bandpass_signal, derivative_array, 'same')
derivative_signal /= np.percentile(derivative_signal, 90)
# squared derivative signal
derivative_squared_signal = derivative_signal ** 2
derivative_squared_signal /= np.percentile(derivative_squared_signal, 90)
# blackman window
blackman_window = np.blackman(blackman_win_length)
# moving window Integration of squared derivative signal
mov_win_int_signal = signal.convolve(derivative_squared_signal, blackman_window, 'same')
mov_win_int_signal /= np.percentile(mov_win_int_signal, 90)
return mov_win_int_signal
def detect_rpeak(ecg: DataStream,
fs: float = 64,
threshold: float = 0.5,
blackman_win_len_range: float = 0.2) -> DataStream:
"""
This program implements the Pan Tomkins algorithm on ECG signal to detect the R peaks
Since the ecg array can have discontinuity in the timestamp arrays the rr-interval calculated
in the algorithm is calculated in terms of the index in the sample array
The algorithm consists of some major steps
1. computation of the moving window integration of the signal in terms of blackman window of a prescribed length
2. compute all the peaks of the moving window integration signal
3. adaptive thresholding with dynamic signal and noise thresholds applied to filter out the R peak locations
4. confirm the R peaks through differentiation from the nearby peaks and remove the false peaks
:param ecg: ecg array of tuples (timestamp,value)
:param fs: sampling frequency
:param threshold: initial threshold to detect the R peak in a signal normalized by the 90th percentile. .5 is default.
:param blackman_win_len_range : the range to calculate blackman window length
:return: R peak array of tuples (timestamp, Rpeak interval)
"""
data = ecg.data
result = DataStream.from_datastream([ecg])
if len(data) == 0:
result.data = []
return result
sample = np.array([i.sample for i in data])
timestamp = np.array([i.start_time for i in data])
# computes the moving window integration of the signal
blackman_win_len = np.ceil(fs * blackman_win_len_range)
y = compute_moving_window_int(sample, fs, blackman_win_len)
peak_location_values = [(i, y[i]) for i in range(2, len(y) - 1) if check_peak(y[i - 2:i + 3])]
# initial RR interval average
peak_location = [i[0] for i in peak_location_values]
running_rr_avg = sum(np.diff(peak_location)) / (len(peak_location) - 1)
rpeak_temp1 = compute_r_peaks(threshold, running_rr_avg, y, peak_location_values)
rpeak_temp2 = remove_close_peaks(rpeak_temp1, sample, fs)
index = confirm_peaks(rpeak_temp2, sample, fs)
rpeak_timestamp = timestamp[index]
rpeak_value = np.diff(rpeak_timestamp)
rpeak_timestamp = rpeak_timestamp[1:]
result_data = []
for k in range(len(rpeak_value)):
result_data.append(
DataPoint.from_tuple(rpeak_timestamp[k], rpeak_value[k].seconds + rpeak_value[k].microseconds / 1e6))
# Create resulting datastream to be returned
result.data = result_data
return result
def design(self, fs, fc, n_cycles=7.0, bandwidth=None, zero_mean=True):
"""
Designs a FIR filter that is a band-pass filter centered
on frequency fc.
fs : sampling frequency (Hz)
fc : frequency of the carrier (Hz)
n_cycles : number of oscillation in the wavelet
bandwidth : bandwidth of the FIR wavelet filter
(used when: bandwidth is not None and n_cycles is None)
zero_mean : if True, we make sure fir.sum() = 0
"""
# the length of the filter
if bandwidth is None and n_cycles is not None:
half_order = int(float(n_cycles) / fc * fs / 2)
order = 2 * half_order + 1
elif bandwidth is not None and n_cycles is None:
half_order = int(1.65 * fs / bandwidth) // 2
order = half_order * 2 + 1
else:
raise ValueError('Carrier.design: n_cycles and order cannot be '
'both None or both not None.')
w = np.blackman(order)
t = np.linspace(-half_order, half_order, order)
phase = (2.0 * np.pi * fc / fs) * t
car = np.cos(phase)
fir = w * car
# the filter must be symmetric, in order to be zero-phase
assert np.all(np.abs(fir - fir[::-1]) < 1e-15)
# remove the constant component by forcing fir.sum() = 0
if zero_mean:
fir -= fir.sum() / order
gain = np.sum(fir * car)
self.fir = fir * (1.0 / gain)
self.fs = fs
# add the imaginary part to have a complex wavelet
if self.extract_complex:
car_imag = np.sin(phase)
fir_imag = w * car_imag
self.fir_imag = fir_imag * (1.0 / gain)
return self
def smooth(x,window_len=11,window='flat'):
"""smooth the data using a window with requested size.
This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the beginning and end part of the output signal.
:param x: the input signal
:param window_len: the dimension of the smoothing window; should be an odd integer
:param window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
flat window will produce a moving average smoothing.
:return: the smoothed signal
example::
t=linspace(-2,2,0.1)
x=sin(t)+randn(len(t))*0.1
y=smooth(x)
:see also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve,
scipy.signal.lfilter
TODO: the window parameter could be the window itself if an array instead of a string
"""
if x.ndim != 1:
raise ValueError("smooth only accepts 1 dimension arrays.")
if x.size < window_len:
raise ValueError("Input vector needs to be bigger than window size.")
if window_len < 3:
return x
if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
raise ValueError("Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")
s=numpy.r_[2*x[0]-x[window_len:1:-1],x,2*x[-1]-x[-1:-window_len:-1]]
#print(len(s))
if window == 'flat': #moving average
w = numpy.ones(window_len,'d')
else:
w = eval('numpy.' + window + '(window_len)')
y = numpy.convolve(w/w.sum(), s, mode='same')
return y[window_len-1:-window_len+1]
def smooth(x,window_len=11,window='flat'):
"""smooth the data using a window with requested size.
This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the beginning and end part of the output signal.
:param x: the input signal
:param window_len: the dimension of the smoothing window; should be an odd integer
:param window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
flat window will produce a moving average smoothing.
:return: the smoothed signal
example::
t=linspace(-2,2,0.1)
x=sin(t)+randn(len(t))*0.1
y=smooth(x)
:see also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve,
scipy.signal.lfilter
TODO: the window parameter could be the window itself if an array instead of a string
"""
if x.ndim != 1:
raise ValueError("smooth only accepts 1 dimension arrays.")
if x.size < window_len:
raise ValueError("Input vector needs to be bigger than window size.")
if window_len < 3:
return x
if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
raise ValueError("Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")
s=numpy.r_[2*x[0]-x[window_len:1:-1],x,2*x[-1]-x[-1:-window_len:-1]]
#print(len(s))
if window == 'flat': #moving average
w = numpy.ones(window_len,'d')
else:
w = eval('numpy.' + window + '(window_len)')
y = numpy.convolve(w/w.sum(), s, mode='same')
return y[window_len-1:-window_len+1]