Python scipy.signal 模块,fftconvolve() 实例源码
我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用scipy.signal.fftconvolve()。
def freq_from_autocorr(sig, fs):
"""
Estimate frequency using autocorrelation
"""
# Calculate autocorrelation (same thing as convolution, but with
# one input reversed in time), and throw away the negative lags
corr = fftconvolve(sig, sig[::-1], mode='full')
corr = corr[len(corr)//2:]
# Find the first low point
d = diff(corr)
start = find(d > 0)[0]
# Find the next peak after the low point (other than 0 lag). This bit is
# not reliable for long signals, due to the desired peak occurring between
# samples, and other peaks appearing higher.
# Should use a weighting function to de-emphasize the peaks at longer lags.
peak = argmax(corr[start:]) + start
px, py = parabolic(corr, peak)
return fs / px
def sincinterp(x):
"""
Sinc interpolation for computation of fractional transformations.
As appears in :
-https://github.com/audiolabs/frft/
----------
Args:
f : (array) Complex valued input array
a : (float) Alpha factor
Returns:
ret : (array) Real valued synthesised data
"""
N = len(x)
y = np.zeros(2 * N - 1, dtype=x.dtype)
y[:2 * N:2] = x
xint = fftconvolve( y[:2 * N], np.sinc(np.arange(-(2 * N - 3), (2 * N - 2)).T / 2),)
return xint[2 * N - 3: -2 * N + 3]
def direct(self, sigin):
"""
apply this filter to a signal
sigin : input signal (ndarray)
returns the filtered signal (ndarray)
"""
fftconvolve = signal.fftconvolve
filtered = fftconvolve(sigin.ravel(), self.fir, 'same')
if self.extract_complex:
filtered_imag = fftconvolve(sigin.ravel(), self.fir_imag, 'same')
if sigin.ndim == 2:
filtered = filtered[None, :]
if self.extract_complex:
filtered_imag = filtered_imag[None, :]
if self.extract_complex:
return filtered, filtered_imag
else:
return filtered
def transform(self, sigin):
"""Apply this filter to a signal
Parameters
----------
sigin : array, shape (n_points, ) or (n_signals, n_points)
Input signal
Returns
-------
filtered : array, shape (n_points, ) or (n_signals, n_points)
Filtered signal
"""
sigin_ndim = sigin.ndim
sigin = np.atleast_2d(sigin)
filtered = [signal.fftconvolve(sig, self.fir, 'same') for sig in sigin]
if sigin_ndim == 1:
filtered = filtered[0]
else:
filtered = np.asarray(filtered)
return filtered
def cell_fd_conv(cell_df, h144=None):
Limg, Lx, Ly = cell_fd_info(cell_df)
if h144 is None:
h144 = get_h2d(Lx, Ly, l=405, z=0.5, dx=2.2/4, dy=2.2/4)
cell_img_fd_l = []
for l in range(Limg):
cell_img = cell_df[cell_df["ID"] == l]["image"].values.reshape(Lx, Ly)
#cell_img_fd = fd_conv(cell_img, h144)
cell_img_fd = fftconvolve(cell_img, h144, mode='same')
cell_img_fd_l.append(cell_img_fd)
cell_img_fd_a = np.array(cell_img_fd_l)
#print( cell_img_fd_a.shape)
return cell_img_fd_a
def cell_fd_conv(cell_df, h144=None):
Limg, Lx, Ly = cell_fd_info(cell_df)
if h144 is None:
h144 = get_h2d(Lx, Ly, l=405, z=0.5, dx=2.2/4, dy=2.2/4)
cell_img_fd_l = []
for l in range(Limg):
cell_img = cell_df[cell_df["ID"] == l]["image"].values.reshape(Lx, Ly)
#cell_img_fd = fd_conv(cell_img, h144)
cell_img_fd = fftconvolve(cell_img, h144, mode='same')
cell_img_fd_l.append(cell_img_fd)
cell_img_fd_a = np.array(cell_img_fd_l)
#print( cell_img_fd_a.shape)
return cell_img_fd_a
def cell_fd_conv(cell_df, h144=None):
Limg, Lx, Ly = cell_fd_info(cell_df)
if h144 is None:
h144 = get_h2d(Lx, Ly, l=405, z=0.5, dx=2.2/4, dy=2.2/4)
cell_img_fd_l = []
for l in range(Limg):
cell_img = cell_df[cell_df["ID"] == l]["image"].values.reshape(Lx, Ly)
#cell_img_fd = fd_conv(cell_img, h144)
cell_img_fd = fftconvolve(cell_img, h144, mode='same')
cell_img_fd_l.append(cell_img_fd)
cell_img_fd_a = np.array(cell_img_fd_l)
#print( cell_img_fd_a.shape)
return cell_img_fd_a
def mask_init(self):
x = np.linspace(-10,10,21)
X, Y = np.meshgrid(x,x)
R = X**2 + Y**2
O = np.exp(-R/2/(4**2))
OO = O/np.sum(O)
D = sg.fftconvolve(1.0*self.image.mask.data+0.0, OO,'same')
# D = pyfftw.interfaces.scipy_fftpack.convolve(1.0*self.image.mask.data+0.0, OO,'same')
self.target = copy.deepcopy(self.image.mask.data)
self.maskdata = 0.99*D + 0.01
AA = 2*self.maskdata - 1
AA = np.complex64(AA)
BB = np.arccos(AA)
self.masktheta = BB.real
self.image.mask.data = self.maskdata
def simulate(self, p, tindex=None, dt=1):
"""Simulates the head contribution.
Parameters
----------
p: 1D array
Parameters used for simulation.
tindex: pandas.Series, optional
Time indices to simulate the model.
Returns
-------
pandas.Series
The simulated head contribution.
"""
b = self.rfunc.block(p, dt)
stress = self.stress[0]
self.npoints = stress.index.size
h = pd.Series(fftconvolve(stress, b, 'full')[:self.npoints],
index=stress.index, name=self.name)
if tindex is not None:
h = h[tindex]
return h
def blur_image(self, save=False, show=False):
if self.part is None:
psf = self.PSFs
else:
psf = [self.PSFs[self.part]]
yN, xN, channel = self.shape
key, kex = self.PSFs[0].shape
delta = yN - key
assert delta >= 0, 'resolution of image should be higher than kernel'
result=[]
if len(psf) > 1:
for p in psf:
tmp = np.pad(p, delta // 2, 'constant')
cv2.normalize(tmp, tmp, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
# blured = np.zeros(self.shape)
blured = cv2.normalize(self.original, self.original, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX,
dtype=cv2.CV_32F)
blured[:, :, 0] = np.array(signal.fftconvolve(blured[:, :, 0], tmp, 'same'))
blured[:, :, 1] = np.array(signal.fftconvolve(blured[:, :, 1], tmp, 'same'))
blured[:, :, 2] = np.array(signal.fftconvolve(blured[:, :, 2], tmp, 'same'))
blured = cv2.normalize(blured, blured, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
blured = cv2.cvtColor(blured, cv2.COLOR_RGB2BGR)
result.append(np.abs(blured))
else:
psf = psf[0]
tmp = np.pad(psf, delta // 2, 'constant')
cv2.normalize(tmp, tmp, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
blured = cv2.normalize(self.original, self.original, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX,
dtype=cv2.CV_32F)
blured[:, :, 0] = np.array(signal.fftconvolve(blured[:, :, 0], tmp, 'same'))
blured[:, :, 1] = np.array(signal.fftconvolve(blured[:, :, 1], tmp, 'same'))
blured[:, :, 2] = np.array(signal.fftconvolve(blured[:, :, 2], tmp, 'same'))
blured = cv2.normalize(blured, blured, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
blured = cv2.cvtColor(blured, cv2.COLOR_RGB2BGR)
result.append(np.abs(blured))
self.result = result
if show or save:
self.__plot_canvas(show, save)
def convolve(self, win):
" Convolves each channel with window win."
from scipy.signal import fftconvolve
def conv_func(x):
return np.column_stack([fftconvolve(x[:, i], win)
for i in range(x.shape[1])])
return self.new_stream(self.vector_map(conv_func))
def test_convolve():
win = [.1, 0, 3]
for data in (data2, data3, data4):
x = np.column_stack([fftconvolve(data[:, i], win)
for i in range(data.shape[1])])
y = Stream(data, sr=1).convolve(win).call()
assert eq(x, y)
def _cross_corr(img1, img2=None):
''' Compute the cross correlation of one (or two) images.
Parameters
----------
img1 : np.ndarray
the image or curve to cross correlate
img2 : 1d or 2d np.ndarray, optional
If set, cross correlate img1 against img2. A shift of img2
to the right of img1 will lead to a shift of the point of
highest correlation to the right.
Default is set to None
'''
ndim = img1.ndim
if img2 is None:
img2 = img1
if img1.shape != img2.shape:
errorstr = "Image shapes don't match. "
errorstr += "(img1 : {},{}; img2 : {},{})"\
.format(*img1.shape, *img2.shape)
raise ValueError(errorstr)
# need to reverse indices for second image
# fftconvolve(A,B) = FFT^(-1)(FFT(A)*FFT(B))
# but need FFT^(-1)(FFT(A(x))*conj(FFT(B(x)))) = FFT^(-1)(A(x)*B(-x))
reverse_index = [slice(None, None, -1) for i in range(ndim)]
imgc = fftconvolve(img1, img2[reverse_index], mode='same')
return imgc
def smoothLine(data,kernel):
"""helper function parallelized by smooth"""
return signal.fftconvolve(data,kernel, mode='same')
def place_text(self, text_arrs, back_arr, bbs):
areas = [-np.prod(ta.shape) for ta in text_arrs]
order = np.argsort(areas)
locs = [None for i in range(len(text_arrs))]
out_arr = np.zeros_like(back_arr)
for i in order:
ba = np.clip(back_arr.copy().astype(np.float), 0, 255)
ta = np.clip(text_arrs[i].copy().astype(np.float), 0, 255)
ba[ba > 127] = 1e8
intersect = ssig.fftconvolve(ba,ta[::-1,::-1],mode='valid')
safemask = intersect < 1e8
if not np.any(safemask): # no collision-free position:
#warn("COLLISION!!!")
return back_arr,locs[:i],bbs[:i],order[:i]
minloc = np.transpose(np.nonzero(safemask))
loc = minloc[np.random.choice(minloc.shape[0]),:]
locs[i] = loc
# update the bounding-boxes:
bbs[i] = move_bb(bbs[i],loc[::-1])
# blit the text onto the canvas
w,h = text_arrs[i].shape
out_arr[loc[0]:loc[0]+w,loc[1]:loc[1]+h] += text_arrs[i]
return out_arr, locs, bbs, order
def _SSIMForMultiScale(img1, img2, max_val=255, filter_size=11, filter_sigma=1.5, k1=0.01, k2=0.03):
img1 = img1.astype(np.float64)
img2 = img2.astype(np.float64)
_, height, width, _ = img1.shape
size = min(filter_size, height, width)
sigma = size * filter_sigma / filter_size if filter_size else 0
if filter_size:
window = np.reshape(_FSpecialGauss(size, sigma), (1, size, size, 1))
mu1 = signal.fftconvolve(img1, window, mode='valid')
mu2 = signal.fftconvolve(img2, window, mode='valid')
sigma11 = signal.fftconvolve(img1 * img1, window, mode='valid')
sigma22 = signal.fftconvolve(img2 * img2, window, mode='valid')
sigma12 = signal.fftconvolve(img1 * img2, window, mode='valid')
else:
mu1, mu2 = img1, img2
sigma11 = img1 * img1
sigma22 = img2 * img2
sigma12 = img1 * img2
mu11 = mu1 * mu1
mu22 = mu2 * mu2
mu12 = mu1 * mu2
sigma11 -= mu11
sigma22 -= mu22
sigma12 -= mu12
c1 = (k1 * max_val) ** 2
c2 = (k2 * max_val) ** 2
v1 = 2.0 * sigma12 + c2
v2 = sigma11 + sigma22 + c2
ssim = np.mean((((2.0 * mu12 + c1) * v1) / ((mu11 + mu22 + c1) * v2)))
cs = np.mean(v1 / v2)
return ssim, cs
def _denoise1d(self, M, window_size, order, deriv, rate):
try:
window_size = np.abs(np.int(window_size))
order = np.abs(np.int(order))
except ValueError as msg:
raise ValueError("in SavitzkyGolay.denoise_spectra(), window_size and order have to be of type int")
if window_size % 2 != 1 or window_size < 1:
raise TypeError("in SavitzkyGolay.denoise_spectra(), window_size size must be a positive odd number")
if window_size < order + 2:
raise TypeError("in SavitzkyGolay.denoise_spectra(), window_size is too small for the polynomials order")
order_range = range(order+1)
half_window = (window_size -1) // 2
# precompute coefficients
b = np.mat([[k**i for i in order_range] for k in range(-half_window, half_window+1)])
m = np.linalg.pinv(b).A[deriv] * rate**deriv * factorial(deriv)
# pad the signal at the extremes with
# values taken from the signal itself
N, p = M.shape
dn = np.ones((N,p), dtype=np.float)
long_signal = np.ndarray(p+2, dtype=np.float)
for i in range(N):
y = M[i]
firstvals = y[0] - np.abs( y[1:half_window+1][::-1] - y[0] )
lastvals = y[-1] + np.abs(y[-half_window-1:-1][::-1] - y[-1])
long_signal = np.concatenate((firstvals, y, lastvals))
dn[i] = fftconvolve(long_signal, m, mode='valid')
return dn
def _fftconvolve(image, blur_width, blur_height):
kernel_width = 10 * int(_np.round(blur_width)) + 1
kernel_height = 10 * int(_np.round(blur_height)) + 1
kernel_y = _signal.gaussian(kernel_height, blur_height)
kernel_x = _signal.gaussian(kernel_width, blur_width)
kernel = _np.outer(kernel_y, kernel_x)
kernel /= kernel.sum()
return _signal.fftconvolve(image, kernel, mode='same')
def generate_signal(n, p, loops, SNR_dB=100, noise='white', h=None):
# First generate a random signal
if noise == 'pink':
x = noise_pink(n, rows=loops, alpha=1e-10)
elif noise == 'ar1':
x = noise_ar1(n, rows=loops)
else:
x = noise_white(n, rows=loops)
# Generate random filters on the sphere
if h is None:
h = np.random.randn(loops,p)
norm = np.linalg.norm(h, axis=1)
h = (h.T/norm).T
if h.ndim == 1:
if h.shape[0] >= p:
h = np.tile(h[:p], (loops,1))
else:
h2 = np.zeros(loops,p)
for i in xrange(loops):
h2[i,:h.shape[0]] = h
h = h2
# Finally generate the filtered signal
sigma_noise = 10.**(-SNR_dB/20.)
d = np.zeros((loops,n+h.shape[1]-1))
for l in xrange(loops):
d[l,:] = fftconvolve(x[l], h[l])
d[l,:] += np.random.randn(n+h.shape[1]-1)*sigma_noise
return x, h, d
def noise_ar1(n, rows=1, a1=0.9):
x = noise_white(n, rows=rows)
for row in x:
row[:] = fftconvolve(row, np.array([1., a1]), mode='same')
x = (x.T/np.sqrt(np.mean(x**2, axis=1))).T
return x
def _remove_far_masked_data(self, mask, list_signals):
"""Remove unnecessary data which is masked
and far (> self.ordar) from the unmasked data.
"""
if mask is None:
return list_signals
selection = ~mask
# convolution with a delay kernel,
# so we keep the close points before the selection
kernel = np.ones(self.ordar * 2 + 1)
kernel[-self.ordar:] = 0.
delayed_selection = fftconvolve(selection, kernel[None, :],
mode='same')
# remove numerical error from fftconvolve
delayed_selection[np.abs(delayed_selection) < 1e-13] = 0.
time_selection = delayed_selection.sum(axis=0) != 0
epoch_selection = delayed_selection.sum(axis=1) != 0
if not np.any(time_selection) or not np.any(epoch_selection):
raise ValueError("The mask seems to hide everything.")
output_signals = []
for sig in list_signals:
if sig is not None:
sig = sig[..., epoch_selection, :]
sig = sig[..., :, time_selection]
output_signals.append(sig)
return output_signals
def inverse(self, sigin):
"""Apply the inverse ARMA filter to a signal
sigin : input signal (ndarray)
returns the filtered signal(ndarray)
"""
arpart = np.concatenate((np.ones(1), self.AR_))
return signal.fftconvolve(sigin, arpart, 'same')
def fd_conv(Img_xy, h2d, mode ='same'):
#return convolve2d(Img_xy, h2d, mode=mode)
return fftconvolve(Img_xy, h2d, mode=mode)
def fd_conv(Img_xy, h2d, mode ='same'):
#return convolve2d(Img_xy, h2d, mode=mode)
return fftconvolve(Img_xy, h2d, mode=mode)
def simulate(self, p, tindex=None, dt=1):
"""Simulates the head contribution.
Parameters
----------
p: 1D array
Parameters used for simulation.
tindex: pandas.Series, optional
Time indices to simulate the model.
Returns
-------
pandas.Series
The simulated head contribution.
"""
b = self.rfunc.block(p[:-1], dt)
self.npoints = self.stress[0].index.size
stress = self.get_stress(p=p)
h = pd.Series(fftconvolve(stress, b, 'full')[:self.npoints],
index=self.stress[0].index, name=self.name)
if tindex is not None:
h = h[tindex]
# see whether it makes a difference to subtract gain * mean_stress
# h -= self.rfunc.gain(p) * stress.mean()
return h
def simulate(self, p, tindex=None, dt=1):
dt = int(dt)
b = self.rfunc.block(p[:-self.recharge.nparam], dt) # Block response
# The recharge calculation needs arrays
precip_array = np.array(self.stress["prec"])
evap_array = np.array(self.stress["evap"])
rseries = self.recharge.simulate(precip_array, evap_array,
p[-self.recharge.nparam:])
self.npoints = len(rseries)
h = pd.Series(fftconvolve(rseries, b, 'full')[:self.npoints],
index=self.stress["prec"].index, name=self.name)
if tindex is not None:
h = h[tindex]
return h
def simulate(self, p=None, tindex=None, dt=1):
h = pd.Series(data=0, index=self.stress[0].index, name=self.name)
for i in self.stress:
self.npoints = self.stress.index.size
b = self.rfunc.block(p, self.r[i]) # nparam-1 depending on rfunc
h += fftconvolve(self.stress[i], b, 'full')[:self.npoints]
if tindex is not None:
h = h[tindex]
return h
def freq_from_autocorr(signal, sampling_rate):
corr = fftconvolve(signal, signal[::-1], mode='full')
corr = corr[len(corr)//2:]
d = np.diff(corr)
start = find_index_by_true(d > 0)[0]
peak = np.argmax(corr[start:]) + start
px, py = parabolic(corr, peak)
return sampling_rate / px
def write_preview_wav(wav_fp, note_beats_and_abs_times, wav_fs=11025.0):
wav_len = int(wav_fs * (note_beats_and_abs_times[-1][1] + 0.05))
dt = 1.0 / wav_fs
note_type_to_idx = {}
idx = 0
for _, beat, time, note_type in note_beats_and_abs_times:
if note_type == '0' * len(note_type):
continue
if note_type not in note_type_to_idx:
note_type_to_idx[note_type] = idx
idx += 1
num_note_types = len(note_type_to_idx)
pulse_f = np.zeros((num_note_types, wav_len))
for _, beat, time, note_type in note_beats_and_abs_times:
sample = int(time * wav_fs)
if sample > 0 and sample < wav_len and note_type in note_type_to_idx:
pulse_f[note_type_to_idx[note_type]][sample] = 1.0
scale = [440.0, 587.33, 659.25, 783.99]
freqs = [scale[i % 4] * math.pow(2.0, (i // 4) - 1) for i in xrange(num_note_types)]
metro_f = np.zeros(wav_len)
for idx in xrange(num_note_types):
click_len = 0.05
click_t = np.arange(0.0, click_len, dt)
click_atk = 0.02
click_sus = 0.5
click_rel = 0.2
click_env = _linterp(0.0, [(click_atk, 1.0), (click_sus, 1.0), (click_rel, 0.0)], len(click_t))
click_f = click_env * np.sin(2.0 * np.pi * freqs[idx] * click_t)
metro_f += fftconvolve(pulse_f[idx], click_f, mode='full')[:wav_len]
#metro_f += pulse_f[idx][:wav_len]
_wav_write(wav_fp, wav_fs, metro_f, normalize=True)
def convolve(A, B, FFT=None):
""" Convolve two arrrays A & B row-wise. One or both can be one-dimensional for SIMO/SISO convolution
Parameters
----------
A, B: array_like
Data to perform the convolution on of shape [Nsignals x NSamples]
FFT: bool, optional
Selects wether time or frequency domain convolution is applied. Default: On if Nsamples > 500 for both
Returns
-------
out: array
Array containing row-wise, linear convolution of A and B
"""
A = _np.atleast_2d(A)
B = _np.atleast_2d(B)
N_sigA, L_sigA = A.shape
N_sigB, L_sigB = B.shape
if FFT is None and (L_sigA > 500 and L_sigB > 500):
FFT = True
else:
FFT = False
if (N_sigA != N_sigB) and not (N_sigA == 1 or N_sigB == 1):
raise ValueError('Number of rows must either match or at least one must be one-dimensional.')
if N_sigA == 1 and N_sigB != 1:
A = _np.broadcast_to(A, (N_sigB, L_sigA))
elif N_sigA != 1 and N_sigB == 1:
B = _np.broadcast_to(B, (N_sigA, L_sigB))
out = []
for IDX, cur_row in enumerate(A):
if FFT:
out.append(fftconvolve(cur_row, B[IDX]))
else:
out.append(_np.convolve(cur_row, B[IDX]))
return _np.array(out)
def _project(reference_sources, estimated_source, flen):
"""Least-squares projection of estimated source on the subspace spanned by
delayed versions of reference sources, with delays between 0 and flen-1
"""
nsrc = reference_sources.shape[0]
nsampl = reference_sources.shape[1]
# computing coefficients of least squares problem via FFT ##
# zero padding and FFT of input data
reference_sources = np.hstack((reference_sources,
np.zeros((nsrc, flen - 1))))
estimated_source = np.hstack((estimated_source, np.zeros(flen - 1)))
n_fft = int(2**np.ceil(np.log2(nsampl + flen - 1.)))
sf = scipy.fftpack.fft(reference_sources, n=n_fft, axis=1)
sef = scipy.fftpack.fft(estimated_source, n=n_fft)
# inner products between delayed versions of reference_sources
G = np.zeros((nsrc * flen, nsrc * flen))
for i in range(nsrc):
for j in range(nsrc):
ssf = sf[i] * np.conj(sf[j])
ssf = np.real(scipy.fftpack.ifft(ssf))
ss = toeplitz(np.hstack((ssf[0], ssf[-1:-flen:-1])),
r=ssf[:flen])
G[i * flen: (i+1) * flen, j * flen: (j+1) * flen] = ss
G[j * flen: (j+1) * flen, i * flen: (i+1) * flen] = ss.T
# inner products between estimated_source and delayed versions of
# reference_sources
D = np.zeros(nsrc * flen)
for i in range(nsrc):
ssef = sf[i] * np.conj(sef)
ssef = np.real(scipy.fftpack.ifft(ssef))
D[i * flen: (i+1) * flen] = np.hstack((ssef[0], ssef[-1:-flen:-1]))
# Computing projection
# Distortion filters
try:
C = np.linalg.solve(G, D).reshape(flen, nsrc, order='F')
except np.linalg.linalg.LinAlgError:
C = np.linalg.lstsq(G, D)[0].reshape(flen, nsrc, order='F')
# Filtering
sproj = np.zeros(nsampl + flen - 1)
for i in range(nsrc):
sproj += fftconvolve(C[:, i], reference_sources[i])[:nsampl + flen - 1]
return sproj
def MacroBroad(data, vmacro, extend=True):
"""
This broadens the data by a given macroturbulent velocity.
It works for small wavelength ranges. I need to make a better
version that is accurate for large wavelength ranges! Sorry
for the terrible variable names, it was copied from
convol.pro in AnalyseBstar (Karolien Lefever)
Parameters:
===========
-data: kglib.utils.DataStructures.xypoint instance
Stores the data to be broadened. The data MUST
be equally-spaced before calling this!
-vmacro: float
The macroturbulent velocity, in km/s
-extend: boolean
If true, the y-axis will be extended to avoid edge-effects
Returns:
========
A broadened version of data.
"""
# Make the kernel
c = constants.c.cgs.value * units.cm.to(units.km)
sq_pi = np.sqrt(np.pi)
lambda0 = np.median(data.x)
xspacing = data.x[1] - data.x[0]
mr = vmacro * lambda0 / c
ccr = 2 / (sq_pi * mr)
px = np.arange(-data.size() / 2, data.size() / 2 + 1) * xspacing
pxmr = abs(px) / mr
profile = ccr * (np.exp(-pxmr ** 2) + sq_pi * pxmr * (erf(pxmr) - 1.0))
# Extend the xy axes to avoid edge-effects, if desired
if extend:
before = data.y[-profile.size / 2 + 1:]
after = data.y[:profile.size / 2]
extended = np.r_[before, data.y, after]
first = data.x[0] - float(int(profile.size / 2.0 + 0.5)) * xspacing
last = data.x[-1] + float(int(profile.size / 2.0 + 0.5)) * xspacing
x2 = np.linspace(first, last, extended.size)
conv_mode = "valid"
else:
extended = data.y.copy()
x2 = data.x.copy()
conv_mode = "same"
# Do the convolution
newdata = data.copy()
newdata.y = fftconvolve(extended, profile / profile.sum(), mode=conv_mode)
return newdata
def _compute_smoothed_histogram(values, bandwidth, coord_range,
logtrans=False):
"""Approximate 1-D density estimation.
Estimate 1-D probability densities at evenly-spaced grid points,
for specified data. This method is based on creating a 1-D histogram of
data points quantised with respect to evenly-spaced grid points.
Probability densities are then estimated at the grid points by convolving
the obtained histogram with a Gaussian kernel.
Parameters
----------
values : np.array (N,)
A vector containing the data for which to perform density estimation.
Successive data points are indexed by the first axis in the array.
bandwidth : float
The desired KDE bandwidth. (When log-transformation
of data is desired, bandwidth should be specified in log-space.)
coord_range: (2,)
Minimum and maximum values of coordinate on which to evaluate the
smoothed histogram.
logtrans : boolean
Whether or not to log-transform the data before performing density
estimation.
Returns
-------
np.array (M-1,)
An array of estimated probability densities at specified grid points.
"""
if logtrans:
ber = [np.log10(extreme) for extreme in coord_range]
bin_edges = np.logspace(*ber, num=DENSITY_N + 1)
bin_edge_range = ber[1] - ber[0]
else:
bin_edges = np.linspace(*coord_range, num=DENSITY_N + 1)
bin_edge_range = coord_range[1] - coord_range[0]
if values.size < 2:
# Return zeros if there are too few points to do anything useful.
return bin_edges[:-1], np.zeros(bin_edges.shape[0] - 1)
# Bin the values
H = np.histogram(values, bin_edges)[0]
relative_bw = bandwidth / bin_edge_range
K = _compute_gaussian_kernel(H.shape, relative_bw)
pdf = signal.fftconvolve(H, K, mode='same')
# Return lower edges of bins and normalized pdf
return bin_edges[:-1], pdf / np.trapz(pdf, bin_edges[:-1])
def _compute_smoothed_histogram2d(values,
bandwidth,
coord_ranges,
logtrans=False):
"""Approximate 2-D density estimation.
Estimate 2-D probability densities at evenly-spaced grid points,
for specified data. This method is based on creating a 2-D histogram of
data points quantised with respect to evenly-spaced grid points.
Probability densities are then estimated at the grid points by convolving
the obtained histogram with a Gaussian kernel.
Parameters
----------
values : np.array (N,2)
A 2-D array containing the data for which to perform density
estimation. Successive data points are indexed by the first axis in the
array. The second axis indexes x and y coordinates of data points
(values[:,0] and values[:,1] respectively).
bandwidth : array-like (2,)
The desired KDE bandwidths for x and y axes. (When log-transformation
of data is desired, bandwidths should be specified in log-space.)
coord_range: (2,2)
Minimum and maximum values of coordinates on which to evaluate the
smoothed histogram.
logtrans : array-like (2,)
A 2-element boolean array specifying whether or not to log-transform
the x or y coordinates of the data before performing density
estimation.
Returns
-------
np.array (M-1, M-1)
An array of estimated probability densities at specified grid points.
"""
bin_edges = []
bedge_range = []
for minmax, lt in zip(coord_ranges, logtrans):
if lt:
ber = [np.log10(extreme) for extreme in minmax]
bin_edges.append(np.logspace(*ber, num=DENSITY_N + 1))
bedge_range.append(ber[1] - ber[0])
else:
bin_edges.append(np.linspace(*minmax, num=DENSITY_N + 1))
bedge_range.append(minmax[1] - minmax[0])
# Bin the observations
H = np.histogram2d(values[:, 0], values[:, 1], bins=bin_edges)[0]
relative_bw = [bw / berange for bw, berange in zip(bandwidth, bedge_range)]
K = _compute_gaussian_kernel(H.shape, relative_bw)
pdf = signal.fftconvolve(H.T, K, mode='same')
# Normalize pdf
bin_centers = [edges[:-1] + np.diff(edges) / 2. for edges in bin_edges]
pdf /= np.trapz(np.trapz(pdf, bin_centers[1]), bin_centers[0])
# Return lower bin edges and density
return bin_edges[0][:-1], bin_edges[1][:-1], pdf
def conv(data, kernel, mode='full', method='fft', use_jit=True):
"""Convoles data with a kernel using either FFT or sparse convolution
This function convolves data with a kernel, relying either on the
fast Fourier transform (FFT) or a sparse convolution function.
Parameters
----------
data : array_like
First input, typically the data array
kernel : array_like
Second input, typically the kernel
mode : str {'full', 'valid', 'same'}, optional, default: 'full'
A string indicating the size of the output:
- ``full``:
The output is the full discrete linear convolution of the inputs.
- ``valid``:
The output consists only of those elements that do not rely on
zero-padding.
- ``same``:
The output is the same size as `data`, centered with respect to the
'full' output.
method : str {'fft', 'sparse'}, optional, default: 'fft'
A string indicating the convolution method:
- ``fft``:
Use the fast Fourier transform (FFT).
- ``sparse``:
Use the sparse convolution.
use_jit : bool, optional, default: True
A flag indicating whether to use Numba's just-in-time compilation
option (only relevant for `method`=='sparse').
"""
if method.lower() == 'fft':
# Use FFT: faster on non-sparse data
conved = sps.fftconvolve(data, kernel, mode)
elif method.lower() == 'sparse':
# Use sparseconv: faster on sparse data
conved = sparseconv(data, kernel, mode, use_jit)
else:
raise ValueError("Acceptable methods are: 'fft', 'sparse'.")
return conved
def gen_speech_at_mic_stft(phi_ks, source_signals, mic_array_coord, noise_power, fs, fft_size=1024):
"""
generate microphone signals with short time Fourier transform
:param phi_ks: azimuth of the acoustic sources
:param source_signals: speech signals for each arrival angle, one per row
:param mic_array_coord: x and y coordinates of the microphone array
:param noise_power: the variance of the microphone noise signal
:param fs: sampling frequency
:param fft_size: number of FFT bins
:return: y_hat_stft: received (complex) signal at microphones
y_hat_stft_noiseless: the noiseless received (complex) signal at microphones
"""
frame_shift_step = np.int(fft_size / 1.) # half block overlap for adjacent frames
K = source_signals.shape[0] # number of point sources
num_mic = mic_array_coord.shape[1] # number of microphones
# Generate the impulse responses for the array and source directions
impulse_response = gen_far_field_ir(np.reshape(phi_ks, (1, -1), order='F'),
mic_array_coord, fs)
# Now generate all the microphone signals
y = np.zeros((num_mic, source_signals.shape[1] + impulse_response.shape[2] - 1), dtype=np.float32)
for src in xrange(K):
for mic in xrange(num_mic):
y[mic] += fftconvolve(impulse_response[src, mic], source_signals[src])
# Now do the short time Fourier transform
# The resulting signal is M x fft_size/2+1 x number of frames
y_hat_stft_noiseless = \
np.array([pra.stft(signal, fft_size, frame_shift_step, transform=mkl_fft.rfft).T
for signal in y]) / np.sqrt(fft_size)
# Add noise to the signals
y_noisy = y + np.sqrt(noise_power) * np.array(np.random.randn(*y.shape), dtype=np.float32)
# compute sources stft
source_stft = \
np.array([pra.stft(s_loop, fft_size, frame_shift_step, transform=mkl_fft.rfft).T
for s_loop in source_signals]) / np.sqrt(fft_size)
y_hat_stft = \
np.array([pra.stft(signal, fft_size, frame_shift_step, transform=mkl_fft.rfft).T
for signal in y_noisy]) / np.sqrt(fft_size)
return y_hat_stft, y_hat_stft_noiseless, source_stft
def gen_sig_at_mic_stft(phi_ks, alpha_ks, mic_array_coord, SNR, fs, fft_size=1024, Ns=256):
"""
generate microphone signals with short time Fourier transform
:param phi_ks: azimuth of the acoustic sources
:param alpha_ks: power of the sources
:param mic_array_coord: x and y coordinates of the microphone array
:param SNR: signal to noise ratio at the microphone
:param fs: sampling frequency
:param fft_size: number of FFT bins
:param Ns: number of time snapshots used to estimate covariance matrix
:return: y_hat_stft: received (complex) signal at microphones
y_hat_stft_noiseless: the noiseless received (complex) signal at microphones
"""
frame_shift_step = np.int(fft_size / 1.) # half block overlap for adjacent frames
K = alpha_ks.shape[0] # number of point sources
num_mic = mic_array_coord.shape[1] # number of microphones
# Generate the impulse responses for the array and source directions
impulse_response = gen_far_field_ir(np.reshape(phi_ks, (1, -1), order='F'),
mic_array_coord, fs)
# Now generate some noise
# source_signal = np.random.randn(K, Ns * fft_size) * np.sqrt(alpha_ks[:, np.newaxis])
source_signal = np.random.randn(K, fft_size + (Ns - 1) * frame_shift_step) * \
np.sqrt(np.reshape(alpha_ks, (-1, 1), order='F'))
# Now generate all the microphone signals
y = np.zeros((num_mic, source_signal.shape[1] + impulse_response.shape[2] - 1), dtype=np.float32)
for src in xrange(K):
for mic in xrange(num_mic):
y[mic] += fftconvolve(impulse_response[src, mic], source_signal[src])
# Now do the short time Fourier transform
# The resulting signal is M x fft_size/2+1 x number of frames
y_hat_stft_noiseless = \
np.array([pra.stft(signal, fft_size, frame_shift_step, transform=mkl_fft.rfft).T
for signal in y]) / np.sqrt(fft_size)
# compute noise variance based on SNR
signal_energy = linalg.norm(y_hat_stft_noiseless.flatten()) ** 2
noise_energy = signal_energy / 10 ** (SNR * 0.1)
sigma2_noise = noise_energy / y_hat_stft_noiseless.size
# Add noise to the signals
y_noisy = y + np.sqrt(sigma2_noise) * np.array(np.random.randn(*y.shape), dtype=np.float32)
y_hat_stft = \
np.array([pra.stft(signal, fft_size, frame_shift_step, transform=mkl_fft.rfft).T
for signal in y_noisy]) / np.sqrt(fft_size)
return y_hat_stft, y_hat_stft_noiseless