Python numpy 模块,correlate() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用numpy.correlate()。
def get_timepixel_g2(oned_count):
    """Return the normalised autocorrelation (g2) of a 1-D count series.

    For every lag ``k`` in ``0 .. n-1`` the raw autocorrelation is divided by
    the number of overlapping samples (``n - k``), the mean of the trailing
    window ``oned_count[k:]`` and the mean of the leading window
    ``oned_count[:n-k]``.

    Parameters
    ----------
    oned_count : 1-D sequence of numbers.

    Returns
    -------
    numpy.ndarray of length ``n``; element ``k`` is the value at lag ``k``.
    """
    n = len(oned_count)
    overlap = np.arange(n, 0, -1)
    tail_mean = np.array([np.average(oned_count[lag:]) for lag in range(n)])
    head_mean = np.array([np.average(oned_count[:n - lag]) for lag in range(n)])
    norm = overlap * tail_mean * head_mean
    # The last n entries of the 'full' output are the non-negative lags.
    raw = np.correlate(oned_count, oned_count, mode='full')
    return raw[-n:] / norm
#########################################
def get_timepixel_g2(oned_count):
    """Return the normalised autocorrelation (g2) of a 1-D count series.

    For every lag ``k`` in ``0 .. n-1`` the raw autocorrelation is divided by
    the number of overlapping samples (``n - k``), the mean of the trailing
    window ``oned_count[k:]`` and the mean of the leading window
    ``oned_count[:n-k]``.

    Parameters
    ----------
    oned_count : 1-D sequence of numbers.

    Returns
    -------
    numpy.ndarray of length ``n``; element ``k`` is the value at lag ``k``.
    """
    n = len(oned_count)
    overlap = np.arange(n, 0, -1)
    tail_mean = np.array([np.average(oned_count[lag:]) for lag in range(n)])
    head_mean = np.array([np.average(oned_count[:n - lag]) for lag in range(n)])
    norm = overlap * tail_mean * head_mean
    # The last n entries of the 'full' output are the non-negative lags.
    raw = np.correlate(oned_count, oned_count, mode='full')
    return raw[-n:] / norm
#########################################
def get_timepixel_g2(oned_count):
    """Return the normalised autocorrelation (g2) of a 1-D count series.

    For every lag ``k`` in ``0 .. n-1`` the raw autocorrelation is divided by
    the number of overlapping samples (``n - k``), the mean of the trailing
    window ``oned_count[k:]`` and the mean of the leading window
    ``oned_count[:n-k]``.

    Parameters
    ----------
    oned_count : 1-D sequence of numbers.

    Returns
    -------
    numpy.ndarray of length ``n``; element ``k`` is the value at lag ``k``.
    """
    n = len(oned_count)
    overlap = np.arange(n, 0, -1)
    tail_mean = np.array([np.average(oned_count[lag:]) for lag in range(n)])
    head_mean = np.array([np.average(oned_count[:n - lag]) for lag in range(n)])
    norm = overlap * tail_mean * head_mean
    # The last n entries of the 'full' output are the non-negative lags.
    raw = np.correlate(oned_count, oned_count, mode='full')
    return raw[-n:] / norm
#########################################
def gi_correlation(self, p_gi_len, p_symbol_len, signal):
    """Mode detection: correlate the start of ``signal`` with a delayed part.

    The first ``p_gi_len - 1`` samples are compared with the samples from
    index ``p_symbol_len - p_gi_len`` up to (but excluding) the last sample.
    Both segments are scaled to unit energy before being correlated.

    Returns
    -------
    The first element of the 'valid'-mode correlation of the two segments.
    """
    def unit_energy(segment):
        # Normalisation: divide by the square root of the summed |x|^2.
        return segment / np.sqrt(np.sum(np.square(np.abs(segment))))

    delay = p_symbol_len - p_gi_len
    # Leading segment and delayed segment
    leading = unit_energy(signal[0:p_gi_len - 1])
    delayed = unit_energy(signal[delay:len(signal) - 1])
    # Correlation ('valid' is np.correlate's default mode)
    return np.correlate(leading, delayed)[0]
def crosscorr(data, fb, fs, pairs=None):
    """Correlate every pair of filtered channels.

    Parameters
    ----------
    data : array-like, shape (n_channels, n_samples)
        Input signals, one channel per row.
    fb, fs :
        Forwarded unchanged to ``analytic_signal`` (presumably the frequency
        band and the sampling frequency -- confirm against that helper).
    pairs : optional
        Accepted but not used by this function.

    Returns
    -------
    r : numpy.ndarray, shape (n_channels, n_channels), dtype float32
        ``r[i, j]`` is the 'valid'-mode correlation of filtered channel ``i``
        with filtered channel ``j``.
    """
    # Unpacking also enforces that data is two-dimensional.
    n_channels, n_samples = np.shape(data)
    filtered, _, _ = analytic_signal(data, fb, fs)
    r = np.zeros([n_channels, n_channels], dtype=np.float32)
    for i in range(n_channels):
        for ii in range(n_channels):
            # 'valid' mode on two equally long rows yields a one-element
            # array; take that element explicitly because assigning a size-1
            # array to a scalar slot is deprecated since NumPy 1.25.
            r[i, ii] = np.correlate(filtered[i, ], filtered[ii, ], mode='valid')[0]
    return r
