Python numpy module: np.nan code examples
Below are 50 code examples, extracted from open-source Python projects, that illustrate how numpy.nan is used in practice.
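Before the examples, a quick illustrative refresher on the semantics these snippets rely on: np.nan is a float sentinel that never compares equal to anything, including itself, so membership is tested with np.isnan and reductions use the nan-aware variants.
import numpy as np

a = np.array([1.0, np.nan, 3.0])
print(np.nan == np.nan)          # False: NaN never compares equal
print(np.isnan(a))               # [False  True False]
print(a.mean(), np.nanmean(a))   # nan 2.0  (nan-aware reductions skip NaNs)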
def sparse_optical_flow(im1, im2, pts, fb_threshold=-1,
window_size=15, max_level=2,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)):
# Forward flow
p1, st, err = cv2.calcOpticalFlowPyrLK(im1, im2, pts, None,
winSize=(window_size, window_size),
maxLevel=max_level, criteria=criteria )
# Backward flow
if fb_threshold > 0:
p0r, st0, err = cv2.calcOpticalFlowPyrLK(im2, im1, p1, None,
winSize=(window_size, window_size),
maxLevel=max_level, criteria=criteria)
p0r[st0 == 0] = np.nan
# Set only good
        fb_good = (np.fabs(p0r - pts) < fb_threshold).all(axis=1)
p1[~fb_good] = np.nan
st = np.bitwise_and(st, st0)
err[~fb_good] = np.nan
return p1, st, err
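A minimal usage sketch (hypothetical im1/im2 as consecutive grayscale uint8 frames; point-array shape conventions vary slightly between OpenCV versions):
import cv2
import numpy as np

# pts may be None if no corners are found
pts = cv2.goodFeaturesToTrack(im1, maxCorners=200, qualityLevel=0.01, minDistance=7)
pts = pts.reshape(-1, 2).astype(np.float32)
p1, st, err = sparse_optical_flow(im1, im2, pts, fb_threshold=1.0)
good = ~np.isnan(p1).any(axis=1)   # points that survived the forward-backward check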
def y_sum_by_time(x_arr, y_arr, top=None):
df = pd.DataFrame({'Timestamp': pd.to_datetime(x_arr, unit='s'), 'Status': y_arr})
df['Date'] = df['Timestamp'].apply(lambda x: "%d/%d/%d" % (x.day, x.month, x.year))
df['Hour'] = df['Timestamp'].apply(lambda x: "%d" % (x.hour))
df['Weekday'] = df['Timestamp'].apply(lambda x: "%s" % (x.weekday_name))
times = ['Hour', 'Weekday', 'Date']
result = {}
for groupby in times:
df_group = df.groupby(groupby, as_index=False).agg({'Status': np.sum})
        if top is not None and top > 0:
#df_group = df_group.nlargest(top, 'Status').sort(['Status', 'Hour'],ascending=False)
idx = df_group.nlargest(top, 'Status') > 0
else:
idx = df_group['Status'].max() == df_group['Status']
result[groupby] = {k: g['Status'].replace(np.nan, 'None').tolist() for k,g in df_group[idx].groupby(groupby)}
return result
def test_pd_outer_join():
dfs = [
pd.DataFrame({
'id': [0, 1, 2, 3],
'a': ['foo', 'bar', 'baz', np.nan],
'b': ['panda', 'zebra', np.nan, np.nan],
}),
pd.DataFrame({
'id': [1, 2, 3, 4],
'b': ['mouse', np.nan, 'tiger', 'egret'],
'c': ['toe', 'finger', 'nose', np.nan],
}),
]
expected = pd.DataFrame({
'id': [0, 1, 2, 3, 4],
'a': ['foo', 'bar', 'baz', np.nan, np.nan],
'b': ['panda', 'zebra', np.nan, 'tiger', 'egret'],
'c': [np.nan, 'toe', 'finger', 'nose', np.nan],
}).set_index('id')
actual = pd_outer_join(dfs, on='id')
print(expected)
print(actual)
assert expected.equals(actual)
def test_against_numpy_nanstd(self):
source = [np.random.random((16, 12, 5)) for _ in range(10)]
for arr in source:
arr[randint(0, 15), randint(0, 11), randint(0, 4)] = np.nan
stack = np.stack(source, axis = -1)
for axis in (0, 1, 2, None):
for ddof in range(4):
with self.subTest('axis = {}, ddof = {}'.format(axis, ddof)):
from_numpy = np.nanstd(stack, axis = axis, ddof = ddof)
from_ivar = last(istd(source, axis = axis, ddof = ddof, ignore_nan = True))
self.assertSequenceEqual(from_numpy.shape, from_ivar.shape)
self.assertTrue(np.allclose(from_ivar, from_numpy))
def frame_from_bardata(self, data, algo_dt):
"""
Create a DataFrame from the given BarData and algo dt.
"""
data = data._data
frame_data = np.empty((len(self.fields), len(self.sids))) * np.nan
for j, sid in enumerate(self.sids):
sid_data = data.get(sid)
if not sid_data:
continue
if algo_dt != sid_data['dt']:
continue
for i, field in enumerate(self.fields):
frame_data[i, j] = sid_data.get(field, np.nan)
return pd.DataFrame(
frame_data,
index=self.fields.copy(),
columns=self.sids.copy(),
)
def information_ratio(algo_volatility, algorithm_return, benchmark_return):
"""
http://en.wikipedia.org/wiki/Information_ratio
    Args:
        algo_volatility (float): Annualized algorithm volatility.
        algorithm_return (float): Algorithm return percentage.
        benchmark_return (float): Benchmark return percentage.
Returns:
float. Information ratio.
"""
if zp_math.tolerant_equals(algo_volatility, 0):
return np.nan
    # The square of the annualization factor is in the volatility,
    # because the volatility is also annualized, i.e. the sqrt(annual factor)
    # is in the volatility's numerator. So, to get the correct annualization
    # factor for the Sharpe value's numerator (which should be sqrt(annual
    # factor)), the annual factor itself, the square of its square root, is
    # needed in the numerator to cancel the division by its square root.
return (algorithm_return - benchmark_return) / algo_volatility
def sharpe_ratio(algorithm_volatility, algorithm_return, treasury_return):
"""
http://en.wikipedia.org/wiki/Sharpe_ratio
Args:
algorithm_volatility (float): Algorithm volatility.
algorithm_return (float): Algorithm return percentage.
treasury_return (float): Treasury return percentage.
Returns:
float. The Sharpe ratio.
"""
if zp_math.tolerant_equals(algorithm_volatility, 0):
return np.nan
return (algorithm_return - treasury_return) / algorithm_volatility
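Illustrative only: a plain-arithmetic stand-in for the zero-volatility guard above, with zp_math.tolerant_equals replaced by an exact comparison.
import numpy as np

def sharpe_ratio_sketch(volatility, algo_return, treasury_return):
    if volatility == 0:
        return np.nan                      # guard against division by zero
    return (algo_return - treasury_return) / volatility

print(sharpe_ratio_sketch(0.2, 0.12, 0.03))  # ~0.45
print(sharpe_ratio_sketch(0.0, 0.12, 0.03))  # nan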
def test_nan_filter_panel(self):
dates = pd.date_range('1/1/2000', periods=2, freq='B', tz='UTC')
df = pd.Panel(np.random.randn(2, 2, 2),
major_axis=dates,
items=[4, 5],
minor_axis=['price', 'volume'])
# should be filtered
df.loc[4, dates[0], 'price'] = np.nan
# should not be filtered, should have been ffilled
df.loc[5, dates[1], 'price'] = np.nan
source = DataPanelSource(df)
event = next(source)
self.assertEqual(5, event.sid)
event = next(source)
self.assertEqual(4, event.sid)
self.assertRaises(StopIteration, next, source)
def _algo_record_float_magic_should_pass(self, var_type):
test_algo = TradingAlgorithm(
script=record_float_magic % var_type,
sim_params=self.sim_params,
env=self.env,
)
set_algo_instance(test_algo)
self.zipline_test_config['algorithm'] = test_algo
self.zipline_test_config['trade_count'] = 200
zipline = simfactory.create_test_zipline(
**self.zipline_test_config)
output, _ = drain_zipline(self, zipline)
self.assertEqual(len(output), 252)
incr = []
for o in output[:200]:
incr.append(o['daily_perf']['recorded_vars']['data'])
np.testing.assert_array_equal(incr, [np.nan] * 200)
def initialize_with(test_case, tfm_name, days):
    def initialize(context):
context.test_case = test_case
context.days = days
context.mins_for_days = []
context.price_bars = (None, [np.nan], [np.nan], [np.nan])
context.vol_bars = (None, [np.nan], [np.nan], [np.nan])
if context.days:
context.warmup = days + 1
else:
context.warmup = 2
context.current_date = None
context.last_close_prices = [np.nan, np.nan, np.nan, np.nan]
add_transform(tfm_name, days)
    return initialize
def test_ffill(self):
# test ndim=1
N = 100
s = pd.Series(np.random.randn(N))
mask = random.sample(range(N), 10)
s.iloc[mask] = np.nan
correct = s.ffill().values
test = ffill(s.values)
assert_almost_equal(correct, test)
# test ndim=2
df = pd.DataFrame(np.random.randn(N, N))
df.iloc[mask] = np.nan
correct = df.ffill().values
test = ffill(df.values)
assert_almost_equal(correct, test)
def track(self, im0, im1, p0):
"""
Main tracking method using sparse optical flow (LK)
"""
if p0 is None or not len(p0):
return np.array([])
# Forward flow
p1, st1, err1 = cv2.calcOpticalFlowPyrLK(im0, im1, p0, None, **self.lk_params_)
p1[st1 == 0] = np.nan
if self.fb_check_:
# Backward flow
p0r, st0, err0 = cv2.calcOpticalFlowPyrLK(im1, im0, p1, None, **self.lk_params_)
p0r[st0 == 0] = np.nan
# Set only good
fb_good = (np.fabs(p0r-p0) < 3).all(axis=1)
p1[~fb_good] = np.nan
return p1
def matthews_correl_coeff(ntp, ntn, nfp, nfn):
'''
    This calculates the Matthews correlation coefficient.
https://en.wikipedia.org/wiki/Matthews_correlation_coefficient
'''
mcc_top = (ntp*ntn - nfp*nfn)
mcc_bot = msqrt((ntp + nfp)*(ntp + nfn)*(ntn + nfp)*(ntn + nfn))
if mcc_bot > 0:
return mcc_top/mcc_bot
else:
return np.nan
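A hedged worked example with made-up confusion-matrix counts (msqrt is assumed to be math.sqrt, as used in the function above):
ntp, ntn, nfp, nfn = 40, 45, 5, 10
print(matthews_correl_coeff(ntp, ntn, nfp, nfn))   # ~0.70
print(matthews_correl_coeff(0, 0, 0, 0))           # nan (zero denominator)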
#######################################
## VARIABILITY RECOVERY (PER MAGBIN) ##
#######################################
def key_worker(task):
'''
This gets the required keys from the requested file.
'''
cpf, keys = task
cpd = checkplot._read_checkplot_picklefile(cpf)
resultkeys = []
for k in keys:
try:
resultkeys.append(dict_get(cpd, k))
except:
resultkeys.append(np.nan)
return resultkeys
############
## CONFIG ##
############
def smartcast(castee, caster, subval=None):
'''
This just tries to apply the caster function to castee.
    On failure, returns NaN for numeric casters, '' for str, or `subval` (default None) otherwise.
'''
try:
return caster(castee)
except Exception as e:
if caster is float or caster is int:
            return np.nan
elif caster is str:
return ''
else:
return subval
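A minimal usage sketch (expected outputs shown as comments; the fallback value follows the caster type):
print(smartcast('3.14', float))              # 3.14
print(smartcast('bad', int))                 # nan (numeric cast failed)
print(smartcast('bad', complex, subval=0))   # 0  (fallback to subval)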
# these are the keys used in the metadata section of the CSV LC
def test_PlotCurveItem():
p = pg.GraphicsWindow()
p.ci.layout.setContentsMargins(4, 4, 4, 4) # default margins vary by platform
v = p.addViewBox()
p.resize(200, 150)
data = np.array([1,4,2,3,np.inf,5,7,6,-np.inf,8,10,9,np.nan,-1,-2,0])
c = pg.PlotCurveItem(data)
v.addItem(c)
v.autoRange()
    # Check that auto-range works. Some platform differences may be expected.
checkRange = np.array([[-1.1457564053237301, 16.145756405323731], [-3.076811473165955, 11.076811473165955]])
assert np.allclose(v.viewRange(), checkRange)
assertImageApproved(p, 'plotcurveitem/connectall', "Plot curve with all points connected.")
c.setData(data, connect='pairs')
assertImageApproved(p, 'plotcurveitem/connectpairs', "Plot curve with pairs connected.")
c.setData(data, connect='finite')
assertImageApproved(p, 'plotcurveitem/connectfinite', "Plot curve with finite points connected.")
c.setData(data, connect=np.array([1,1,1,0,1,1,0,0,1,0,0,0,1,1,0,0]))
assertImageApproved(p, 'plotcurveitem/connectarray', "Plot curve with connection array.")
def rank_cat(df_tr,ycol,df_te=None,cols=None,rank=True,tag=''):
if cols is None:
cols = [i for i in df_tr.columns.values if df_tr[i].dtype=='object']
if len(cols)==0:
print("no cat cols found")
return
for col in cols:
dic = df_tr.groupby(col)[ycol].mean().to_dict()
if rank:
ks = [i for i in dic]
vs = np.array([dic[i] for i in ks]).argsort().argsort()
dic = {i:j for i,j in zip(ks,vs)}
df_tr[tag+col] = df_tr[col].apply(lambda x: dic[x])
if df_te is not None:
df_te[tag+col] = df_te[col].apply(lambda x: dic.get(x,np.nan))
#overfitting! try LOO!
def get_calibration_metrics(model, data):
scores = (data['X'] * data['Y']).dot(model)
#distinct scores
#compute calibration error at each score
full_metrics = {
'scores': float('nan'),
'count': float('nan'),
'predicted_risk': float('nan'),
'empirical_risk': float('nan')
}
    cal_error = np.sqrt(np.sum((full_metrics['predicted_risk'] - full_metrics['empirical_risk']) ** 2))
summary_metrics = {
'mean_calibration_error': float('nan')
}
#counts
#metrics
#mean calibration error across all scores
pass
def round_solution_pool(pool, constraints):
pool.distinct().sort()
P = pool.P
L0_reg_ind = np.isnan(constraints['coef_set'].C_0j)
L0_max = constraints['L0_max']
rounded_pool = SolutionPool(P)
for solution in pool.solutions:
# sort from largest to smallest coefficients
feature_order = np.argsort([-abs(x) for x in solution])
rounded_solution = np.zeros(shape=(1, P))
l0_norm_count = 0
for k in range(0, P):
j = feature_order[k]
if not L0_reg_ind[j]:
rounded_solution[0, j] = np.round(solution[j], 0)
elif l0_norm_count < L0_max:
rounded_solution[0, j] = np.round(solution[j], 0)
l0_norm_count += L0_reg_ind[j]
rounded_pool.add(objvals=np.nan, solutions=rounded_solution)
rounded_pool.distinct().sort()
return rounded_pool
def clean_df(df, fill_nan=True, drop_empty_columns=True):
"""Clean a pandas dataframe by:
    1. Filling empty values with NaN
2. Dropping columns with all empty values
Args:
df: Pandas DataFrame
fill_nan (bool): If any empty values (strings, None, etc) should be replaced with NaN
drop_empty_columns (bool): If columns whose values are all empty should be dropped
Returns:
DataFrame: cleaned DataFrame
"""
if fill_nan:
df = df.fillna(value=np.nan)
if drop_empty_columns:
df = df.dropna(axis=1, how='all')
return df.sort_index()
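A minimal sketch of the cleaning behaviour described above (illustrative values):
import numpy as np
import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3], 'b': [None, None, None]}, index=[2, 0, 1])
print(clean_df(df))
# Column 'b' (all-empty) is dropped and the index is sorted:
#    a
# 0  2
# 1  3
# 2  1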
def parse_psqs(psqs_results_file):
"""Parse a PSQS result file and returns a Pandas DataFrame of the results
Args:
psqs_results_file: Path to psqs results file
Returns:
Pandas DataFrame: Summary of PSQS results
"""
# TODO: generalize column names for all results, save as dict instead
psqs_results = pd.read_csv(psqs_results_file, sep='\t', header=None)
psqs_results['pdb_file'] = psqs_results[0].apply(lambda x: str(x).strip('./').strip('.pdb'))
psqs_results = psqs_results.rename(columns = {1:'psqs_local', 2:'psqs_burial', 3:'psqs_contact', 4:'psqs_total'}).drop(0, axis=1)
psqs_results['u_pdb'] = psqs_results['pdb_file'].apply(lambda x: x.upper() if len(x)==4 else np.nan)
psqs_results['i_entry_name'] = psqs_results['pdb_file'].apply(lambda x: x.split('_model1')[0] if len(x)>4 else np.nan)
psqs_results = psqs_results[pd.notnull(psqs_results.psqs_total)]
return psqs_results
def getAccuracyAucOnAllTasks(self, task_list):
all_task_Y = []
all_preds = []
for i in range(len(task_list)):
preds, task_Y = self.getPredsTrueOnOneTask(task_list,i)
if preds is None:
# Skipping task because it does not have valid data
continue
if len(task_Y)>0:
all_task_Y.extend(task_Y)
all_preds.extend(preds)
if not helper.containsEachLabelType(all_preds):
print "for some bizarre reason, the preds for all tasks are the same class"
print "preds", all_preds
print "true_y", all_task_Y
auc = np.nan
else:
auc=roc_auc_score(all_task_Y, all_preds)
acc=hblr.getBinaryAccuracy(all_preds,all_task_Y)
return acc,auc
def getAccuracyAucOnOneTask(self, task_list, task, debug=False):
X_t, y_t = self.extractTaskData(task_list,task)
if len(X_t) == 0:
return np.nan, np.nan
preds = self.internal_predict(X_t, int(task))
if debug:
print "y_t:", y_t
print "preds:", preds
acc = helper.getBinaryAccuracy(preds,y_t)
if len(y_t) > 1 and helper.containsEachSVMLabelType(y_t) and helper.containsEachSVMLabelType(preds):
auc = roc_auc_score(y_t, preds)
else:
auc = np.nan
return acc, auc
def sweepAllParameters(self):
print "\nSweeping all parameters!"
self.calcNumSettingsDesired()
print "\nYou have chosen to test a total of", self.num_settings, "settings"
sys.stdout.flush()
#sweep all possible combinations of parameters
for C in self.c_vals:
for v in self.v_vals:
for regularizer in self.regularizers:
for kernel in self.kernels:
if kernel == 'linear':
self.testOneSetting(C, np.nan, kernel, v, regularizer)
else:
for beta in self.beta_vals:
self.testOneSetting(C, beta, kernel, v, regularizer)
self.val_results_df.to_csv(self.results_path + self.save_prefix + '.csv')
def test_ecdf_formal_custom():
assert dcst.ecdf_formal(0.1, [0, 1, 2, 3]) == 0.25
assert dcst.ecdf_formal(-0.1, [0, 1, 2, 3]) == 0.0
assert dcst.ecdf_formal(0.1, [3, 2, 0, 1]) == 0.25
assert dcst.ecdf_formal(-0.1, [3, 2, 0, 1]) == 0.0
assert dcst.ecdf_formal(2, [3, 2, 0, 1]) == 0.75
assert dcst.ecdf_formal(1, [3, 2, 0, 1]) == 0.5
assert dcst.ecdf_formal(3, [3, 2, 0, 1]) == 1.0
assert dcst.ecdf_formal(0, [3, 2, 0, 1]) == 0.25
with pytest.raises(RuntimeError) as excinfo:
dcst.ecdf_formal([np.nan, np.inf], [0, 1, 2, 3])
excinfo.match('Input cannot have NaNs.')
correct = np.array([1.0, 1.0])
result = dcst.ecdf_formal([3.1, np.inf], [3, 2, 0, 1])
assert np.allclose(correct, result, atol=atol)
def test_draw_bs_pairs_linreg_nan():
x = np.array([])
y = np.array([])
with pytest.raises(RuntimeError) as excinfo:
dcst.draw_bs_pairs_linreg(x, y, size=1)
excinfo.match('Arrays must have at least 2 mutual non-NaN entries.')
x = np.array([np.nan])
y = np.array([np.nan])
with pytest.raises(RuntimeError) as excinfo:
dcst.draw_bs_pairs_linreg(x, y, size=1)
excinfo.match('Arrays must have at least 2 mutual non-NaN entries.')
x = np.array([np.nan, 1])
y = np.array([1, np.nan])
with pytest.raises(RuntimeError) as excinfo:
dcst.draw_bs_pairs_linreg(x, y, size=1)
excinfo.match('Arrays must have at least 2 mutual non-NaN entries.')
x = np.array([0, 1, 5])
y = np.array([1, np.inf, 3])
with pytest.raises(RuntimeError) as excinfo:
dcst.draw_bs_pairs_linreg(x, y, size=1)
excinfo.match('All entries in arrays must be finite.')
def test_pearson_r_edge():
x = np.array([])
y = np.array([])
with pytest.raises(RuntimeError) as excinfo:
dcst.pearson_r(x, y)
excinfo.match('Arrays must have at least 2 mutual non-NaN entries.')
x = np.array([np.nan])
y = np.array([np.nan])
with pytest.raises(RuntimeError) as excinfo:
dcst.pearson_r(x, y)
excinfo.match('Arrays must have at least 2 mutual non-NaN entries.')
x = np.array([np.nan, 1])
y = np.array([1, np.nan])
with pytest.raises(RuntimeError) as excinfo:
dcst.pearson_r(x, y)
excinfo.match('Arrays must have at least 2 mutual non-NaN entries.')
x = np.array([0, 1, 5])
y = np.array([1, np.inf, 3])
with pytest.raises(RuntimeError) as excinfo:
dcst.pearson_r(x, y)
excinfo.match('All entries in arrays must be finite.')
def studentized_diff_of_means(data_1, data_2):
"""
Studentized difference in means of two arrays.
Parameters
----------
data_1 : array_like
One-dimensional array of data.
data_2 : array_like
One-dimensional array of data.
Returns
-------
output : float
Studentized difference of means.
Notes
-----
.. If the variance of both `data_1` and `data_2` is zero, returns
np.nan.
"""
data_1 = _convert_data(data_1)
data_2 = _convert_data(data_2)
return _studentized_diff_of_means(data_1, data_2)
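The private helper _studentized_diff_of_means is not shown here; a minimal NumPy sketch of the usual studentized (Welch-style) statistic, consistent with the docstring's NaN note, might look like the following (hypothetical re-implementation, the library's version may differ in details):
import numpy as np

def studentized_diff_of_means_sketch(data_1, data_2):
    data_1 = np.asarray(data_1, dtype=float)
    data_2 = np.asarray(data_2, dtype=float)
    v1 = np.var(data_1, ddof=1)
    v2 = np.var(data_2, ddof=1)
    if v1 == 0 and v2 == 0:
        return np.nan  # both samples constant: statistic is undefined
    denom = np.sqrt(v1 / len(data_1) + v2 / len(data_2))
    return (np.mean(data_1) - np.mean(data_2)) / denom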
def outlier_from_local_median(piv, threshold=2.0):
"""Outlier detection algorithm for mask creation.
The calculated residual is compared to a threshold which produces a mask.
The mask consists of nan values at the outlier positions.
This mask can be interpolated to remove the outliers.
:param object piv: Piv Class Object
:param double threshold: threshold for identifying outliers
"""
u_res = get_normalized_residual(piv.u)
v_res = get_normalized_residual(piv.v)
res_total = np.sqrt(u_res**2 + v_res**2)
    mask = res_total > threshold
piv.u[mask] = np.nan
piv.v[mask] = np.nan
def test_timeseries_bootstrap():
"""
Tests the timeseries_bootstrap method of BASC workflow
"""
np.random.seed(27)
#np.set_printoptions(threshold=np.nan)
# Create a 10x5 matrix which counts up by column-wise
x = np.arange(50).reshape((5,10)).T
actual= timeseries_bootstrap(x,3)
desired = np.array([[ 4, 14, 24, 34, 44],
[ 5, 15, 25, 35, 45],
[ 6, 16, 26, 36, 46],
[ 8, 18, 28, 38, 48],
[ 9, 19, 29, 39, 49],
[ 0, 10, 20, 30, 40],
[ 7, 17, 27, 37, 47],
[ 8, 18, 28, 38, 48],
[ 9, 19, 29, 39, 49],
[ 8, 18, 28, 38, 48]])
np.testing.assert_equal(actual, desired)
def sphankel2(n, kr):
"""Spherical Hankel (second kind) of order n at kr
Parameters
----------
n : array_like
Order
kr: array_like
Argument
Returns
-------
hn2 : complex float
Spherical Hankel function hn (second kind)
"""
n, kr = scalar_broadcast_match(n, kr)
hn2 = _np.full(n.shape, _np.nan, dtype=_np.complex_)
kr_nonzero = kr != 0
hn2[kr_nonzero] = _np.sqrt(_np.pi / 2) / _np.lib.scimath.sqrt(kr[kr_nonzero]) * hankel2(n[kr_nonzero] + 0.5, kr[kr_nonzero])
return hn2
def dsphankel1(n, kr):
"""Derivative spherical Hankel (first kind) of order n at kr
Parameters
----------
n : array_like
Order
kr: array_like
Argument
Returns
-------
dhn1 : complex float
        Derivative of spherical Hankel function hn' (first kind)
"""
n, kr = scalar_broadcast_match(n, kr)
dhn1 = _np.full(n.shape, _np.nan, dtype=_np.complex_)
kr_nonzero = kr != 0
dhn1[kr_nonzero] = 0.5 * (sphankel1(n[kr_nonzero] - 1, kr[kr_nonzero]) - sphankel1(n[kr_nonzero] + 1, kr[kr_nonzero]) - sphankel1(n[kr_nonzero], kr[kr_nonzero]) / kr[kr_nonzero])
return dhn1
def dsphankel2(n, kr):
"""Derivative spherical Hankel (second kind) of order n at kr
Parameters
----------
n : array_like
Order
kr: array_like
Argument
Returns
-------
dhn2 : complex float
Derivative of spherical Hankel function hn' (second kind)
"""
n, kr = scalar_broadcast_match(n, kr)
dhn2 = _np.full(n.shape, _np.nan, dtype=_np.complex_)
kr_nonzero = kr != 0
dhn2[kr_nonzero] = 0.5 * (sphankel2(n[kr_nonzero] - 1, kr[kr_nonzero]) - sphankel2(n[kr_nonzero] + 1, kr[kr_nonzero]) - sphankel2(n[kr_nonzero], kr[kr_nonzero]) / kr[kr_nonzero])
return dhn2
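As a rough consistency check (illustrative only; assumes this module's sphankel2/dsphankel2 are in scope and that scalar_broadcast_match accepts plain scalars), the analytic recurrence should agree with a central finite difference of sphankel2:
import numpy as np
n, kr, h = 2, 3.0, 1e-6
fd = (sphankel2(n, kr + h) - sphankel2(n, kr - h)) / (2 * h)
print(np.allclose(dsphankel2(n, kr), fd))  # expected: True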
def test_sumup(nr_sites, local_dim, rank, rgen, dtype):
mpas = [factory.random_mpa(nr_sites, local_dim, 3, dtype=dtype, randstate=rgen)
for _ in range(rank if rank is not np.nan else 1)]
sum_naive = ft.reduce(mp.MPArray.__add__, mpas)
sum_mp = mp.sumup(mpas)
assert_array_almost_equal(sum_naive.to_array(), sum_mp.to_array())
assert all(r <= 3 * rank for r in sum_mp.ranks)
assert(sum_mp.dtype is dtype)
weights = rgen.randn(len(mpas))
summands = [w * mpa for w, mpa in zip(weights, mpas)]
sum_naive = ft.reduce(mp.MPArray.__add__, summands)
sum_mp = mp.sumup(mpas, weights=weights)
assert_array_almost_equal(sum_naive.to_array(), sum_mp.to_array())
assert all(r <= 3 * rank for r in sum_mp.ranks)
assert(sum_mp.dtype is dtype)
def generateTickStep(dps):
coeff = [1., 2., 5.]
coeffIdx = 0
mult = 1.
step = coeff[coeffIdx] * mult
    # Replace 0 with NaN so zeros are ignored when computing the data range
    dps_new = np.array(dps, dtype=float)   # copy avoids mutating the caller's array
    dps_new[dps_new == 0] = np.nan
    dpsRange = np.nanmax(dps_new) - np.nanmin(dps_new)
while dpsRange / step >= 8:
coeffIdx = (coeffIdx + 1) % 3
if coeffIdx == 0:
mult = mult * 10.
step = coeff[coeffIdx] * mult
return step
def write_fits(self, outfile, oldheader=None, clobber=False):
if os.path.exists(outfile) and (not clobber):
raise OSError("Sky FITS already exists: %s" % outfile)
if oldheader is not None:
header = oldheader
header.extend(self.fits_header, update=True)
else:
header = self.fits_header
header.add_history(datetime.now().isoformat())
header.add_history(" ".join(sys.argv))
image = self.image
image[~self.mask] = np.nan
image *= self.factor_K2JyPixel
hdu = fits.PrimaryHDU(data=image, header=header)
try:
hdu.writeto(outfile, overwrite=True)
except TypeError:
hdu.writeto(outfile, clobber=True) # old astropy versions
logger.info("Wrote FITS image of sky model to file: %s" % outfile)
def make_data_frame(words, years, feature_dict):
"""
Makes a pandas dataframe for word, years, and dictionary of feature funcs.
Each feature func should take (word, year) and return feature value.
Constructed dataframe has flat csv style structure and missing values are removed.
"""
temp = collections.defaultdict(list)
feature_dict["word"] = lambda word, year : word
feature_dict["year"] = lambda word, year : year
for word in words:
for year in years:
            for feature, feature_func in feature_dict.items():
temp[feature].append(feature_func(word, year))
df = pd.DataFrame(temp)
df = df.replace([np.inf, -np.inf], np.nan)
df = df.dropna()
return df
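A small usage sketch with hypothetical feature functions (word length and a dummy frequency), illustrating the flat structure described in the docstring:
import numpy as np
import pandas as pd

feature_dict = {
    "length": lambda word, year: len(word),                    # hypothetical feature
    "freq": lambda word, year: 1.0 if year > 2000 else np.nan, # hypothetical feature
}
df = make_data_frame(["cat", "zebra"], [2000, 2010], feature_dict)
print(df)  # rows with NaN features (the year-2000 rows here) are dropped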
def test_alpha_rarefaction_with_empty_column_in_metadata(self):
t = biom.Table(np.array([[100, 111, 113], [111, 111, 112]]),
['O1', 'O2'],
['S1', 'S2', 'S3'])
md = qiime2.Metadata(
pd.DataFrame({'pet': ['russ', 'milo', 'peanut', 'summer'],
'foo': [np.nan, np.nan, np.nan, 'bar']},
index=['S1', 'S2', 'S3', 'S4']))
with tempfile.TemporaryDirectory() as output_dir:
alpha_rarefaction(output_dir, t, max_depth=200, metadata=md)
index_fp = os.path.join(output_dir, 'index.html')
self.assertTrue(os.path.exists(index_fp))
with open(index_fp, 'r') as fh:
contents = fh.read()
self.assertTrue('observed_otus' in contents)
self.assertTrue('shannon' in contents)
self.assertTrue('did not contain any values:' in contents)
metric_fp = os.path.join(output_dir, 'shannon-pet.jsonp')
self.assertTrue('summer' not in open(metric_fp).read())
self.assertFalse(
os.path.exists(os.path.join(output_dir, 'shannon-foo.jsonp')))
def htmt(self):
htmt_ = pd.DataFrame(pd.DataFrame.corr(self.data_),
index=self.manifests, columns=self.manifests)
mean = []
allBlocks = []
for i in range(self.lenlatent):
block_ = self.Variables['measurement'][
self.Variables['latent'] == self.latent[i]]
allBlocks.append(list(block_.values))
block = htmt_.ix[block_, block_]
mean_ = (block - np.diag(np.diag(block))).values
mean_[mean_ == 0] = np.nan
mean.append(np.nanmean(mean_))
comb = [[k, j] for k in range(self.lenlatent)
for j in range(self.lenlatent)]
comb_ = [(np.sqrt(mean[comb[i][1]] * mean[comb[i][0]]))
for i in range(self.lenlatent ** 2)]
comb__ = []
for i in range(self.lenlatent ** 2):
block = (htmt_.ix[allBlocks[comb[i][1]],
allBlocks[comb[i][0]]]).values
# block[block == 1] = np.nan
comb__.append(np.nanmean(block))
htmt__ = np.divide(comb__, comb_)
where_are_NaNs = np.isnan(htmt__)
htmt__[where_are_NaNs] = 0
htmt = pd.DataFrame(np.tril(htmt__.reshape(
(self.lenlatent, self.lenlatent)), k=-1), index=self.latent, columns=self.latent)
return htmt
def as_float_array(X, copy=True, force_all_finite=True):
"""Converts an array-like to an array of floats
The new dtype will be np.float32 or np.float64, depending on the original
type. The function can create a copy or modify the argument depending
on the argument copy.
Parameters
----------
X : {array-like, sparse matrix}
copy : bool, optional
If True, a copy of X will be created. If False, a copy may still be
returned if X's dtype is not a floating point type.
force_all_finite : boolean (default=True)
Whether to raise an error on np.inf and np.nan in X.
Returns
-------
XT : {array, sparse matrix}
An array of type np.float
"""
if isinstance(X, np.matrix) or (not isinstance(X, np.ndarray)
and not sp.issparse(X)):
return check_array(X, ['csr', 'csc', 'coo'], dtype=np.float64,
copy=copy, force_all_finite=force_all_finite,
ensure_2d=False)
elif sp.issparse(X) and X.dtype in [np.float32, np.float64]:
return X.copy() if copy else X
elif X.dtype in [np.float32, np.float64]: # is numpy array
return X.copy('F' if X.flags['F_CONTIGUOUS'] else 'C') if copy else X
else:
return X.astype(np.float32 if X.dtype == np.int32 else np.float64)
def explained_variance_1d(ypred,y):
"""
    Computes 1 - Var[y - ypred] / Var[y], the fraction of variance explained.
https://www.quora.com/What-is-the-meaning-proportion-of-variance-explained-in-linear-regression
"""
assert y.ndim == 1 and ypred.ndim == 1
vary = np.var(y)
return np.nan if vary==0 else 1 - np.var(y-ypred)/vary
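A quick sanity check of the edge cases (illustrative values):
import numpy as np
y = np.array([1.0, 2.0, 3.0])
print(explained_variance_1d(y.copy(), y))                                      # 1.0: perfect prediction
print(explained_variance_1d(np.array([0., 0., 0.]), np.array([5., 5., 5.])))   # nan: Var[y] == 0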
def test_ignore_nan(self):
""" Test that NaNs are handled correctly """
stream = [np.random.random(size = (16,12)) for _ in range(5)]
for s in stream:
s[randint(0, 15), randint(0,11)] = np.nan
with catch_warnings():
simplefilter('ignore')
from_iaverage = last(iaverage(stream, ignore_nan = True))
from_numpy = np.nanmean(np.dstack(stream), axis = 2)
self.assertTrue(np.allclose(from_iaverage, from_numpy))
def test_against_numpy_nanmean(self):
""" Test results against numpy.mean"""
source = [np.random.random((16, 12, 5)) for _ in range(10)]
for arr in source:
arr[randint(0, 15), randint(0, 11), randint(0, 4)] = np.nan
stack = np.stack(source, axis = -1)
for axis in (0, 1, 2, None):
with self.subTest('axis = {}'.format(axis)):
from_numpy = np.nanmean(stack, axis = axis)
out = last(imean(source, axis = axis, ignore_nan = True))
self.assertSequenceEqual(from_numpy.shape, out.shape)
self.assertTrue(np.allclose(out, from_numpy))
def test_against_scipy_with_nans(self):
""" Test that isem outputs the same as scipy.stats.sem when NaNs are ignored. """
source = [np.random.random((16, 12, 5)) for _ in range(10)]
for arr in source:
arr[randint(0, 15), randint(0, 11), randint(0, 4)] = np.nan
stack = np.stack(source, axis = -1)
for axis in (0, 1, 2, None):
for ddof in range(4):
with self.subTest('axis = {}, ddof = {}'.format(axis, ddof)):
from_scipy = scipy_sem(stack, axis = axis, ddof = ddof, nan_policy = 'omit')
from_isem = last(isem(source, axis = axis, ddof = ddof, ignore_nan = True))
self.assertSequenceEqual(from_scipy.shape, from_isem.shape)
self.assertTrue(np.allclose(from_isem, from_scipy))
def test_ignore_nans(self):
""" Test a sum of zeros with NaNs sprinkled """
source = [np.zeros((16,), dtype = np.float) for _ in range(10)]
source.append(np.full((16,), fill_value = np.nan))
summed = csum(source, ignore_nan = True)
self.assertTrue(np.allclose(summed, np.zeros_like(summed)))
def setUp(self):
self.source = [np.random.random((16,5,8)) for _ in range(10)]
self.source[0][0,0,0] = np.nan
self.stack = np.stack(self.source, axis = -1)
def test_ignore_nans(self):
""" Test a sum of zeros with NaNs sprinkled """
source = [np.zeros((16,), dtype = np.float) for _ in range(10)]
source.append(np.full((16,), fill_value = np.nan))
summed = last(isum(source, ignore_nan = True))
self.assertTrue(np.allclose(summed, np.zeros_like(summed)))
def test_ignore_nans(self):
""" Test that NaNs are ignored. """
source = [np.ones((16,), dtype = np.float) for _ in range(10)]
source.append(np.full_like(source[0], np.nan))
product = last(iprod(source, ignore_nan = True))
self.assertTrue(np.allclose(product, np.ones_like(product)))
def frame_to_series(self, field, frame, columns=None):
"""
Convert a frame with a DatetimeIndex and sid columns into a series with
a sid index, using the aggregator defined by the given field.
"""
if isinstance(frame, pd.DataFrame):
columns = frame.columns
frame = frame.values
if not len(frame):
return pd.Series(
data=(0 if field == 'volume' else np.nan),
index=columns,
).values
if field in ['price', 'close']:
# shortcircuit for full last row
vals = frame[-1]
if np.all(~np.isnan(vals)):
return vals
return ffill(frame)[-1]
elif field == 'open':
return bfill(frame)[0]
elif field == 'volume':
return np.nansum(frame, axis=0)
elif field == 'high':
return np.nanmax(frame, axis=0)
elif field == 'low':
return np.nanmin(frame, axis=0)
else:
raise ValueError("Unknown field {}".format(field))
def __repr__(self):
statements = []
for metric in self.METRIC_NAMES:
value = getattr(self, metric)[-1]
if isinstance(value, list):
if len(value) == 0:
value = np.nan
else:
value = value[-1]
statements.append("{m}:{v}".format(m=metric, v=value))
return '\n'.join(statements)