Python numpy module: numpy.fabs() code examples
The following code examples, extracted from open-source Python projects, illustrate how to use numpy.fabs().
def sparse_optical_flow(im1, im2, pts, fb_threshold=-1,
                        window_size=15, max_level=2,
                        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)):
    # Forward flow
    p1, st, err = cv2.calcOpticalFlowPyrLK(im1, im2, pts, None,
                                           winSize=(window_size, window_size),
                                           maxLevel=max_level, criteria=criteria)

    # Backward flow
    if fb_threshold > 0:
        p0r, st0, err = cv2.calcOpticalFlowPyrLK(im2, im1, p1, None,
                                                 winSize=(window_size, window_size),
                                                 maxLevel=max_level, criteria=criteria)
        p0r[st0 == 0] = np.nan

        # Keep only points whose backward flow returns close to the original points
        fb_good = (np.fabs(p0r - pts) < fb_threshold).all(axis=1)
        p1[~fb_good] = np.nan
        st = np.bitwise_and(st, st0)
        err[~fb_good] = np.nan

    return p1, st, err
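The forward-backward gate in isolation: np.fabs measures how far each backward-tracked point lands from where it started, and tracks beyond the threshold are rejected. A minimal sketch with made-up point arrays (no OpenCV needed; shapes simplified to (N, 2)):

import numpy as np

pts = np.array([[10., 10.], [50., 20.], [30., 40.]])     # original points
p0r = np.array([[10.2, 9.9], [55., 20.], [30.1, 39.8]])  # backward-tracked points

fb_threshold = 1.0
fb_good = (np.fabs(p0r - pts) < fb_threshold).all(axis=1)
print(fb_good)  # [ True False  True]: the second track drifted too far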
def check_visibility(camera, pts_w, zmin=0, zmax=100):
    """
    Check if points are visible given the fov of the camera.
    This method checks both the horizontal and the vertical fov.
    camera: type Camera
    """
    # Transform points into the camera's reference frame
    pts_c = camera.c2w(pts_w.reshape(-1, 3))

    # Determine the look-at vector, and check the angle
    # subtended with the camera's z-vector (3rd column)
    z = pts_c[:, 2]
    v = pts_c / np.linalg.norm(pts_c, axis=1).reshape(-1, 1)
    hangle, vangle = np.arctan2(v[:, 0], v[:, 2]), np.arctan2(-v[:, 1], v[:, 2])

    # Provides an index mask for all points that are within the fov
    # (elementwise &, not `and`, since these are numpy boolean arrays)
    return (np.fabs(hangle) < camera.fov[0] * 0.5) & \
           (np.fabs(vangle) < camera.fov[1] * 0.5) & \
           (z >= zmin) & (z <= zmax)
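The angular gate on its own: np.fabs folds the left/right half-angles together so a single comparison against fov/2 covers both sides. A small sketch with a hypothetical 90-degree horizontal fov:

import numpy as np

fov_h = np.deg2rad(90.)
hangle = np.arctan2(np.array([-0.5, 0., 2.]),   # x offsets in the camera frame
                    np.array([ 1.,  1., 1.]))   # z depths
print(np.fabs(hangle) < fov_h * 0.5)  # [ True  True False]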
def test_multiple_calls(self):
    """Tests that the results are the same after calling multiple times
    """
    bayes = Bayesian(simulations={'Gun': [self.sim1, self.exp1]},
                     models={'eos': self.eos_model},
                     opt_keys='eos')
    sol1, hist1, sens1, fisher1 = bayes()
    sol2, hist2, sens2, fisher2 = bayes()

    npt.assert_almost_equal(hist1[0], hist2[0], decimal=4,
                            err_msg='Histories not equal for subsequent runs')
    npt.assert_almost_equal(sol1.models['eos'].get_dof() /
                            sol2.models['eos'].get_dof(),
                            np.ones(bayes.shape()[1]),
                            decimal=10,
                            err_msg='DOF not equal for subsequent runs')
    npt.assert_almost_equal(np.fabs(sens1['Gun'] - sens2['Gun']),
                            np.zeros(sens1['Gun'].shape),
                            decimal=10)
def test_effvol_complete():
    # Test that the effective volume == volume when the completeness == 1
    tsf = gaia_tools.select.tgasSelectUniform(comp=1.)
    tesf = gaia_tools.select.tgasEffectiveSelect(tsf)
    dxy, dz, zmin = 0.2, 0.1, 0.15
    v = tesf.volume(
        lambda x, y, z: cyl_vol_func(x, y, z, xymax=dxy, zmin=zmin, zmax=zmin+dz),
        xyz=True)
    v_exp = numpy.pi*dxy**2.*dz
    assert numpy.fabs(v/v_exp-1.) < 10.**-3., 'Effective volume for unit completeness is not equal to the volume'
    # Another one
    dxy, dz, zmin = 0.2, 0.2, -0.15
    v = tesf.volume(
        lambda x, y, z: cyl_vol_func(x, y, z, xymax=dxy, zmin=zmin, zmax=zmin+dz),
        xyz=True, ndists=251)
    v_exp = numpy.pi*dxy**2.*dz
    assert numpy.fabs(v/v_exp-1.) < 10.**-2., 'Effective volume for unit completeness is not equal to the volume'
    return None
def test_effvol_uniform_complete():
    # Test that the effective volume == A x volume when the completeness == A
    comp = 0.33
    tsf = gaia_tools.select.tgasSelectUniform(comp=comp)
    tesf = gaia_tools.select.tgasEffectiveSelect(tsf)
    dxy, dz, zmin = 0.2, 0.1, 0.15
    v = tesf.volume(
        lambda x, y, z: cyl_vol_func(x, y, z, xymax=dxy, zmin=zmin, zmax=zmin+dz),
        xyz=True)
    v_exp = numpy.pi*dxy**2.*dz*comp
    assert numpy.fabs(v/v_exp-1.) < 10.**-3., 'Effective volume for uniform completeness is not equal to completeness times the volume'
    # Another one
    dxy, dz, zmin = 0.2, 0.2, -0.15
    v = tesf.volume(
        lambda x, y, z: cyl_vol_func(x, y, z, xymax=dxy, zmin=zmin, zmax=zmin+dz),
        xyz=True, ndists=251)
    v_exp = numpy.pi*dxy**2.*dz*comp
    assert numpy.fabs(v/v_exp-1.) < 10.**-2., 'Effective volume for uniform completeness is not equal to completeness times the volume'
    return None
def test_effvol_uniform_complete_partialsky():
    # Test that the effective volume == A x volume x sky-fraction when the
    # completeness == A over a fraction of the sky, for a spherical volume
    comp = 0.33
    ramin, ramax = 30., 120.
    tsf = gaia_tools.select.tgasSelectUniform(comp=comp, ramin=ramin, ramax=ramax)
    tesf = gaia_tools.select.tgasEffectiveSelect(tsf)
    dr, rmin = 0.1, 0.
    v = tesf.volume(
        lambda x, y, z: spher_vol_func(x, y, z, rmin=rmin, rmax=rmin+dr),
        xyz=True, ndists=251)
    v_exp = 4.*numpy.pi*dr**3./3.*comp*(ramax-ramin)/360.
    assert numpy.fabs(v/v_exp-1.) < 10.**-2., 'Effective volume for uniform completeness over part of the sky is not equal to the expected volume'
    # Another one
    dr, rmin = 0.2, 0.
    v = tesf.volume(
        lambda x, y, z: spher_vol_func(x, y, z, rmin=rmin, rmax=rmin+dr),
        xyz=True, ndists=501)
    v_exp = 4.*numpy.pi*dr**3./3.*comp*(ramax-ramin)/360.
    assert numpy.fabs(v/v_exp-1.) < 10.**-1.9, 'Effective volume for uniform completeness over part of the sky is not equal to the expected volume'
    return None
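All three tests rely on the same relative-error idiom, numpy.fabs(v/v_exp - 1.) < tol, which is scale-free: it passes when v is fractionally within tol of v_exp. A hypothetical helper that makes the pattern explicit:

import numpy

def rel_close(value, expected, tol=1e-3):
    # True if value is within a fractional tolerance tol of expected
    return numpy.fabs(value/expected - 1.) < tol

print(rel_close(1.0005, 1.0))  # True: 0.05% off
print(rel_close(1.002, 1.0))   # False: 0.2% off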
def azimuth_init(self):
    _R_eq = self.radius_eq
    _inc = float(self.parking_orbit_inc)
    _lat = self.latitude()
    _to = float(self.parking_orbit_alt)
    _mu = self.mu
    _Rot_p = self.rotational_period

    node = "Ascending"
    if _inc < 0:
        node = "Descending"
        _inc = np.fabs(_inc)
    # Clamp the inclination to what is reachable from this latitude
    if np.fabs(_lat) > _inc:
        _inc = np.fabs(_lat)
    if (180 - np.fabs(_lat)) < _inc:
        _inc = 180 - np.fabs(_lat)

    velocity_eq = (2 * pi * _R_eq) / _Rot_p
    t_orb_v = np.sqrt(_mu / (_to + _R_eq))
    return _inc, _lat, velocity_eq, t_orb_v, node
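The np.fabs clamping above encodes a launch constraint: a direct ascent cannot reach an inclination smaller than the absolute launch latitude, nor larger than 180 degrees minus it. The clamp on its own, as a hypothetical standalone helper:

import numpy as np

def clamp_inclination(inc, lat):
    # Restrict a requested inclination (degrees) to what is reachable
    # from launch latitude lat (degrees)
    inc = np.fabs(inc)
    if np.fabs(lat) > inc:
        inc = np.fabs(lat)
    if (180 - np.fabs(lat)) < inc:
        inc = 180 - np.fabs(lat)
    return inc

print(clamp_inclination(10., 28.6))   # 28.6: can't go below the launch latitude
print(clamp_inclination(175., 28.6))  # 151.4: the retrograde limit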
def azimuth_init(self):
    _R_eq = self.radius_eq
    _inc = float(self.target_orbit_inc)
    _lat = self.latitude()
    _to = float(self.target_orbit_alt)
    _mu = self.mu
    _Rot_p = self.rotational_period

    node = "Ascending"
    if _inc < 0:
        node = "Descending"
        _inc = np.fabs(_inc)
    if np.fabs(_lat) > _inc:
        _inc = np.fabs(_lat)
    if (180 - np.fabs(_lat)) < _inc:
        _inc = 180 - np.fabs(_lat)

    velocity_eq = (2 * pi * _R_eq) / _Rot_p
    t_orb_v = np.sqrt(_mu / (_to + _R_eq))
    return _inc, _lat, velocity_eq, t_orb_v, node
def test_dipole_fluxpoints(self):
    """Tests dipole flux points."""
    field = ElectricField([PointCharge(-2, [0, 0]), PointCharge(2, [2, 0])])
    circle = GaussianCircle([0, 0], 0.1)

    fluxpoints = circle.fluxpoints(field, 4)
    self.assertEqual(len(fluxpoints), 4)

    fluxpoints = circle.fluxpoints(field, 14)
    self.assertEqual(len(fluxpoints), 14)
    self.assertTrue(isclose(fluxpoints[0], [0.1, 0]).all())
    self.assertTrue(isclose(fluxpoints[7], [-0.1, 0]).all())

    x1 = fluxpoints[1:7]
    x2 = fluxpoints[-1:7:-1]
    x2[:, 1] = fabs(x2[:, 1])
    self.assertTrue(isclose(x1, x2).all())
def test_energy_conservation_sech2disk_manyparticles():
    # Test that energy is conserved for a self-gravitating disk
    N = 101
    totmass = 1.
    sigma = 1.
    zh = 2.*sigma**2./totmass
    x = numpy.arctanh(2.*numpy.random.uniform(size=N)-1)*zh
    v = numpy.random.normal(size=N)*sigma
    v -= numpy.mean(v)  # stabilize
    m = numpy.ones_like(x)/N*(1.+0.1*(2.*numpy.random.uniform(size=N)-1))
    g = wendy.nbody(x, v, m, 0.05)
    E = wendy.energy(x, v, m)
    cnt = 0
    while cnt < 100:
        tx, tv = next(g)
        assert numpy.fabs(wendy.energy(tx, tv, m)-E) < 10.**-10., \
            "Energy not conserved during simple N-body integration"
        cnt += 1
    return None
def test_energy_conservation_sech2disk_manyparticles():
    # Test that energy is conserved for a self-gravitating disk
    N = 101
    totmass = 1.
    sigma = 1.
    zh = 2.*sigma**2./totmass
    x = numpy.arctanh(2.*numpy.random.uniform(size=N)-1)*zh
    v = numpy.random.normal(size=N)*sigma
    v -= numpy.mean(v)  # stabilize
    m = numpy.ones_like(x)/N*(1.+0.1*(2.*numpy.random.uniform(size=N)-1))
    omega = 1.1
    g = wendy.nbody(x, v, m, 0.05, omega=omega)
    E = wendy.energy(x, v, m, omega=omega)
    cnt = 0
    while cnt < 100:
        tx, tv = next(g)
        assert numpy.fabs(wendy.energy(tx, tv, m, omega=omega)-E) < 10.**-10., \
            "Energy not conserved during simple N-body integration with external harmonic potential"
        cnt += 1
    return None
def test_energy_individual():
    # Simple test that the individual energies are calculated correctly
    x = numpy.array([-1.1, 0.1, 1.3])
    v = numpy.array([3., 2., -5.])
    m = numpy.array([1., 2., 3.])
    omega = 1.1
    E = wendy.energy(x, v, m, individual=True, omega=omega)
    assert numpy.fabs(E[0]-m[0]*v[0]**2./2.-m[0]*(m[1]*numpy.fabs(x[0]-x[1])
                                                  +m[2]*numpy.fabs(x[0]-x[2])
                                                  +omega**2.*x[0]**2./2.)) < 10.**-10
    assert numpy.fabs(E[1]-m[1]*v[1]**2./2.-m[1]*(m[0]*numpy.fabs(x[0]-x[1])
                                                  +m[2]*numpy.fabs(x[2]-x[1])
                                                  +omega**2.*x[1]**2./2.)) < 10.**-10
    assert numpy.fabs(E[2]-m[2]*v[2]**2./2.-m[2]*(m[0]*numpy.fabs(x[0]-x[2])
                                                  +m[1]*numpy.fabs(x[2]-x[1])
                                                  +omega**2.*x[2]**2./2.)) < 10.**-10
    return None
def test_energy_conservation_sech2disk_manyparticles():
    # Test that energy is conserved for a self-gravitating disk
    N = 101
    totmass = 1.
    sigma = 1.
    zh = 2.*sigma**2./totmass
    x = numpy.arctanh(2.*numpy.random.uniform(size=N)-1)*zh
    v = numpy.random.normal(size=N)*sigma
    v -= numpy.mean(v)  # stabilize
    m = numpy.ones_like(x)/N*(1.+0.1*(2.*numpy.random.uniform(size=N)-1))
    g = wendy.nbody(x, v, m, 0.05, approx=True, nleap=1000)
    E = wendy.energy(x, v, m)
    cnt = 0
    while cnt < 100:
        tx, tv = next(g)
        assert numpy.fabs(wendy.energy(tx, tv, m)-E)/E < 10.**-6., \
            "Energy not conserved during approximate N-body integration"
        cnt += 1
    return None
def test_notracermasses():
    # approx should work with tracer sheets
    # Test that energy is conserved for a self-gravitating disk
    N = 101
    totmass = 1.
    sigma = 1.
    zh = 2.*sigma**2./totmass
    x = numpy.arctanh(2.*numpy.random.uniform(size=N)-1)*zh
    v = numpy.random.normal(size=N)*sigma
    v -= numpy.mean(v)  # stabilize
    m = numpy.ones_like(x)/N*(1.+0.1*(2.*numpy.random.uniform(size=N)-1))
    m[N//2:] = 0.
    m *= 2.
    g = wendy.nbody(x, v, m, 0.05, approx=True, nleap=1000)
    E = wendy.energy(x, v, m)
    cnt = 0
    while cnt < 100:
        tx, tv = next(g)
        assert numpy.fabs(wendy.energy(tx, tv, m)-E)/E < 10.**-6., \
            "Energy not conserved during approximate N-body integration with some tracer particles"
        cnt += 1
    return None
def test_energy_conservation_sech2disk_manyparticles():
    # Test that energy is conserved for a self-gravitating disk
    N = 101
    totmass = 1.
    sigma = 1.
    zh = 2.*sigma**2./totmass
    x = numpy.arctanh(2.*numpy.random.uniform(size=N)-1)*zh
    v = numpy.random.normal(size=N)*sigma
    v -= numpy.mean(v)  # stabilize
    m = numpy.ones_like(x)/N*(1.+0.1*(2.*numpy.random.uniform(size=N)-1))
    omega = 1.1
    g = wendy.nbody(x, v, m, 0.05, omega=omega, approx=True, nleap=1000)
    E = wendy.energy(x, v, m, omega=omega)
    cnt = 0
    while cnt < 100:
        tx, tv = next(g)
        assert numpy.fabs(wendy.energy(tx, tv, m, omega=omega)-E)/E < 10.**-6., \
            "Energy not conserved during approximate N-body integration with external harmonic potential"
        cnt += 1
    return None
def test_againstexact_sech2disk_manyparticles():
    # Test that the exact N-body and the approximate N-body agree
    N = 101
    totmass = 1.
    sigma = 1.
    zh = 2.*sigma**2./totmass
    x = numpy.arctanh(2.*numpy.random.uniform(size=N)-1)*zh
    v = numpy.random.normal(size=N)*sigma
    v -= numpy.mean(v)  # stabilize
    m = numpy.ones_like(x)/N*(1.+0.1*(2.*numpy.random.uniform(size=N)-1))
    omega = 1.1
    g = wendy.nbody(x, v, m, 0.05, approx=True, nleap=2000, omega=omega)
    ge = wendy.nbody(x, v, m, 0.05, omega=omega)
    cnt = 0
    while cnt < 100:
        tx, tv = next(g)
        txe, tve = next(ge)
        assert numpy.all(numpy.fabs(tx-txe) < 10.**-5.), \
            "Exact and approximate N-body give different positions"
        assert numpy.all(numpy.fabs(tv-tve) < 10.**-5.), \
            "Exact and approximate N-body give different velocities"
        cnt += 1
    return None
def potential(y, x, v, m, twopiG=1., omega=None):
    """
    NAME:
       potential
    PURPOSE:
       compute the gravitational potential at a set of points
    INPUT:
       y - positions at which to compute the potential
       x - positions of N-body particles [N]
       v - velocities of N-body particles [N]
       m - masses of N-body particles [N]
       twopiG= (1.) value of 2 \pi G
       omega= (None) if set, frequency of external harmonic oscillator
    OUTPUT:
       potential(y)
    HISTORY:
       2017-05-12 - Written - Bovy (UofT/CCA)
    """
    if omega is not None:
        out = omega**2.*y**2./2.
    else:
        out = 0.
    return out \
        + twopiG \
        * numpy.sum(m*numpy.fabs(x-numpy.atleast_2d(y).T), axis=1)
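A minimal usage sketch (mine, not from the original project): with twopiG left at 1, the potential at each y is just the mass-weighted sum of distances |x - y|, plus the optional harmonic term.

import numpy

x = numpy.array([-1., 1.])   # particle positions
v = numpy.zeros(2)           # velocities (unused by potential)
m = numpy.array([0.5, 0.5])  # masses

y = numpy.array([0., 2.])
print(potential(y, x, v, m))            # [1. 2.]
print(potential(y, x, v, m, omega=1.))  # [1. 4.]: adds omega**2 * y**2 / 2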
def visibilityTest(dpt, loc, tol=2.):
    """
    z-buffer like visibility test for non-occluded joints
    :param dpt: depth image
    :param loc: 2D joint locations
    :param tol: tolerance
    :return: list of indices of visible, ie non-occluded, joints
    """
    vis = []
    for i in range(loc.shape[0]):
        y = np.rint(loc[i, 1]).astype(int)
        x = np.rint(loc[i, 0]).astype(int)
        if 0 <= x < dpt.shape[1] and 0 <= y < dpt.shape[0]:
            if np.fabs(dpt[y, x] - loc[i, 2]) < tol:
                vis.append(i)
            # else:
            #     print("joint {}: dpt {} anno {}".format(i, dpt[y, x], gtcrop[i, 2]))
    return vis
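A self-contained check of the idea with synthetic data (not from the original project): a joint counts as visible when the depth image at its pixel agrees with its annotated depth within tol.

import numpy as np

dpt = np.full((4, 4), 10.0)  # flat depth image at depth 10
dpt[1, 2] = 5.0              # an occluder in front of one joint

#                x   y   depth
loc = np.array([[2., 1., 10.],    # occluded: image says 5, annotation says 10
                [0., 0., 10.],    # visible: depths agree
                [9., 9., 10.]])   # outside the image: skipped
print(visibilityTest(dpt, loc))   # [1]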
def equilibrium_ionization(self):
    """
    Calculate the ionization equilibrium for all ions of the element.
    Brief explanation and equations about how these equations are solved.
    """
    # Assemble the matrix of ionization and recombination rates
    a_matrix = np.zeros(self.temperature.shape + (self.atomic_number+1, self.atomic_number+1))
    for i in range(1, self.atomic_number):
        a_matrix[:, i, i] = -(self[i].ionization_rate() + self[i].recombination_rate()).value
        a_matrix[:, i, i-1] = self[i-1].ionization_rate().value
        a_matrix[:, i, i+1] = self[i+1].recombination_rate().value
    a_matrix[:, 0, 0] = -(self[0].ionization_rate() + self[0].recombination_rate()).value
    a_matrix[:, 0, 1] = self[1].recombination_rate().value
    a_matrix[:, -1, -1] = -(self[-1].ionization_rate() + self[-1].recombination_rate()).value
    a_matrix[:, -1, -2] = self[-2].ionization_rate().value

    # Solve the system of equations using SVD and normalize
    _, _, V = np.linalg.svd(a_matrix)
    # Take the row of V corresponding to the smallest singular value
    # (singular values are returned in descending order)
    ioneq = np.fabs(V[:, -1, :])
    ioneq /= np.sum(ioneq, axis=1)[:, np.newaxis]
    return ioneq
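The SVD trick in isolation: for a singular rate matrix A, the equilibrium populations form the null vector of A, i.e. the right-singular vector belonging to the (near-)zero singular value, and np.fabs fixes its arbitrary sign. A toy two-state sketch:

import numpy as np

# Toy rate matrix: state 0 ionizes at rate 2, state 1 recombines at rate 1.
# Columns sum to zero, so A is singular and A @ n = 0 has a nontrivial solution.
A = np.array([[-2., 1.],
              [ 2., -1.]])

_, s, V = np.linalg.svd(A)
n = np.fabs(V[-1, :])  # right-singular vector of the smallest singular value
n /= n.sum()           # normalize to total population 1
print(s[-1])           # ~0: confirms A is singular
print(n)               # [0.333... 0.666...]: detailed balance, 2*n[0] == 1*n[1]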
def _get_cci(cls, df, n_days=None):
    """ Commodity Channel Index
    CCI = (Typical Price - 20-period SMA of TP) / (.015 x Mean Deviation)
    Typical Price (TP) = (High + Low + Close) / 3
    TP is also implemented as 'middle'.
    :param df: data
    :param n_days: N-day window
    :return: None
    """
    if n_days is None:
        n_days = 14
        column_name = 'cci'
    else:
        n_days = int(n_days)
        column_name = 'cci_{}'.format(n_days)

    tp = df['middle']
    tp_sma = df['middle_{}_sma'.format(n_days)]
    md = df['middle'].rolling(
        min_periods=1, center=False, window=n_days).apply(
        lambda x: np.fabs(x - x.mean()).mean())

    df[column_name] = (tp - tp_sma) / (.015 * md)
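The rolling mean deviation is where np.fabs does the work. A minimal sketch with made-up prices; 'middle' and its SMA are computed inline here rather than assumed to pre-exist as columns:

import numpy as np
import pandas as pd

df = pd.DataFrame({'middle': [10., 11., 12., 11., 13., 14., 12.]})
n = 3
sma = df['middle'].rolling(window=n, min_periods=1).mean()
# Mean absolute deviation of each window around its own mean
md = df['middle'].rolling(window=n, min_periods=1).apply(
    lambda x: np.fabs(x - x.mean()).mean())
cci = (df['middle'] - sma) / (.015 * md)
print(cci.round(2))  # first value is NaN: a one-element window has zero deviation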
def mad(a, axis=None, c=1.4826, return_med=False):
    """Compute the normalized median absolute deviation (NMAD)
    Can also return the median array, as this can be expensive, and often we want both med and nmad
    Note: 1.4826 = 1/0.6745
    """
    a = checkma(a)
    #return np.ma.median(np.fabs(a - np.ma.median(a))) / c
    if a.count() > 0:
        if axis is None:
            med = fast_median(a)
            out = fast_median(np.fabs(a - med)) * c
        else:
            med = np.ma.median(a, axis=axis)
            out = np.ma.median(np.ma.fabs(a - med), axis=axis) * c
    else:
        out = np.ma.masked
    if return_med:
        out = (out, med)
    return out
#Percentile values
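The NMAD estimator sketched with plain (unmasked) numpy: for Gaussian data it approaches the standard deviation, but it shrugs off outliers that wreck np.std.

import numpy as np

rng = np.random.default_rng(0)
a = rng.normal(0., 2., size=10000)

med = np.median(a)
print(np.std(a), 1.4826 * np.median(np.fabs(a - med)))  # both ~2.0

a[:100] = 1e6  # inject gross outliers
print(np.std(a))                                        # blows up
print(1.4826 * np.median(np.fabs(a - np.median(a))))    # still ~2.0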
def find_right_intersect(vec, target_val, start_index=0):
    nearest_index = start_index
    next_index = start_index
    size = len(vec) - 1
    if next_index == size:
        return size
    next_val = vec[next_index]
    best_distance = np.abs(next_val - target_val)
    while next_index < size:
        next_index += 1
        next_val = vec[next_index]
        dist = np.fabs(next_val - target_val)
        if dist < best_distance:
            best_distance = dist
            nearest_index = next_index
        if next_index == size or next_val < target_val:
            break
    return nearest_index
def find_left_intersect(vec, target_val, start_index=0):
    nearest_index = start_index
    next_index = start_index
    size = len(vec) - 1
    if next_index == size:
        return size
    next_val = vec[next_index]
    best_distance = np.abs(next_val - target_val)
    while next_index > 0:
        next_index -= 1
        next_val = vec[next_index]
        dist = np.fabs(next_val - target_val)
        if dist < best_distance:
            best_distance = dist
            nearest_index = next_index
        if next_index == size or next_val < target_val:
            break
    return nearest_index
def _wrap_results(result, dtype):
    """ wrap our results if needed """
    if is_datetime64_dtype(dtype):
        if not isinstance(result, np.ndarray):
            result = lib.Timestamp(result)
        else:
            result = result.view(dtype)
    elif is_timedelta64_dtype(dtype):
        if not isinstance(result, np.ndarray):
            # raise if we have a timedelta64[ns] which is too large
            if np.fabs(result) > _int64_max:
                raise ValueError("overflow in timedelta operation")
            result = lib.Timedelta(result, unit='ns')
        else:
            result = result.astype('i8').view(dtype)
    return result
def test_irreg_hf(self):
    import matplotlib.pyplot as plt
    fig = plt.gcf()
    plt.clf()
    fig.add_subplot(111)

    idx = date_range('2012-6-22 21:59:51', freq='S', periods=100)
    df = DataFrame(np.random.randn(len(idx), 2), idx)

    irreg = df.ix[[0, 1, 3, 4]]
    ax = irreg.plot()
    diffs = Series(ax.get_lines()[0].get_xydata()[:, 0]).diff()
    sec = 1. / 24 / 60 / 60
    self.assertTrue((np.fabs(diffs[1:] - [sec, sec * 2, sec]) < 1e-8).all())

    plt.clf()
    fig.add_subplot(111)
    df2 = df.copy()
    df2.index = df.index.asobject
    ax = df2.plot()
    diffs = Series(ax.get_lines()[0].get_xydata()[:, 0]).diff()
    self.assertTrue((np.fabs(diffs[1:] - sec) < 1e-8).all())
def shareflux(lc1, lc2, frac=0.01):
    """
    I add "noise" to lc1 and lc2 by randomly sharing flux between the two sources.
    :param frac: The stddev of the gaussian "noise" in flux, with respect to the minimum flux in the curves.
    """
    if not np.all(lc1.jds == lc2.jds):
        raise RuntimeError("I only work on curves with identical jds!")

    #lc1fs = lc1.getrawfluxes()
    #lc2fs = lc2.getrawfluxes()

    minshift = np.fabs(max(lc1.getminfluxshift(), lc2.getminfluxshift()))
    shifts = frac * minshift * np.random.randn(len(lc1))
    shifts = np.clip(shifts, -minshift+1.0, minshift-1.0)  # To guarantee that we won't get negative fluxes

    lc1.addfluxes(shifts)
    lc2.addfluxes(-shifts)
def combigauss(subtds, subtderrs, truetds, lensmodelsigma=0.0):
    """
    Give me submission delays and error bars, as well as the corresponding true delays, in the form of numpy arrays.
    I compute the mean and sigma of the combined posterior on the fractional time-delay-distance error.
    """
    from scipy.stats import norm

    subtdoffs = subtds - truetds
    centers = subtdoffs/truetds
    sigmas = subtderrs/np.fabs(truetds)

    # We convolve with the lensmodelsigma:
    sigmas = np.sqrt(sigmas**2 + lensmodelsigma**2)

    sigma_combi = 1.0 / np.sqrt(np.sum(1.0 / (sigmas**2)))
    center_combi = sigma_combi**2 * np.sum(centers/sigmas**2)

    probazero = norm.pdf(0.0, center_combi, sigma_combi)

    return (center_combi, sigma_combi, probazero)

    # To plot this you could do:
    #plt.plot(x, norm.pdf(x, center_combi, sigma_combi), ls="--", color="black")
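The combination step is the standard inverse-variance weighting of independent Gaussians; a quick numeric sketch with made-up measurements:

import numpy as np

centers = np.array([0.02, -0.01, 0.00])  # fractional offsets
sigmas = np.array([0.03, 0.02, 0.05])    # their 1-sigma errors

sigma_combi = 1.0 / np.sqrt(np.sum(1.0 / sigmas**2))
center_combi = sigma_combi**2 * np.sum(centers / sigmas**2)
print(center_combi, sigma_combi)  # the estimate is pulled toward the tightest measurement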
def tv(self):
    """
    Returns the total variation of the spline. Simple!
    http://en.wikipedia.org/wiki/Total_variation
    """
    # Method 1: linear approximation
    ptd = 5  # point density in days ... this is enough!
    a = self.t[0]
    b = self.t[-1]
    x = np.linspace(a, b, int((b-a) * ptd))
    y = self.eval(jds=x)
    tv1 = np.sum(np.fabs(y[1:] - y[:-1]))
    #print "TV1 : %f" % (tv1)
    return tv1

    # Method 2: integrating the absolute value of the derivative ... hmm, splint does not integrate derivatives ...
    #si.splev(jds, (self.t, self.c, self.k))
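The discrete total variation on its own, for any sampled curve (equivalently np.sum(np.fabs(np.diff(y)))):

import numpy as np

x = np.linspace(0., 2.*np.pi, 1000)
y = np.sin(x)
print(np.sum(np.fabs(y[1:] - y[:-1])))  # ~4.0: sin rises 1, falls 2, rises 1 over a period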
def calMinTheta(M):
    vecContainZero = False
    zeroCount = 0
    Mtrans_T = np.dot(M.T, M)
    u, s, v = np.linalg.svd(Mtrans_T)
    eigenValue = s[-1]
    minVec = v[-1, :]
    for i in minVec:
        if np.fabs(i) < 1e-3:
            zeroCount += 1
    if zeroCount != 0:
        print("A zero entry exists; discarding the following vector:")
        vecContainZero = True
        print(minVec)
    return minVec.T, vecContainZero

#scale the returned eigenvector of floats into integers and check whether the scaled theta is valid (satisfies the threshold)
#if valid, return True and the valid eigenvector; if not valid, return False and an empty eigenvector
def visit_fn(self, temperature):
    factor1 = np.exp(np.log(temperature) / (self.qv - 1.0))
    factor2 = np.exp((4.0 - self.qv) * np.log(self.qv - 1.0))
    factor3 = np.exp((2.0 - self.qv) * np.log(2.0) / (self.qv - 1.0))
    factor4 = np.sqrt(np.pi) * factor1 * factor2 / (factor3 * (3.0 - self.qv))
    factor5 = 1.0 / (self.qv - 1.0) - 0.5
    d1 = 2.0 - factor5
    factor6 = np.pi * (1.0 - factor5) / np.sin(np.pi * (1.0 - factor5)) / np.exp(gammaln(d1))
    sigmax = np.exp(-(self.qv - 1.0) * np.log(factor6 / factor4) / (3.0 - self.qv))
    x = sigmax * self.gaussian_fn(1)
    y = self.gaussian_fn(0)
    den = np.exp((self.qv - 1.0) * np.log(np.fabs(y)) / (3.0 - self.qv))
    return x / den
def _em(X, eps=0.001):
    """ EM algorithm, find weight.
    X : numpy two-dim ndarray.
    return: weights
    usage:
        >>> X = np.array([[1, 2], [2, 4], [3, 1]])
        >>> print(em(X))
        [ 0.33586597  0.66413403]
    """
    N, K = X.shape
    # init
    W = X.sum(axis=0) / float(X.sum())
    # solve
    while True:
        W0 = W
        # E step
        Y = np.tile(W, (N, 1)) * X
        Q = Y / np.tile(Y.sum(axis=1), (K, 1)).T
        # M step
        W = Q.sum(axis=0) / N
        # termination?
        if np.fabs(W - W0).sum() < eps:
            break
    return W
def is_coplanar(sample1, sample2):
    """
    Decide whether two clusters of pixels are coplanar.
    Args:
        sample1, sample2
    Returns:
        True or False
    """
    vec1 = fit_plane(sample1)
    vec2 = fit_plane(sample2)
    mag1 = np.sqrt(vec1.dot(vec1))
    mag2 = np.sqrt(vec2.dot(vec2))
    cosine = vec1.dot(vec2) / (mag1 * mag2)
    if np.fabs(cosine) > 0.95:
        return True
    else:
        return False
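The coplanarity test reduces to comparing plane normals; a standalone sketch with two hand-picked normals (fit_plane is assumed to return a plane's normal vector):

import numpy as np

n1 = np.array([0., 0., 1.])     # z-up plane
n2 = np.array([0.01, 0., -1.])  # nearly parallel, opposite orientation

cosine = n1.dot(n2) / (np.linalg.norm(n1) * np.linalg.norm(n2))
print(np.fabs(cosine) > 0.95)   # True: fabs ignores the normals' sign ambiguity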
def lksprob(alam):
    """
    Computes a Kolmogorov-Smirnov t-test significance level. Adapted from
    Numerical Recipes.
    Usage:   lksprob(alam)
    """
    fac = 2.0
    sum = 0.0
    termbf = 0.0
    a2 = -2.0*alam*alam
    for j in range(1, 201):
        term = fac*math.exp(a2*j*j)
        sum = sum + term
        if math.fabs(term) <= (0.001*termbf) or math.fabs(term) < (1.0e-8*sum):
            return sum
        fac = -fac
        termbf = math.fabs(term)
    return 1.0  # Get here only if it fails to converge; was 0.0!!
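For a cross-check (mine, not part of the original snippet), SciPy ships the same asymptotic series as scipy.special.kolmogorov:

import math
from scipy.special import kolmogorov

for alam in (0.5, 1.0, 1.5):
    print(alam, lksprob(alam), kolmogorov(alam))  # the two columns agree closely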
def __call__(self, individual):
    try:
        # Transform the tree expression into a callable function
        func = toolbox.compile(expr=individual)
        error = numpy.fabs(func(*self.inVarValues) - self.targetVarValues) / self.targetVarValues
        avgerror = numpy.average(error)
        return (avgerror,)
    except NameError as ne:
        print(ne)
    except Exception as e:
        return (sys.float_info.max,)

#This is a basic error function that finds the total of all the relative errors.
#Note that your data set should not contain any 0 values. That will cause a "divide by zero" error.
#At initialization time, it gets the data to compare against.
#
# The data is a list of lists; the labels give the names of interesting data.
# The config file defines which data lists are of use.
# All data lists are expected to be of equal length. Repeated values are perfectly OK.
# The config file defines a set of "inVars" which are the input variables. (e.g. rho, T)
# It also defines a single "targetVar" which is the array of function values.
def __call__(self, individual):
    try:
        # Transform the tree expression into a callable function
        func = toolbox.compile(expr=individual)
        error = numpy.fabs(func(*self.inVarValues) - self.targetVarValues) / self.targetVarValues
        sumerror = numpy.sum(error)
        return (sumerror,)
    except NameError as ne:
        print(ne)
    except Exception as e:
        return (sys.float_info.max,)

#This is a basic error function that finds the maximum of all the relative errors.
#Note that your data set should not contain any 0 values. That will cause a "divide by zero" error.
#At initialization time, it gets the data to compare against.
#
# The data is a list of lists; the labels give the names of interesting data.
# The config file defines which data lists are of use.
# All data lists are expected to be of equal length. Repeated values are perfectly OK.
# The config file defines a set of "inVars" which are the input variables. (e.g. rho, T)
# It also defines a single "targetVar" which is the array of function values.
def __call__(self, individual):
    try:
        # Transform the tree expression into a callable function
        func = toolbox.compile(expr=individual)
        error = numpy.fabs(func(*self.inVarValues) - self.targetVarValues) / self.targetVarValues
        maxerror = numpy.max(error)
        return (maxerror,)
    except NameError as ne:
        print(ne)
    except Exception as e:
        return (sys.float_info.max,)

#R^2 is a common regression measurement to find how much variance is explained by the approximation.
#It works well early on in the calculation, but loses precision as the approximation becomes close.
#
# The data is a list of lists; the labels give the names of interesting data.
# The config file defines which data lists are of use.
# All data lists are expected to be of equal length. Repeated values are perfectly OK.
# The config file defines a set of "inVars" which are the input variables. (e.g. rho, T)
# It also defines a single "targetVar" which is the array of function values.
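The three fitness variants above differ only in how they aggregate the same relative-error vector; a toy sketch outside of DEAP:

import numpy

target = numpy.array([1.0, 2.0, 4.0])
pred = numpy.array([1.1, 1.8, 4.2])

error = numpy.fabs(pred - target) / target  # elementwise relative error
print(numpy.average(error), numpy.sum(error), numpy.max(error))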
def _filter_streamlines(self, streamline, close_threshold=0.05,
                        loop_length_range: u.cm = [2.e+9, 5.e+10]*u.cm, **kwargs):
    """
    Check an extracted loop to make sure it fits the given criteria. Return True if it passes.

    Parameters
    ----------
    streamline : yt streamline object
    close_threshold : `float`
        percentage of the domain width allowed between loop endpoints
    loop_length_range : `~astropy.units.Quantity`
        minimum and maximum allowed loop lengths (in centimeters)
    """
    streamline = streamline[np.all(streamline != 0.0, axis=1)]
    loop_length = np.sum(np.linalg.norm(np.diff(streamline, axis=0), axis=1))
    if np.fabs(streamline[0, 2] - streamline[-1, 2]) > close_threshold*self.extrapolated_3d_field.domain_width[2]:
        return False
    elif loop_length > loop_length_range[1].to(u.cm).value or loop_length < loop_length_range[0].to(u.cm).value:
        return False
    else:
        return True
def make_detector_array(self, field):
    """
    Construct bins based on the desired observing area.
    """
    delta_x = np.fabs(field.clipped_hmi_map.xrange[1] - field.clipped_hmi_map.xrange[0])
    delta_y = np.fabs(field.clipped_hmi_map.yrange[1] - field.clipped_hmi_map.yrange[0])
    min_z = min(field.extrapolated_3d_field.domain_left_edge[2].value,
                self.total_coordinates[:, 2].min().value)
    max_z = max(field.extrapolated_3d_field.domain_right_edge[2].value,
                self.total_coordinates[:, 2].max().value)
    delta_z = field._convert_angle_to_length(max(self.resolution.x, self.resolution.y)).value
    self.bins = Pair(int(np.ceil((delta_x/self.resolution.x).decompose()).value),
                     int(np.ceil((delta_y/self.resolution.y).decompose()).value),
                     int(np.ceil(np.fabs(max_z - min_z)/delta_z)))
    self.bin_range = Pair(field._convert_angle_to_length(field.clipped_hmi_map.xrange).value,
                          field._convert_angle_to_length(field.clipped_hmi_map.yrange).value,
                          np.array([min_z, max_z]))
def equilibrium_ionization(self, rate_matrix=None):
    """
    Calculate the ionization equilibrium for all ions of the element.
    Brief explanation and equations about how these equations are solved.
    """
    if rate_matrix is None:
        rate_matrix = self._rate_matrix()
    # Solve the system of equations using SVD and normalize
    _, _, V = np.linalg.svd(rate_matrix.value)
    # Take the row of V corresponding to the smallest singular value
    # (singular values are returned in descending order)
    ioneq = np.fabs(V[:, -1, :])
    ioneq /= np.sum(ioneq, axis=1)[:, np.newaxis]
    return u.Quantity(ioneq)
def emissivity(self, density: u.cm**(-3), include_energy=False, **kwargs):
    """
    Calculate emissivity for all lines as a function of temperature and density
    """
    populations = self.level_populations(density, include_protons=kwargs.get('include_protons', True))
    if populations is None:
        return (None, None)
    wavelengths = np.fabs(self._wgfa['wavelength'])
    # Exclude 0 wavelengths which correspond to two-photon decays
    upper_levels = self._wgfa['upper_level'][wavelengths != 0*u.angstrom]
    a_values = self._wgfa['A'][wavelengths != 0*u.angstrom]
    wavelengths = wavelengths[wavelengths != 0*u.angstrom]
    if include_energy:
        energy = const.h.cgs*const.c.cgs/wavelengths.to(u.cm)
    else:
        energy = 1.*u.photon
    emissivity = populations[:, :, upper_levels - 1]*(a_values*energy)
    return wavelengths, emissivity
def test_load_image(self):
    blob = sub_mean(self.image)
    image_mean = np.mean(np.mean(self.image, axis=0), axis=0)
    blob_mean = np.mean(np.mean(blob, axis=0), axis=0)
    assert (np.fabs(config.RGB_MEAN - (image_mean - blob_mean)) < 1e-9).all()
def equal(a, b):
    return np.fabs(a - b) < EPS

# Supported shape combinations, via broadcasting:
# (4,) vs (4,)
# (4,) vs (num, 4)
# (num, 4) vs (num, 4)
# (num, 4) vs (4,)
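A quick sketch of the elementwise tolerance compare under broadcasting (EPS chosen arbitrarily here):

import numpy as np

EPS = 1e-6

def equal(a, b):
    return np.fabs(a - b) < EPS

box = np.array([1., 2., 3., 4.])
boxes = np.array([[1., 2., 3., 4.],
                  [1., 2., 3., 5.]])
print(equal(box, box).all())  # True
print(equal(boxes, box))      # broadcasts (2, 4) against (4,)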
def __init__(self, directory, is_2015=False, scale=1.0):
    """
    Ground truth dataset iterator
    """
    if is_2015:
        left_dir, right_dir = 'image_2', 'image_3'
        noc_dir, occ_dir = 'disp_noc_0', 'disp_occ_0'
        calib_left, calib_right = 'P2', 'P3'
    else:
        left_dir, right_dir = 'image_0', 'image_1'
        noc_dir, occ_dir = 'disp_noc', 'disp_occ'
        calib_left, calib_right = 'P0', 'P1'

    self.scale = scale

    # Stereo is only evaluated on the _10.png images
    self.stereo = StereoDatasetReader(os.path.expanduser(directory),
                                      left_template=''.join([left_dir, '/%06i_10.png']),
                                      right_template=''.join([right_dir, '/%06i_10.png']),
                                      scale=scale, grayscale=True)
    self.noc = ImageDatasetReader(template=os.path.join(os.path.expanduser(directory), noc_dir, '%06i_10.png'))
    self.occ = ImageDatasetReader(template=os.path.join(os.path.expanduser(directory), occ_dir, '%06i_10.png'))

    def calib_read(fn, scale):
        db = AttrDict.load_yaml(fn)
        P0 = np.float32(db[calib_left].split(' '))
        P1 = np.float32(db[calib_right].split(' '))
        fx, cx, cy = P0[0], P0[2], P0[6]
        baseline_px = np.fabs(P1[3])
        return StereoCamera.from_calib_params(fx, fx, cx, cy, baseline_px=baseline_px)

    self.calib = DatasetReader(template=os.path.join(os.path.expanduser(directory), 'calib/%06i.txt'),
                               process_cb=lambda fn: calib_read(fn, scale))
    self.poses = repeat(None)
def im_resize(im, shape=None, scale=0.5, interpolation=cv2.INTER_AREA):
    if shape is not None:
        return cv2.resize(im, dsize=shape, fx=0., fy=0., interpolation=interpolation)
    else:
        if np.fabs(scale-1.0) < 1e-2:
            # Within 1% of unit scale: skip the resize entirely
            return im
        elif scale <= 1.0:
            return cv2.resize(im, None, fx=scale, fy=scale, interpolation=interpolation)
        else:
            shape = (int(im.shape[1]*scale), int(im.shape[0]*scale))
            return im_resize(im, shape)
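Usage sketch (requires opencv-python; note the scale-close-to-1 shortcut returns the original array untouched):

import numpy as np
import cv2

im = (np.random.rand(100, 200, 3) * 255).astype(np.uint8)
print(im_resize(im, scale=0.5).shape)    # (50, 100, 3)
print(im_resize(im, scale=1.005) is im)  # True: within the 1% tolerance, no copy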