def estimate_range(tx, rx, fs, quiet=False):
    """Estimate a distance from the cross-correlation peak of ``tx`` and ``rx``.

    tx: the known, noise-free, undelayed transmit signal (bistatic radars
        agree beforehand on the pseudorandom sequence)
    rx: the noisy, corrupted, interfered, jammed signal to estimate distance from
    fs: baseband sample frequency
    quiet: when true, skip the diagnostic plot

    Returns the distance estimate ``-peak_lag / fs / 2 * c``; ``c`` is a
    module-level constant (presumably the propagation speed in m/s -- confirm).
    """
    Rxy = np.correlate(tx, rx, 'full')
    # In 'full' mode output index 0 corresponds to lag -(len(rx) - 1), so zero
    # lag sits at index len(rx) - 1.  This equals Rxy.size // 2 only when tx
    # and rx have the same length; using it directly keeps the lag axis
    # correct when the lengths differ.
    lags = np.arange(Rxy.size) - (np.size(rx) - 1)
    pklag = lags[Rxy.argmax()]
    distest_m = -pklag / fs / 2 * c
    mR = abs(Rxy)  # magnitude of complex cross-correlation
    if not quiet and figure is not None:
        ax = figure().gca()
        ax.plot(lags, mR)
        ax.plot(pklag, mR[mR.argmax()], color='red', marker='*')
        ax.set_title('cross-correlation of receive waveform with transmit waveform')
        ax.set_ylabel('$|R_{xy}|$')
        ax.set_xlabel('lags')
        ax.set_xlim(pklag-100, pklag+100)
    return distest_m
def procchunk(rx, tx, P:dict):
    """Sum the cross-correlation of ``tx`` with each PRI-long slice of ``rx``.

    Parameters
    ----------
    rx : received samples; resampled first when ``P['rxfn']`` is not None.
    tx : transmitted waveform; must not be longer than one PRI in samples.
    P : dict read for the keys 'rxfn', 'resample', 'txfs', 'pri', 'Nchirp'
        and 'verbose'.

    Returns
    -------
    The accumulated 'same'-mode correlation over ``P['Nchirp']`` slices
    (the float ``0.`` when ``P['Nchirp']`` is 0).

    Raises
    ------
    AssertionError if a PRI is shorter than ``tx`` or if the number of
    complete PRIs in ``rx`` differs from ``P['Nchirp']``.
    """
    if P['rxfn'] is not None:
        # Resample by the rational factor stored in P['resample'].
        rx = scipy.signal.resample_poly(rx,
                                        P['resample'].numerator,
                                        P['resample'].denominator)
    fs = P['txfs']
    # %% resampled parameters
    NrxPRI = int(fs * P['pri']) # Number of RX samples per PRI (resampled)
    assert NrxPRI >= tx.size,'PRI must be longer than chirp length!'
    NrxChirp = rx.size // NrxPRI # number of complete PRIs received in this data
    assert NrxChirp == P['Nchirp']
    Rxy = 0.
    for i in range(P['Nchirp']):
        # i-th PRI-long slice of the received data
        r = rx[i*NrxPRI:(i+1)*NrxPRI]
        Rxy += np.correlate(tx, r,'same')
    # NOTE(review): the source lost its indentation; the plot is assumed to
    # run once after the accumulation loop -- confirm against the project.
    if P['verbose']:
        plotxcor(Rxy, fs)
        draw()
        pause(0.1)
    return Rxy
def find_audio_period(aclip, t_min=.1, t_max=2, t_res=.01):
    """Find the period, in seconds, of an audio clip.

    The beat then follows as ``bpm = 60 / T``.  ``t_min`` and ``t_max`` bound
    the returned value and ``t_res`` is the numerical precision (the duration
    of the chunks whose volume is analysed).
    """
    samples_per_chunk = int(t_res * aclip.fps)
    step = 1.0 * samples_per_chunk / aclip.fps
    # One volume (sum of squares) per chunk.
    volumes = np.array([(chunk ** 2).sum()
                        for chunk in aclip.iter_chunks(samples_per_chunk)])
    centered = volumes - volumes.mean()
    count = len(centered)
    # Keep the non-negative lags of the autocorrelation.
    autocorr = np.correlate(centered, centered, mode='full')[-count:]
    # Blank every lag outside [t_min, t_max) before picking the peak.
    first_lag = int(t_min / step)
    last_lag = int(t_max / step)
    autocorr[:first_lag] = 0
    autocorr[last_lag:] = 0
    return step * np.argmax(autocorr)
def extractMseq(cover, stego, secret_length, m, tau=1):
    """Extract secret information embedded by spread spectrum with an M-sequence.

    @param cover : cover data (2-dimensional np.ndarray)
    @param stego : stego data (2-dimensional np.ndarray)
    @param secret_length : length of the secret information
    @param m : M-sequence
    @param tau : embed shift interval
    @return secret : extracted secret information (a list)
    """
    cover_vec = _image2vrctor(cover)
    stego_vec = _image2vrctor(stego)
    seq_len = len(m)
    # Difference signal, limited to the first seq_len entries, every tau-th one.
    embedded = (stego_vec - cover_vec)[:seq_len:tau]
    correlation = correlate(m, embedded, cycle=CYCLE)
    # ((seq_len - 1) * 2 + 1) // 2 is seq_len - 1 for any integer length.
    start = seq_len - 1
    window = correlation[start:start + secret_length]
    return [_checkData(value) for value in window]
def _convolve_or_correlate(f, a, v, mode, propagate_mask):
"""
Helper function for ma.correlate and ma.convolve
"""
if propagate_mask:
# results which are contributed to by either item in any pair being invalid
mask = (
f(getmaskarray(a), np.ones(np.shape(v), dtype=np.bool), mode=mode)
| f(np.ones(np.shape(a), dtype=np.bool), getmaskarray(v), mode=mode)
)
data = f(getdata(a), getdata(v), mode=mode)
else:
# results which are not contributed to by any pair of valid elements
mask = ~f(~getmaskarray(a), ~getmaskarray(v))
data = f(filled(a, 0), filled(v, 0), mode=mode)
return masked_array(data, mask=mask)
def plot_correlate():
    """Plot the autocorrelation function computed by numpy.

    Reads a wave file, takes a 0.01 s segment starting at 0.2 s, plots its
    'same'-mode autocorrelation and saves the figure under the root name
    'autocorr9'.  Afterwards the non-negative-lag half is normalised in
    place.  Returns None.
    """
    wave = thinkdsp.read_wave('28042__bcjordan__voicedownbew.wav')
    wave.normalize()
    segment = wave.segment(start=0.2, duration=0.01)
    # The result of this call is not used further down in this function.
    lags, corrs = autocorr(segment)
    corrs2 = numpy.correlate(segment.ys, segment.ys, mode='same')
    thinkplot.plot(corrs2)
    thinkplot.config(xlabel='lag',
                     ylabel='correlation',
                     xlim=[0, len(corrs2)])
    thinkplot.save(root='autocorr9')
    N = len(corrs2)
    # Second half of the 'same' output, i.e. lag 0 onwards.  It is a slice
    # of corrs2, so the in-place divisions below modify corrs2 as well.
    half = corrs2[N//2:]
    # Decreasing counts N, N-1, ... used to scale successive lags.
    lengths = range(N, N//2, -1)
    half /= lengths
    # Scale so that the first (lag-0) value becomes 1.
    half /= half[0]
def find_audio_period(aclip, t_min=.1, t_max=2, t_res=.01):
    """Find the period, in seconds, of an audio clip.

    The beat then follows as ``bpm = 60 / T``.  ``t_min`` and ``t_max`` bound
    the returned value and ``t_res`` is the numerical precision (the duration
    of the chunks whose volume is analysed).
    """
    samples_per_chunk = int(t_res * aclip.fps)
    step = 1.0 * samples_per_chunk / aclip.fps
    # One volume (sum of squares) per chunk.
    volumes = np.array([(chunk ** 2).sum()
                        for chunk in aclip.iter_chunks(samples_per_chunk)])
    centered = volumes - volumes.mean()
    count = len(centered)
    # Keep the non-negative lags of the autocorrelation.
    autocorr = np.correlate(centered, centered, mode='full')[-count:]
    # Blank every lag outside [t_min, t_max) before picking the peak.
    first_lag = int(t_min / step)
    last_lag = int(t_max / step)
    autocorr[:first_lag] = 0
    autocorr[last_lag:] = 0
    return step * np.argmax(autocorr)
def find_audio_period(aclip, t_min=.1, t_max=2, t_res=.01):
    """Find the period, in seconds, of an audio clip.

    The beat then follows as ``bpm = 60 / T``.  ``t_min`` and ``t_max`` bound
    the returned value and ``t_res`` is the numerical precision (the duration
    of the chunks whose volume is analysed).
    """
    samples_per_chunk = int(t_res * aclip.fps)
    step = 1.0 * samples_per_chunk / aclip.fps
    # One volume (sum of squares) per chunk.
    volumes = np.array([(chunk ** 2).sum()
                        for chunk in aclip.iter_chunks(samples_per_chunk)])
    centered = volumes - volumes.mean()
    count = len(centered)
    # Keep the non-negative lags of the autocorrelation.
    autocorr = np.correlate(centered, centered, mode='full')[-count:]
    # Blank every lag outside [t_min, t_max) before picking the peak.
    first_lag = int(t_min / step)
    last_lag = int(t_max / step)
    autocorr[:first_lag] = 0
    autocorr[last_lag:] = 0
    return step * np.argmax(autocorr)
def estimated_autocorrelation(x):
    """Estimate the autocorrelation of a 1-D series for lags ``0 .. n-1``.

    The series is mean-centred; the raw autocorrelation at lag ``k`` is then
    divided by the variance of the series times the number of overlapping
    samples ``n - k``.

    Parameters
    ----------
    x : 1-D array-like of numbers (lists are accepted).

    Returns
    -------
    numpy.ndarray of length ``n``; the value at lag 0 is 1 for a
    non-constant series.
    """
    x = np.asarray(x)
    n = len(x)
    variance = x.var()
    centered = x - x.mean()
    # The last n entries of the 'full' output are the non-negative lags.
    # (A former assert re-derived these sums in Python on every call, which
    # doubled the work just to re-check np.correlate.)
    r = np.correlate(centered, centered, mode='full')[-n:]
    return r / (variance * np.arange(n, 0, -1))
def _delta(x, window):
return np.correlate(x, window, mode="same")
def test_float(self):
    """Check 'full'-mode ``np.correlate`` on float data against the fixture."""
    # np.float was only an alias of the builtin float and was removed in
    # NumPy 1.24; passing the builtin selects the same dtype everywhere.
    self._setup(float)
    z = np.correlate(self.x, self.y, 'full')
    assert_array_almost_equal(z, self.z1)
    # second operand shortened by one element
    z = np.correlate(self.x, self.y[:-1], 'full')
    assert_array_almost_equal(z, self.z1_4)
    # operands swapped
    z = np.correlate(self.y, self.x, 'full')
    assert_array_almost_equal(z, self.z2)
    # reversed (negative-stride) views of x
    z = np.correlate(self.x[::-1], self.y, 'full')
    assert_array_almost_equal(z, self.z1r)
    z = np.correlate(self.y, self.x[::-1], 'full')
    assert_array_almost_equal(z, self.z2r)
    z = np.correlate(self.xs, self.y, 'full')
    assert_array_almost_equal(z, self.zs)
def test_object(self):
    """Check 'full'-mode ``np.correlate`` with the fixture built for Decimal."""
    self._setup(Decimal)
    forward = np.correlate(self.x, self.y, 'full')
    assert_array_almost_equal(forward, self.z1)
    # operands swapped
    swapped = np.correlate(self.y, self.x, 'full')
    assert_array_almost_equal(swapped, self.z2)
def test_no_overwrite(self):
    """``np.correlate`` must leave both of its inputs unchanged."""
    signal = np.ones(100)
    kernel = np.ones(3)
    np.correlate(signal, kernel)
    assert_array_equal(signal, np.ones(100))
    assert_array_equal(kernel, np.ones(3))
def test_complex(self):
    """Check 'full'-mode ``np.correlate`` on complex inputs."""
    # np.complex was only an alias of the builtin complex and was removed in
    # NumPy 1.24; the builtin selects the same dtype everywhere.
    x = np.array([1, 2, 3, 4+1j], dtype=complex)
    y = np.array([-1, -2j, 3+1j], dtype=complex)
    r_z = np.array([3-1j, 6, 8+1j, 11+5j, -5+8j, -4-1j], dtype=complex)
    # The expected value for correlate(y, x) is the reversed conjugate of
    # the reference sequence above.
    r_z = r_z[::-1].conjugate()
    z = np.correlate(y, x, mode='full')
    assert_array_almost_equal(z, r_z)
def align(l1, l2, axis):
    """Align two 2-D arrays along ``axis`` using their summed profiles.

    Both arrays are summed over the other axis (``1 - axis``), the profiles
    are cross-correlated and the peak offset is used to shift one array.

    Parameters
    ----------
    l1, l2 : 2-D numpy arrays.
    axis : 1 shifts columns (horizontal alignment, the right line end is not
        considered); any other value shifts rows (vertical alignment).

    Returns
    -------
    (posErr, l1, l2) : the offset and both arrays.  For a positive offset a
    zero-padded, shifted copy replaces ``l2``; for a negative one it
    replaces ``l1``; for zero both inputs are returned unchanged.
    """
    # compute correlation
    sc1 = np.sum(l1, axis=1 - axis)
    sc2 = np.sum(l2, axis=1 - axis)
    cor = np.correlate(sc1, sc2, "same")
    # Floor division keeps the offset an integer; a float offset cannot be
    # used as a slice index under Python 3.
    posErr = np.argmax(cor) - sc1.shape[0] // 2
    # place at right position
    if posErr > 0:
        l2c = np.zeros_like(l2)
        if axis == 1:
            l2c[:, posErr:] = l2[:, :-posErr]
        else:
            l2c[posErr:, :] = l2[:-posErr, :]
        l2 = l2c
    elif posErr < 0:
        l1c = np.zeros_like(l1)
        if axis == 1:
            l1c[:, -posErr:] = l1[:, :posErr]
        else:
            l1c[-posErr:, :] = l1[:posErr, :]
        l1 = l1c
    return posErr, l1, l2
def autocorr(x):
    """Return the autocorrelation of ``x`` for non-negative lags.

    The series is mean-centred and the result is scaled so that the value
    at lag 0 equals 1.

    Parameters
    ----------
    x : 1-D array-like of numbers.

    Returns
    -------
    numpy.ndarray of length ``len(x)``.
    """
    x = x - np.mean(x)
    y = np.correlate(x, x, mode='full')
    # Zero lag sits in the middle of the 'full' output.  Floor division is
    # required: a float index raises TypeError under Python 3.
    mid = y.size // 2
    return y[mid:] / y[mid]
def time_delay_func_paralel(start, end, outs, multi, sample_rate=44100.0):
    """Store the delay of channels ``start .. end-1`` relative to channel 0.

    Parameters
    ----------
    start, end : range of channel indices to process.
    outs : indexable container; ``outs[idx]`` receives the delay in seconds.
    multi : array indexed as ``multi[channel][:, 0]`` to obtain the samples.
    sample_rate : samples per second used to convert the lag to seconds
        (defaults to the previously hard-coded 44100).

    Returns None; results are written into ``outs``.
    """
    for idx in range(start, end):
        # print() call form so the block also parses under Python 3.
        print('locating ...', getpid())
        c = numpy.correlate(multi[0, ][:, 0], multi[idx, ][:, 0], "full")
        peak = c.argmax(0)
        # NOTE(review): (len(c) + 1) / 2 is one more than the zero-lag index
        # of the 'full' output, so identical signals yield 1 / sample_rate
        # rather than 0.  Kept as is -- confirm whether callers rely on it.
        outs[idx] = ((float(len(c)) + 1.0) / 2.0 - peak) / sample_rate
def time_delay_func(x, y, sample_rate=44100.0):
    """Return the delay between the first columns of ``x`` and ``y``.

    Parameters
    ----------
    x, y : 2-D arrays; only column 0 of each is used.
    sample_rate : samples per second used to convert the lag to seconds
        (defaults to the previously hard-coded 44100).

    Returns
    -------
    The delay in seconds derived from the peak of the 'full' correlation.
    """
    # print() call form so the block also parses under Python 3.
    print('locating ...')
    c = numpy.correlate(x[:, 0], y[:, 0], "full")
    peak = c.argmax(0)
    # NOTE(review): (len(c) + 1) / 2 is one more than the zero-lag index of
    # the 'full' output, so identical signals yield 1 / sample_rate rather
    # than 0.  Kept as is -- confirm whether callers rely on it.
    return ((float(len(c)) + 1.0) / 2.0 - peak) / sample_rate
def fst_delay_snd(fst, snd, samp_rate, max_delay):
    """Estimate the delay of ``fst`` relative to ``snd`` by cross-correlation.

    Parameters
    ----------
    fst, snd : 1-D numpy arrays of equal length.
    samp_rate : samples per second.
    max_delay : accepted but not used by this function.

    Returns
    -------
    (corr, delay) : the 'same'-mode correlation and the lag of its peak,
    in seconds, measured from the centre of the output.

    Raises
    ------
    ValueError if the inputs are not 1-D or differ in length.
    """
    # Verify argument shape.
    s1, s2 = fst.shape, snd.shape
    if len(s1) != 1 or len(s2) != 1 or s1[0] != s2[0]:
        # ValueError is still an Exception, so existing handlers keep working.
        raise ValueError("Argument shape invalid, in 'fst_delay_snd' function")
    half_len = s1[0] // 2
    a = numpy.array(fst, dtype=numpy.double)
    b = numpy.array(snd, dtype=numpy.double)
    corr = numpy.correlate(a, b, 'same')
    max_pos = numpy.argmax(corr)
    # float() guards against integer division truncating the delay when an
    # integer sample rate is used under Python 2.
    return corr, (max_pos - half_len) / float(samp_rate)
def fst_delay_snd(fst, snd, samp_rate):
    """Estimate the delay of ``fst`` relative to ``snd`` by cross-correlation.

    Parameters
    ----------
    fst, snd : 1-D numpy arrays of equal length.
    samp_rate : samples per second.

    Returns
    -------
    The lag of the 'same'-mode correlation peak, in seconds, measured from
    the centre of the output.

    Raises
    ------
    ValueError if the inputs are not 1-D or differ in length.
    """
    # Verify argument shape.
    s1, s2 = fst.shape, snd.shape
    if len(s1) != 1 or len(s2) != 1 or s1[0] != s2[0]:
        # ValueError is still an Exception, so existing handlers keep working.
        raise ValueError("Argument shape invalid, in 'fst_delay_snd' function")
    half_len = s1[0] // 2
    corre = numpy.correlate(fst, snd, 'same')
    max_pos = numpy.argmax(corre)
    # float() guards against integer division truncating the delay when an
    # integer sample rate is used under Python 2.
    return (max_pos - half_len) / float(samp_rate)
def rolling_mean(X, window_size):
    """Return the moving average of ``X`` over ``window_size`` samples.

    Only positions where the window fully overlaps ``X`` are returned
    ('valid' mode).
    """
    weights = np.full(window_size, 1.0 / window_size)
    return np.correlate(X, weights, 'valid')
def rolling_mean(X, window_size):
    """Return the moving average of ``X`` over ``window_size`` samples.

    Only positions where the window fully overlaps ``X`` are returned
    ('valid' mode).
    """
    weights = np.full(window_size, 1.0 / window_size)
    return np.correlate(X, weights, 'valid')
def test_float(self):
    """Check 'full'-mode ``np.correlate`` on float data against the fixture."""
    # np.float was only an alias of the builtin float and was removed in
    # NumPy 1.24; passing the builtin selects the same dtype everywhere.
    self._setup(float)
    z = np.correlate(self.x, self.y, 'full')
    assert_array_almost_equal(z, self.z1)
    # second operand shortened by one element
    z = np.correlate(self.x, self.y[:-1], 'full')
    assert_array_almost_equal(z, self.z1_4)
    # operands swapped
    z = np.correlate(self.y, self.x, 'full')
    assert_array_almost_equal(z, self.z2)
    # reversed (negative-stride) views of x
    z = np.correlate(self.x[::-1], self.y, 'full')
    assert_array_almost_equal(z, self.z1r)
    z = np.correlate(self.y, self.x[::-1], 'full')
    assert_array_almost_equal(z, self.z2r)
    z = np.correlate(self.xs, self.y, 'full')
    assert_array_almost_equal(z, self.zs)
def test_object(self):
    """Check 'full'-mode ``np.correlate`` with the fixture built for Decimal."""
    self._setup(Decimal)
    forward = np.correlate(self.x, self.y, 'full')
    assert_array_almost_equal(forward, self.z1)
    # operands swapped
    swapped = np.correlate(self.y, self.x, 'full')
    assert_array_almost_equal(swapped, self.z2)
def test_no_overwrite(self):
    """``np.correlate`` must leave both of its inputs unchanged."""
    signal = np.ones(100)
    kernel = np.ones(3)
    np.correlate(signal, kernel)
    assert_array_equal(signal, np.ones(100))
    assert_array_equal(kernel, np.ones(3))
def test_complex(self):
    """Check 'full'-mode ``np.correlate`` on complex inputs."""
    # np.complex was only an alias of the builtin complex and was removed in
    # NumPy 1.24; the builtin selects the same dtype everywhere.
    x = np.array([1, 2, 3, 4+1j], dtype=complex)
    y = np.array([-1, -2j, 3+1j], dtype=complex)
    r_z = np.array([3-1j, 6, 8+1j, 11+5j, -5+8j, -4-1j], dtype=complex)
    # The expected value for correlate(y, x) is the reversed conjugate of
    # the reference sequence above.
    r_z = r_z[::-1].conjugate()
    z = np.correlate(y, x, mode='full')
    assert_array_almost_equal(z, r_z)
def autoCorrelation(s):
    """Return the autocorrelation of ``s`` along axis 0 for lags ``0 .. n-1``.

    Each 1-D slice along axis 0 is mean-centred; the raw autocorrelation at
    lag ``k`` is divided by the slice variance times ``n - k``.
    """
    def ac1d(x):
        count = x.shape[0]
        variance = x.var()
        centered = x - x.mean()
        full = np.correlate(centered, centered, mode='full')
        # Second half of the 'full' output: lag 0 onwards.
        positive_lags = full[full.size // 2:]
        return positive_lags / (variance * np.arange(count, 0, -1))

    return np.apply_along_axis(ac1d, 0, s)
def dodemod(rx, P:dict):
    """Demodulate ``rx`` according to ``P['demod']``.

    'chirp' cross-correlates pulse-averaged receive data with the transmit
    waveform and plots each result; 'am' and 'ssb' delegate to ``am_demod``
    and ``ssb_demod``.

    Parameters
    ----------
    rx : received samples (numpy array).
    P : dict read for the keys 'rxfs', 'demod', 'txfn', 'txfs', 'pri',
        'Npulse', 'again' and 'fc'.

    Returns
    -------
    (aud, fs) : the demodulated audio (None for 'chirp' or an unknown mode)
    and the sample rate in effect.

    NOTE(review): ``UP``, ``DOWN``, ``fsaudio`` and the lowercase ``p`` used
    in the 'am'/'ssb' branches are not defined in this function; presumably
    module-level names -- confirm.
    """
    aud = None
    fs = P['rxfs']
    if P['demod']=='chirp':
        tx = loadbin(P['txfn'], P['txfs'])
        if tx is None:
            # No transmit file: simulate reception with an attenuated, noisy
            # copy of rx and use the original rx as the reference.
            warnings.warn('simulated chirp reception')
            tx = rx
            rx = 0.05*rx + 0.1*rx.max()*(np.random.randn(rx.size) + 1j*np.random.randn(rx.size))
            txfs = fs
        else:
            rx = scipy.signal.resample_poly(rx, UP, DOWN)
            fs = txfs = P['txfs']
        txsec = tx.size/txfs # length of TX in seconds
        # Use the configured PRI when there is one; previously ``pri`` was
        # only bound when P['pri'] was None and raised NameError otherwise.
        pri = txsec if P['pri'] is None else P['pri']
        print(f'Using {pri*1000} ms PRI and {P["Npulse"]} pulses incoherently integrated')
        # %% integration
        NrxPRI = int(fs * pri) # Number of RX samples per PRI
        NrxStack = rx.size // NrxPRI # number of complete PRIs received in this data
        Nint = NrxStack // P['Npulse'] # Number of steps we'll take iterating
        Nextract = P['Npulse'] * NrxPRI # total number of samples to extract (in general part of one PRI is discarded after numerous PRIs)
        ax=None
        for i in range(Nint):
            ci = slice(i*Nextract, (i+1)*Nextract)
            # NOTE(review): reshaping to (NrxPRI, Npulse) and averaging over
            # axis 1 averages runs of consecutive samples; confirm this is
            # the intended pulse integration.
            rxint = rx[ci].reshape((NrxPRI, P['Npulse'])).mean(axis=1)
            Rxy = np.correlate(tx, rxint, 'full')
            ax = plotxcor(Rxy, txfs, ax)
            draw(); pause(0.5)
    elif P['demod']=='am':
        aud = am_demod(P['again']*rx, fs, fsaudio, P['fc'], p.audiobw, frumble=p.frumble, verbose=True)
    elif P['demod']=='ssb':
        aud = ssb_demod(P['again']*rx, fs, fsaudio, P['fc'], p.audiobw,verbose=True)
    return aud,fs
def plot(seq):
    """Plot the autocorrelation of the given sequence."""
    import matplotlib.pyplot as plt
    length = len(seq)
    # Map truthy elements to +1 and falsy ones to -1 before correlating.
    signed = np.where(seq, 1.0, -1.0)
    acorr = np.correlate(signed, signed, 'same')
    # x axis centred on the middle of the 'same' output.
    lags = np.arange(length) - length // 2
    plt.figure()
    plt.title("Length {} Gold code autocorrelation".format(length))
    plt.plot(lags, acorr, '.-')
    plt.show()
def _print_stats(seq):
bipolar = np.where(seq, 1.0, -1.0)
autocorr = np.correlate(bipolar, bipolar, 'same')
peaks = np.sort(np.abs(autocorr))
peak = peaks[-1]
noise = np.sqrt(np.mean(peaks[:-1]**2))
peak_to_peak2 = peak / peaks[-2]
peak_to_noise = peak / noise
print("Peak amplitude: {:.0f}".format(peak))
print("Largest non-peak amplitude: {:.0f}".format(peaks[-2]))
print("Peak-to-max: {:.2f}".format(peak_to_peak2))
print("Peak-to-noise: {:.2f}".format(peak_to_noise))
def correlationIndividual(data, idx = (0,1), cls = -1, delay = (-100, 100)):
    """Calculate cross- and auto-correlations in time between the measures.

    Parameters
    ----------
    data : 2-D array; rows are samples, columns are measures.  Means are
        taken over every column except the last.
    idx : only its length ``n`` is used; columns ``0 .. n-1`` are correlated.
        NOTE(review): the entries of idx are never used as column indices --
        confirm this is intended.
    cls : not used by this function.
    delay : (low, high) bounds; the output holds ``high - low + 1`` values
        per column pair.

    Returns
    -------
    cc : numpy.ndarray, shape (high - low + 1, n, n).
    """
    n = len(idx);
    means = np.mean(data[:,:-1], axis = 0);
    nd = delay[1] - delay[0] + 1;
    cc = np.zeros((nd,n,n))
    for i in range(n):
        for j in range(n):
            # Part computed from the lower bound: column i against column j
            # truncated at the front ('valid' mode, -delay[0] + 1 values).
            if delay[0] < 0:
                cm = np.correlate(data[:, i] - means[i], data[-delay[0]:, j] - means[j]);
            else:
                cm = [0];
            # Part computed from the upper bound: column j against column i
            # truncated at the front ('valid' mode, delay[1] + 1 values).
            if delay[1] > 0:
                cp = np.correlate(data[:, j] - means[j], data[delay[1]:, i] - means[i]);
            else:
                cp = [0];
            # Drop the first element of cm and append cp reversed.
            ca = np.concatenate((cm[1:], cp[::-1]));
            # Trim when both bounds lie on the same side of zero.
            if delay[0] > 0:
                cc[:,i,j] = ca[delay[0]:];
            elif delay[1] < 0:
                cc[:,i,j] = ca[:-delay[1]];
            else:
                cc[:,i,j] = ca;
    return cc;
def correlationIndividualStages(data, idx = (0,1), cls = -1, delay = (-100, 100)):
    """Calculate correlation functions in time between the measures per stage.

    Parameters
    ----------
    data : 2-D array; rows are samples, columns are measures.  Means are
        taken over every column except the last, across all rows.
    idx : only its length ``n`` is used; columns ``0 .. n-1`` are correlated.
        NOTE(review): the entries of idx are never used as column indices --
        confirm this is intended.
    cls : forwarded to ``stageIndex``.
    delay : (low, high) bounds; the output holds ``high - low + 1`` values
        per column pair and stage.

    Returns
    -------
    cc : numpy.ndarray, shape (high - low + 1, n, n, number_of_stages).
    """
    stages = stageIndex(data, cls = cls);
    # Add 0 in front and the row count at the end so that consecutive
    # entries delimit the row range of each stage.
    stages = np.insert(np.append(stages, data.shape[0]), 0, 0);
    ns = len(stages) - 1;
    n = len(idx);
    means = np.mean(data[:,:-1], axis = 0);
    nd = delay[1] - delay[0] + 1;
    cc = np.zeros((nd,n,n,ns))
    for s in range(ns):
        # Rows belonging to stage s.
        dat = data[stages[s]:stages[s+1],:];
        for i in range(n):
            for j in range(n):
                # Part computed from the lower bound ('valid' mode).
                if delay[0] < 0:
                    cm = np.correlate(dat[:, i] - means[i], dat[-delay[0]:, j] - means[j]);
                else:
                    cm = [0];
                # Part computed from the upper bound ('valid' mode).
                if delay[1] > 0:
                    cp = np.correlate(dat[:, j] - means[j], dat[delay[1]:, i] - means[i]);
                else:
                    cp = [0];
                # Drop the first element of cm and append cp reversed.
                ca = np.concatenate((cm[1:], cp[::-1]));
                # Trim when both bounds lie on the same side of zero.
                if delay[0] > 0:
                    cc[:,i,j,s] = ca[delay[0]:];
                elif delay[1] < 0:
                    cc[:,i,j,s] = ca[:-delay[1]];
                else:
                    cc[:,i,j,s] = ca;
    return cc;
def correlogram(experiment):
    """Return the trial-averaged 'same'-mode correlation of left and right.

    ``experiment.getresults`` is queried for 'left' and 'right'; the results
    are paired trial by trial, correlated, and averaged over trials.
    """
    left_trials = experiment.getresults('left')
    right_trials = experiment.getresults('right')
    per_trial = []
    for left_sig, right_sig in zip(left_trials, right_trials):
        per_trial.append(np.correlate(left_sig, right_sig, 'same'))
    return np.mean(per_trial, axis=0)
def five_group_stats(group):
    """Compute summary statistics for one group of sales records.

    Parameters
    ----------
    group : mapping of column name to a pandas-like column; the columns
        'Demanda_uni_equil', 'Semana' and 'Dev_proxima' are read.

    Returns
    -------
    A 15-tuple: mean, count, std, median, the sale at the largest week, the
    largest week, mean of 'Dev_proxima', week signature, kurtosis, harmonic
    mean, entropy, confidence interval, correlation with the week order,
    mean autocorrelation, and the number of times the week-ordered sales
    cross their mean.
    """
    sales = np.array(group['Demanda_uni_equil'].values)
    samana = group['Semana'].values
    max_index = np.argmax(samana)
    returns = group['Dev_proxima'].mean()
    # this is a signature of when sales happen
    sorted_samana_index = np.argsort(samana)
    sorted_sales = sales[sorted_samana_index]
    signature = np.sum([math.pow(2, s - 3) for s in samana])
    kurtosis = fillna_and_inf(scipy.stats.kurtosis(sorted_sales))
    # Negative sales are replaced by 0.1 before taking the harmonic mean.
    hmean = fillna_and_inf(scipy.stats.hmean(np.where(sales < 0, 0.1, sales)))
    entropy = fillna_and_inf(scipy.stats.entropy(sales))
    std = fillna_and_inf(np.std(sales))
    N = len(sales)
    ci = fillna_and_inf(calculate_ci(std, N))
    corr = fillna_and_inf(scipy.stats.pearsonr(range(N), sorted_sales)[0])
    autocorr_list = np.correlate(sorted_sales, sorted_sales, mode='same')
    mean_autocorr = fillna_and_inf(np.mean(autocorr_list))
    mean = np.mean(sales)
    mean_corss_points_count = 0
    if N > 1:
        high_than_mean = mean < sorted_sales[0]
        for i in range(1, N):
            crossed_down = high_than_mean and mean > sorted_sales[i]
            # Upward crossing: previously not above the mean, now above it.
            # (The original repeated the downward test here.)
            crossed_up = not high_than_mean and mean < sorted_sales[i]
            if crossed_down or crossed_up:
                # Count the crossing; the original added the counter to
                # itself, which left it at 0 forever.
                mean_corss_points_count += 1
            high_than_mean = mean < sorted_sales[i]
    return mean, N, std, np.median(sales), sales[max_index], samana[max_index], \
        returns, signature, kurtosis, hmean, entropy, ci, corr, mean_autocorr, mean_corss_points_count
def test_float(self):
    """Check 'full'-mode ``np.correlate`` on float data against the fixture."""
    # np.float was only an alias of the builtin float and was removed in
    # NumPy 1.24; passing the builtin selects the same dtype everywhere.
    self._setup(float)
    z = np.correlate(self.x, self.y, 'full')
    assert_array_almost_equal(z, self.z1)
    # second operand shortened by one element
    z = np.correlate(self.x, self.y[:-1], 'full')
    assert_array_almost_equal(z, self.z1_4)
    # operands swapped
    z = np.correlate(self.y, self.x, 'full')
    assert_array_almost_equal(z, self.z2)
    # reversed (negative-stride) views of x
    z = np.correlate(self.x[::-1], self.y, 'full')
    assert_array_almost_equal(z, self.z1r)
    z = np.correlate(self.y, self.x[::-1], 'full')
    assert_array_almost_equal(z, self.z2r)
    z = np.correlate(self.xs, self.y, 'full')
    assert_array_almost_equal(z, self.zs)
def test_object(self):
    """Check 'full'-mode ``np.correlate`` with the fixture built for Decimal."""
    self._setup(Decimal)
    forward = np.correlate(self.x, self.y, 'full')
    assert_array_almost_equal(forward, self.z1)
    # operands swapped
    swapped = np.correlate(self.y, self.x, 'full')
    assert_array_almost_equal(swapped, self.z2)
def test_no_overwrite(self):
    """``np.correlate`` must leave both of its inputs unchanged."""
    signal = np.ones(100)
    kernel = np.ones(3)
    np.correlate(signal, kernel)
    assert_array_equal(signal, np.ones(100))
    assert_array_equal(kernel, np.ones(3))
def test_complex(self):
    """Check 'full'-mode ``np.correlate`` on complex inputs."""
    # np.complex was only an alias of the builtin complex and was removed in
    # NumPy 1.24; the builtin selects the same dtype everywhere.
    x = np.array([1, 2, 3, 4+1j], dtype=complex)
    y = np.array([-1, -2j, 3+1j], dtype=complex)
    r_z = np.array([3-1j, 6, 8+1j, 11+5j, -5+8j, -4-1j], dtype=complex)
    # The expected value for correlate(y, x) is the reversed conjugate of
    # the reference sequence above.
    r_z = r_z[::-1].conjugate()
    z = np.correlate(y, x, mode='full')
    assert_array_almost_equal(z, r_z)
def test_float(self):
    """Check 'full'-mode ``np.correlate`` on float data against the fixture."""
    # np.float was only an alias of the builtin float and was removed in
    # NumPy 1.24; passing the builtin selects the same dtype everywhere.
    self._setup(float)
    z = np.correlate(self.x, self.y, 'full')
    assert_array_almost_equal(z, self.z1)
    # second operand shortened by one element
    z = np.correlate(self.x, self.y[:-1], 'full')
    assert_array_almost_equal(z, self.z1_4)
    # operands swapped
    z = np.correlate(self.y, self.x, 'full')
    assert_array_almost_equal(z, self.z2)
    # reversed (negative-stride) views of x
    z = np.correlate(self.x[::-1], self.y, 'full')
    assert_array_almost_equal(z, self.z1r)
    z = np.correlate(self.y, self.x[::-1], 'full')
    assert_array_almost_equal(z, self.z2r)
    z = np.correlate(self.xs, self.y, 'full')
    assert_array_almost_equal(z, self.zs)
def test_object(self):
    """Check 'full'-mode ``np.correlate`` with the fixture built for Decimal."""
    self._setup(Decimal)
    forward = np.correlate(self.x, self.y, 'full')
    assert_array_almost_equal(forward, self.z1)
    # operands swapped
    swapped = np.correlate(self.y, self.x, 'full')
    assert_array_almost_equal(swapped, self.z2)
def test_no_overwrite(self):
    """``np.correlate`` must leave both of its inputs unchanged."""
    signal = np.ones(100)
    kernel = np.ones(3)
    np.correlate(signal, kernel)
    assert_array_equal(signal, np.ones(100))
    assert_array_equal(kernel, np.ones(3))
def test_complex(self):
    """Check 'full'-mode ``np.correlate`` on complex inputs."""
    # np.complex was only an alias of the builtin complex and was removed in
    # NumPy 1.24; the builtin selects the same dtype everywhere.
    x = np.array([1, 2, 3, 4+1j], dtype=complex)
    y = np.array([-1, -2j, 3+1j], dtype=complex)
    r_z = np.array([3-1j, 6, 8+1j, 11+5j, -5+8j, -4-1j], dtype=complex)
    # The expected value for correlate(y, x) is the reversed conjugate of
    # the reference sequence above.
    r_z = r_z[::-1].conjugate()
    z = np.correlate(y, x, mode='full')
    assert_array_almost_equal(z, r_z)
def angle_from_audio(self, file_name, chunks):
    """Estimate an arrival angle from a two-channel wave file.

    For ``i`` in ``1 .. chunks-1`` a slice of ``chunks`` samples of channel 0
    is correlated with the matching slice of channel 1, widened by
    ``self.buffer`` samples on each side.  The peak offset is converted to a
    path difference with ``self.sound_speed`` and, when it is non-zero and
    not larger than ``self.mic_dist``, to an angle in degrees that is
    appended to ``self.angles``.  Finally the left edge of the fullest of 10
    histogram bins over ``self.angles`` is stored in ``self.angle_pred`` and
    printed.

    Parameters
    ----------
    file_name : path of a wave file with at least two channels.
    chunks : used both as the slice length in samples and as the exclusive
        upper bound of the slice index.

    Returns None.
    """
    [rate, wave] = wavfile.read(file_name)
    raw_0 = wave[:, 0].astype(np.float64)
    raw_1 = wave[:, 1].astype(np.float64)
    for i in range(1, chunks):
        start = i*chunks
        end = (i+1)*chunks
        left = raw_0[start:end]
        right = raw_1[start-self.buffer:end+self.buffer]
        corr_arr = np.correlate(right, left, 'valid')
        # The 'valid' output has 2*buffer + 1 entries with zero offset at
        # index `buffer`, i.e. len // 2.  Floor division is required: true
        # division under Python 3 would bias every offset by half a sample.
        max_index = (len(corr_arr) // 2) - np.argmax(corr_arr)
        time_d = max_index/float(rate)
        signal_dist = time_d*self.sound_speed
        if (signal_dist != 0 and abs(signal_dist) <= self.mic_dist):
            angle = math.degrees(math.asin(signal_dist / self.mic_dist))
            self.angles.append(angle)
    a = np.array(self.angles)
    hist, bins = np.histogram(a, bins=10)
    index = np.argmax(hist)
    self.angle_pred = bins[index]
    # print() call form so the block also parses under Python 3.
    print(self.angle_pred)
def autocorrelation(x, lags):  # Temporal correlation
    """Return the correlation coefficient of ``x`` with itself per lag.

    For each lag from 1 to ``lags`` the trailing part ``x[lag:]`` and the
    leading part ``x[:n-lag]`` are mean-centred, correlated and divided by
    the product of their standard deviations and their length.

    Returns a list with one value per lag.
    """
    n = len(x)
    x = np.array(x)
    result = []
    for lag in range(1, lags + 1):
        late = x[lag:]
        early = x[:n - lag]
        covariance = np.correlate(late - late.mean(), early - early.mean())[0]
        result.append(covariance / (late.std() * early.std() * (n - lag)))
    return result