Python numpy module: nanstd() example source code
The following code examples, extracted from open-source Python projects, illustrate how to use numpy.nanstd().
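Before the project examples, here is a minimal sketch (not taken from any of the projects below) of the calls these snippets rely on: np.nanstd() ignores NaN entries and accepts the same axis, ddof and keepdims arguments as np.std().

import numpy as np

a = np.array([[1.0, 2.0, np.nan],
              [4.0, np.nan, 6.0]])

print(np.nanstd(a))          # std over all non-NaN values
print(np.nanstd(a, axis=0))  # column-wise, NaNs ignored per column
print(np.nanstd(a, ddof=1))  # sample (ddof=1) standard deviation
print(np.std(a))             # plain np.std returns nan for this array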
def test_against_numpy_nanstd(self):
    source = [np.random.random((16, 12, 5)) for _ in range(10)]
    for arr in source:
        arr[randint(0, 15), randint(0, 11), randint(0, 4)] = np.nan
    stack = np.stack(source, axis=-1)
    for axis in (0, 1, 2, None):
        for ddof in range(4):
            with self.subTest('axis = {}, ddof = {}'.format(axis, ddof)):
                from_numpy = np.nanstd(stack, axis=axis, ddof=ddof)
                from_ivar = last(istd(source, axis=axis, ddof=ddof, ignore_nan=True))
                self.assertSequenceEqual(from_numpy.shape, from_ivar.shape)
                self.assertTrue(np.allclose(from_ivar, from_numpy))
def plot_heatmaps(data, mis, column_label, cont, topk=30, prefix=''):
    cmap = sns.cubehelix_palette(as_cmap=True, light=.9)
    m, nv = mis.shape
    for j in range(m):
        inds = np.argsort(- mis[j, :])[:topk]
        if len(inds) >= 2:
            plt.clf()
            order = np.argsort(cont[:, j])
            subdata = data[:, inds][order].T
            subdata -= np.nanmean(subdata, axis=1, keepdims=True)
            subdata /= np.nanstd(subdata, axis=1, keepdims=True)
            columns = [column_label[i] for i in inds]
            sns.heatmap(subdata, vmin=-3, vmax=3, cmap=cmap, yticklabels=columns, xticklabels=False, mask=np.isnan(subdata))
            filename = '{}/heatmaps/group_num={}.png'.format(prefix, j)
            if not os.path.exists(os.path.dirname(filename)):
                os.makedirs(os.path.dirname(filename))
            plt.title("Latent factor {}".format(j))
            plt.yticks(rotation=0)
            plt.savefig(filename, bbox_inches='tight')
            plt.close('all')
            # plot_rels(data[:, inds], map(lambda q: column_label[q], inds), colors=cont[:, j],
            #           outfile=prefix + '/relationships/group_num=' + str(j), latent=labels[:, j], alpha=0.1)
def test_ddof_too_big(self):
    nanfuncs = [np.nanvar, np.nanstd]
    stdfuncs = [np.var, np.std]
    dsize = [len(d) for d in _rdat]
    for nf, rf in zip(nanfuncs, stdfuncs):
        for ddof in range(5):
            with warnings.catch_warnings(record=True) as w:
                warnings.simplefilter('always')
                tgt = [ddof >= d for d in dsize]
                res = nf(_ndat, axis=1, ddof=ddof)
                assert_equal(np.isnan(res), tgt)
                if any(tgt):
                    assert_(len(w) == 1)
                    assert_(issubclass(w[0].category, RuntimeWarning))
                else:
                    assert_(len(w) == 0)
def _calculate(self, X, y, categorical, metafeatures, helpers):
    skews = helpers.get_value("Skewnesses")
    std = np.nanstd(skews) if len(skews) > 0 else 0
    return std if np.isfinite(std) else 0

# @metafeatures.define("cancor1")
# def cancor1(X, y):
#     pass

# @metafeatures.define("cancor2")
# def cancor2(X, y):
#     pass

################################################################################
# Information-theoretic metafeatures
def add_MACD(data, Ns=[12, 26, 9]):
    '''
    :param data: DataFrame containing stock price info in the second column
    :param Ns: list of the short-term and long-term EMA spans plus the look-back window of the MACD signal EMA
    :return:
    '''
    symbol = data.columns.values[1]  # assuming stock price is in the second column in data
    # note: DataFrame.ix is removed in modern pandas; see the .loc-based variant of this function below
    MACD = cal_EMA(data.ix[:, [symbol]], N=Ns[0]) - cal_EMA(data.ix[:, [symbol]], N=Ns[1])
    data['MACD'] = MACD
    signal = cal_EMA(data.MACD[Ns[1]:], N=Ns[2])
    # # normalize them
    # MACD = (MACD - np.nanmean(MACD))/(2*np.nanstd(MACD))
    # signal = (signal - np.nanmean(signal))/(2*np.nanstd(signal))
    data['MACD'] = MACD
    data['Signal'] = 'NaN'
    data.loc[Ns[1]:, 'Signal'] = signal
    return data
def add_MACD(data, Ns=None):
    '''
    :param data: DataFrame containing stock price info in the second column
    :param Ns: list of the short-term and long-term EMA spans plus the look-back window of the MACD signal EMA
    :return:
    '''
    if Ns is None:
        Ns = [12, 26, 9]
    symbol = data.columns.values[1]  # assuming stock price is in the second column in data
    MACD = cal_EMA(data.loc[:, symbol], N=Ns[0]) - cal_EMA(data.loc[:, symbol], N=Ns[1])
    data['MACD'] = MACD
    signal = cal_EMA(data.MACD[Ns[1]:], N=Ns[2])
    # # normalize them
    # MACD = (MACD - np.nanmean(MACD))/(2*np.nanstd(MACD))
    # signal = (signal - np.nanmean(signal))/(2*np.nanstd(signal))
    # data['MACD'] = MACD
    data['Signal'] = 'NaN'
    data.loc[Ns[1]:, 'Signal'] = signal
    return data
def addNoise(img, snr=25, rShot=0.5):
    '''
    adds Gaussian (thermal) and shot noise to [img]
    [img] is assumed to be noise free
    [rShot] - shot noise ratio relative to all noise
    '''
    s0, s1 = img.shape[:2]
    m = img.mean()
    if np.isnan(m):
        m = np.nanmean(img)
    assert m != 0, 'image mean cannot be zero'
    img = img / m
    noise = np.random.normal(size=s0 * s1).reshape(s0, s1)
    if rShot > 0:
        noise *= (rShot * img**0.5 + 1)
    noise /= np.nanstd(noise)
    noise[np.isnan(noise)] = 0
    return m * (img + noise / snr)
def histo_plot(figure, X, color, xlabel="", ylabel="", cumul=False, bar=True, n_points=400, smooth_factor=0.1, spline_order=3, linewidth=3, alpha=1.0, label=""):
    if '%' in xlabel:
        magnitude = 100
        X_values = np.array(np.minimum(np.around(X), n_points + 1), int)
    else:
        # magnitude = np.power(10,np.around(4*np.log10(X.mean()))/4+0.5)
        magnitude = np.power(10, np.around(4 * np.log10(np.nanmean(X) + np.nanstd(X) + 1e-7)) / 4 + 1)
        magnitude = np.around(magnitude, int(-np.log10(magnitude)) + 1)
        # print magnitude
        # magnitude = X.mean() + 5.0*X.std()
        # ~np.isnan(X) replaces the legacy "True - np.isnan(X)" mask, which fails on Python 3 / modern NumPy
        X_values = np.array(np.minimum(np.around(n_points * X[~np.isnan(X)] / magnitude), n_points + 1), int)
    X_histo = np.zeros(n_points + 1, float)
    for x in range(n_points + 1):  # integer bin indices (float indices from np.linspace are not valid)
        X_histo[x] = nd.sum(np.ones_like(X_values, float), X_values, index=x)
        if '%' in ylabel:
            X_histo[x] /= X_values.size / 100.0
        if cumul:
            X_histo[x] += X_histo[x - 1]
    if bar:
        bar_plot(figure, np.linspace(0, magnitude, n_points + 1), X_histo, np.array([1, 1, 1]), color, xlabel, ylabel, label=label)
    else:
        smooth_plot(figure, np.linspace(0, magnitude, n_points + 1), X_histo, color, color, xlabel, ylabel, n_points=n_points, smooth_factor=smooth_factor, spline_order=spline_order, linewidth=linewidth, alpha=alpha, label=label)
def process(self, obj_data):
    '''
    Removes table data with large snow errors

    @param obj_data: Input DataWrapper, will be modified in place
    '''
    bad_stations = []
    sigma_multiplier = self.ap_paramList[0]()
    for label, data in obj_data.getIterator():
        if len(data[data[self.snow_column] == 4]) > 0 and len(data[data[self.snow_column] == 2]) > 0:
            snow = data[data[self.snow_column] == 4].loc[:, self.column_name]
            no_snow = data[data[self.snow_column] == 2].loc[:, self.column_name]
            non_snow_std = np.nanstd(no_snow)
            snow_std = np.nanstd(snow)
            if snow_std > sigma_multiplier * non_snow_std:
                bad_stations.append(label)
    if len(bad_stations) > 0:
        obj_data.removeFrames(bad_stations)
def X(self):
    # Scale the data across features
    X = stats.zscore(np.asarray(self.betas))

    # Remove zero-variance features
    X = X[:, self.good_voxels]
    assert not np.isnan(X).any()

    # Regress out behavioral confounds
    rt = np.asarray(self.rt)
    m, s = np.nanmean(rt), np.nanstd(rt)
    rt = np.nan_to_num((rt - m) / s)
    X = OLS(X, rt).fit().resid
    return X
def split_and_zscore(self, data, test_run):
    # Enforce type and size of the data
    data = np.asarray(data)
    if data.ndim == 1:
        data = np.expand_dims(data, 1)

    # Identify training and test samples
    train = np.asarray(self.runs != test_run)
    test = np.asarray(self.runs == test_run)
    train_data = data[train]
    test_data = data[test]

    # Compute the mean and standard deviation of the training set
    m, s = np.nanmean(train_data), np.nanstd(train_data)

    # Scale the training and test set
    train_data = (train_data - m) / s
    test_data = (test_data - m) / s
    return train_data, test_data
def test_dtype_from_dtype(self):
    mat = np.eye(3)
    codes = 'efdgFDG'
    for nf, rf in zip(self.nanfuncs, self.stdfuncs):
        for c in codes:
            with suppress_warnings() as sup:
                if nf in {np.nanstd, np.nanvar} and c in 'FDG':
                    # Giving the warning is a small bug, see gh-8000
                    sup.filter(np.ComplexWarning)
                tgt = rf(mat, dtype=np.dtype(c), axis=1).dtype.type
                res = nf(mat, dtype=np.dtype(c), axis=1).dtype.type
                assert_(res is tgt)
                # scalar case
                tgt = rf(mat, dtype=np.dtype(c), axis=None).dtype.type
                res = nf(mat, dtype=np.dtype(c), axis=None).dtype.type
                assert_(res is tgt)
def test_dtype_from_char(self):
    mat = np.eye(3)
    codes = 'efdgFDG'
    for nf, rf in zip(self.nanfuncs, self.stdfuncs):
        for c in codes:
            with suppress_warnings() as sup:
                if nf in {np.nanstd, np.nanvar} and c in 'FDG':
                    # Giving the warning is a small bug, see gh-8000
                    sup.filter(np.ComplexWarning)
                tgt = rf(mat, dtype=c, axis=1).dtype.type
                res = nf(mat, dtype=c, axis=1).dtype.type
                assert_(res is tgt)
                # scalar case
                tgt = rf(mat, dtype=c, axis=None).dtype.type
                res = nf(mat, dtype=c, axis=None).dtype.type
                assert_(res is tgt)
def test_ddof_too_big(self):
    nanfuncs = [np.nanvar, np.nanstd]
    stdfuncs = [np.var, np.std]
    dsize = [len(d) for d in _rdat]
    for nf, rf in zip(nanfuncs, stdfuncs):
        for ddof in range(5):
            with suppress_warnings() as sup:
                sup.record(RuntimeWarning)
                sup.filter(np.ComplexWarning)
                tgt = [ddof >= d for d in dsize]
                res = nf(_ndat, axis=1, ddof=ddof)
                assert_equal(np.isnan(res), tgt)
                if any(tgt):
                    assert_(len(sup.log) == 1)
                else:
                    assert_(len(sup.log) == 0)
def zscore_cooccur(prob_observed, prob_shuffle):
    """Compare co-occurrence observed probabilities with shuffle

    Parameters
    ----------
    prob_observed : np.array
    prob_shuffle : np.array

    Returns
    -------
    prob_zscore : np.array

    """
    num_neurons = prob_observed.shape[0]

    prob_zscore = np.zeros((num_neurons, num_neurons))
    for i in range(num_neurons):
        for j in range(num_neurons):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", category=RuntimeWarning)
                prob_zscore[i][j] = ((prob_observed[i][j] - np.nanmean(np.squeeze(prob_shuffle[:, i, j])))
                                     / np.nanstd(np.squeeze(prob_shuffle[:, i, j])))
    return prob_zscore
def init_distrib_idx(self, distrib, idx=None):
    assert isinstance(distrib, DistribGauss)
    x = distrib.get_mu()
    if idx is None:
        # initialize prior and thus average over all cases
        mu = np.nanmean(x, axis=0, keepdims=True)
    else:
        # select cases idx
        mu = x[idx, :]
        idx_nan = np.isnan(mu)
        if np.any(idx_nan):
            # we need to randomly select new values for all NaNs
            idx_good = np.ones_like(idx, dtype=bool)
            idx_good[idx, :] = False
            idx_good[np.isnan(x)] = False
            x_good = x[idx_good, :]
            num_nan = np.count_nonzero(idx_nan)
            mu[idx_nan] = np.random.choice(x_good, num_nan, replace=False)
        mu = np.copy(mu)  # make sure to not overwrite data
    std = np.empty_like(mu)
    std.fill(float(np.nanstd(x)))  # np.asscalar was removed in recent NumPy; float() is equivalent here
    self.init_data(mu, std)
def normalize_mean_std(X, x_mean=None, x_std=None, axis=0, ignore_nan=False):
    if ignore_nan:
        mean = np.nanmean
        std = np.nanstd
    else:
        mean = np.mean
        std = np.std
    if x_mean is None:
        x_mean = mean(X, axis=axis, keepdims=True)
    if x_std is None:
        x_std = std(X, axis=axis, keepdims=True)
        x_std[~np.isfinite(1 / x_std)] = 1
    if np.any(~np.isfinite(x_mean)):
        warnings.warn("x_mean contains NaN or Inf values!")
    if np.any(~np.isfinite(x_std)):
        warnings.warn("x_std contains NaN or Inf values!")
    X = X - x_mean
    X = X / x_std
    return X, x_mean, x_std
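A short usage sketch for the helper above (illustrative only; the arrays and values are invented): column-wise normalization with NaN-aware statistics, then reuse of the fitted mean/std on new data.

import numpy as np

X = np.array([[1.0, 2.0],
              [np.nan, 4.0],
              [3.0, 6.0]])

# normalize each column, ignoring NaNs when estimating mean and std
X_norm, mu, sigma = normalize_mean_std(X, axis=0, ignore_nan=True)

# reuse the same statistics on new data (e.g. a held-out set)
X_new = np.array([[2.0, 5.0]])
X_new_norm, _, _ = normalize_mean_std(X_new, x_mean=mu, x_std=sigma)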
def average_with_nan(detector_list, error_estimate=ErrorEstimate.stderr):
    """
    Calculate average detector object, excluding malformed data (NaN) from averaging.
    :param detector_list:
    :param error_estimate:
    :return:
    """
    # TODO add compatibility check
    result = Detector()
    result.counter = len(detector_list)
    result.data_raw = np.nanmean([det.data_raw for det in detector_list], axis=0)
    if result.counter > 1 and error_estimate != ErrorEstimate.none:
        # s = stddev = sqrt(1/(n-1)sum(x-<x>)**2)
        # s : corrected sample standard deviation
        result.error_raw = np.nanstd([det.data_raw for det in detector_list], axis=0, ddof=1)
        # if user requested standard error then we calculate it as:
        # S = stderr = stddev / sqrt(n), or in other words,
        # S = s/sqrt(N) where S is the corrected standard deviation of the mean.
        if error_estimate == ErrorEstimate.stderr:
            result.error_raw /= np.sqrt(result.counter)  # np.sqrt() always returns np.float64
    else:
        result.error_raw = np.zeros_like(result.data_raw)
    return result
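The statistics used above can be reproduced without the Detector wrapper; this standalone sketch (data and variable names invented for illustration) shows the same NaN-aware mean, corrected sample standard deviation, and standard error of the mean:

import numpy as np

runs = [np.array([1.0, 2.0, np.nan]),
        np.array([1.5, np.nan, 3.0]),
        np.array([0.5, 2.5, 3.5])]

n = len(runs)
mean = np.nanmean(runs, axis=0)            # element-wise mean, NaNs excluded
stddev = np.nanstd(runs, axis=0, ddof=1)   # corrected sample standard deviation s
stderr = stddev / np.sqrt(n)               # standard error of the mean S = s / sqrt(N)
print(mean, stddev, stderr)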
def getStdError(self):
    """
    get standard deviation of error over all joints, averaged over sequence
    :return: standard deviation of error
    """
    return numpy.nanmean(numpy.nanstd(numpy.sqrt(numpy.square(self.gt - self.joints).sum(axis=2)), axis=1))

def getJointStdError(self, jointID):
    """
    get standard deviation of one joint, averaged over sequence
    :param jointID: joint ID
    :return: standard deviation of joint error
    """
    return numpy.nanstd(numpy.sqrt(numpy.square(self.gt[:, jointID, :] - self.joints[:, jointID, :]).sum(axis=1)))
def test_nanstd(self):
    tgt = np.std(self.mat)
    for mat in self.integer_arrays():
        assert_equal(np.nanstd(mat), tgt)

    tgt = np.std(self.mat, ddof=1)
    for mat in self.integer_arrays():
        assert_equal(np.nanstd(mat, ddof=1), tgt)

def test_ddof(self):
    nanfuncs = [np.nanvar, np.nanstd]
    stdfuncs = [np.var, np.std]
    for nf, rf in zip(nanfuncs, stdfuncs):
        for ddof in [0, 1]:
            tgt = [rf(d, ddof=ddof) for d in _rdat]
            res = nf(_ndat, axis=1, ddof=ddof)
            assert_almost_equal(res, tgt)
def mean_std(data):
    # TODO: assert is a np.array
    mean = np.nanmean(data, axis=0)
    std = np.nanstd(data, axis=0)
    return [mean, std]
def compute(self, today, asset_ids, out, values):
    # *inputs are M x N numpy arrays, where M is the window_length and N is the number of securities
    # out is an empty array of length N. out will be the output of our custom factor each day.
    #   The job of compute is to write output values into out.
    # asset_ids will be an integer array of length N containing security ids corresponding to
    #   the columns in our *inputs arrays.
    # today will be a pandas Timestamp representing the day for which compute is being called.

    # Calculates the column-wise standard deviation, ignoring NaNs
    out[:] = numpy.nanstd(values, axis=0)

# instantiate custom factor in make_pipeline()
def reject_outliers(data, m=3):
    data[abs(data - np.nanmean(data, axis=0)) > m * np.nanstd(data, axis=0)] = np.nan
    return data
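A brief usage note for reject_outliers (the input array is invented for illustration): entries farther than m NaN-aware standard deviations from the mean are overwritten with NaN in place, so a copy is passed here.

import numpy as np

x = np.array([1.0, 1.2, 0.8, 1.1, 0.9, 1.0, 1.3, 0.7, 1.1, 100.0])
cleaned = reject_outliers(x.copy(), m=2)
print(cleaned)  # 100.0 is replaced by NaN; the other values are kept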
def _get_stderr_lb_varyinglens(x):
    mus, stds, ns = [], [], []
    for temp_list in zip_longest(*x, fillvalue=np.nan):
        mus.append(np.nanmean(temp_list))
        n = len(temp_list) - np.sum(np.isnan(temp_list))
        stds.append(np.nanstd(temp_list, ddof=1 if n > 1 else 0))
        ns.append(n)
    return np.array(mus) - np.array(stds) / np.sqrt(ns)
def _calculate(self, X, y, categorical, metafeatures, helpers):
    values = [val for val in helpers.get_value("NumSymbols") if val > 0]
    if len(values) == 0:
        return 0
    # FIXME Error handle
    std = np.nanstd(values)
    return std if np.isfinite(std) else 0

def _calculate(self, X, y, categorical, metafeatures, helpers):
    kurts = helpers.get_value("Kurtosisses")
    std = np.nanstd(kurts) if len(kurts) > 0 else 0
    return std if np.isfinite(std) else 0
def normalize(array, window=11, normalize_variance=True):
    """
    Arguments:
       array (np.array): 2D array (time, member)
       window (int): Window length to compute climatology
       normalize_variance (bool): Adjust variance

    Returns:
       np.array: Array of normalized values (same size as input array)
    """
    N = array.shape[1]

    """
    Remove climatology so we can look at anomalies. Use separate obs and fcst climatology
    otherwise the fcst variance is higher because obs gets the advantage of using its own
    climatology.
    """
    clim = climatology(array, window, use_future_years=True)
    values = copy.deepcopy(array)
    for i in range(0, N):
        values[:, i] = (values[:, i] - clim)

    if normalize_variance and array.shape[1] > 2:
        """
        This removes any seasonally varying variance, which can cause the 1-year variance to be
        larger than the 1/2-year variance, because the 1/2-year variance samples the summer months
        more often than the winter months, because of the windowing approach. Also, this
        normalization does not guarantee that the std of the whole timeseries is 1, therefore in
        the plot, don't expect the first point to be 1.
        The timeseries is scaled up again to match the average anomaly variance in the timeseries.
        """
        std = np.nanstd(array, axis=1)
        if np.min(std) == 0:
            warning("Standard deviation of 0 at one or more days. Not normalizing variance")
        else:
            meanstd = np.nanmean(std)
            for i in range(0, N):
                values[:, i] = values[:, i] / std * meanstd

    return values
def test_basic_stats(x):
    s = SummaryStats()
    s.update(x)

    assert s.count() == np.count_nonzero(~np.isnan(x))
    np.testing.assert_allclose(s.sum(), np.nansum(x), rtol=RTOL, atol=ATOL)
    np.testing.assert_equal(s.min(), np.nanmin(x) if len(x) else np.nan)
    np.testing.assert_equal(s.max(), np.nanmax(x) if len(x) else np.nan)
    np.testing.assert_allclose(s.mean(), np.nanmean(x) if len(x) else np.nan,
                               rtol=RTOL, atol=ATOL)
    np.testing.assert_allclose(s.var(), np.nanvar(x) if len(x) else np.nan,
                               rtol=RTOL, atol=ATOL)
    np.testing.assert_allclose(s.std(), np.nanstd(x) if len(x) else np.nan,
                               rtol=RTOL, atol=ATOL)
def plot_heatmaps(data, labels, alpha, mis, column_label, cont, topk=20, prefix='', focus=''):
    cmap = sns.cubehelix_palette(as_cmap=True, light=.9)
    m, nv = mis.shape
    for j in range(m):
        inds = np.where(np.logical_and(alpha[j] > 0, mis[j] > 0.))[0]
        inds = inds[np.argsort(- alpha[j, inds] * mis[j, inds])][:topk]
        if focus in column_label:
            ifocus = column_label.index(focus)
            if ifocus not in inds:
                inds = np.insert(inds, 0, ifocus)
        if len(inds) >= 2:
            plt.clf()
            order = np.argsort(cont[:, j])
            subdata = data[:, inds][order].T
            subdata -= np.nanmean(subdata, axis=1, keepdims=True)
            subdata /= np.nanstd(subdata, axis=1, keepdims=True)
            columns = [column_label[i] for i in inds]
            sns.heatmap(subdata, vmin=-3, vmax=3, cmap=cmap, yticklabels=columns, xticklabels=False, mask=np.isnan(subdata))
            filename = '{}/heatmaps/group_num={}.png'.format(prefix, j)
            if not os.path.exists(os.path.dirname(filename)):
                os.makedirs(os.path.dirname(filename))
            plt.title("Latent factor {}".format(j))
            plt.savefig(filename, bbox_inches='tight')
            plt.close('all')
            # plot_rels(data[:, inds], list(map(lambda q: column_label[q], inds)), colors=cont[:, j],
            #           outfile=prefix + '/relationships/group_num=' + str(j), latent=labels[:, j], alpha=0.1)
def first_order(feature, aggregates, verbose=False):
    if not type(aggregates) is list:
        aggregates = [aggregates]
    for aggregate in aggregates:
        if verbose:
            print('   first order computation: ' + aggregate)
        if aggregate == 'log':
            feature = np.log(feature)
        elif aggregate == 'sqrt':
            feature = np.sqrt(feature)
        elif aggregate == 'minlog':
            feature = np.log(1 - feature)
        elif aggregate == 'minsqrt':
            feature = np.sqrt(1 - feature)
        elif aggregate == 'mean':
            # feature = np.mean(feature, axis=0)
            feature = np.nanmean(feature, axis=0)
        elif aggregate == 'var':
            feature = np.var(feature, axis=0)
        elif aggregate == 'std':
            # feature = np.std(feature, axis=0)
            feature = np.nanstd(feature, axis=0)
        elif aggregate == 'stdmean':
            feature = np.hstack([np.mean(feature, axis=0), np.std(feature, axis=0)])
        elif aggregate == 'cov':
            # np.cov takes no axis argument and np.flatten does not exist;
            # treat rows as observations and flatten the covariance matrix
            feature = np.cov(feature, rowvar=False).flatten()
        elif aggregate == 'totvar':
            feature = np.array([np.mean(np.var(feature, axis=0))])
        elif aggregate == 'totstd':
            feature = np.array([np.mean(np.std(feature, axis=0))])
        elif aggregate == 'entropy':
            feature = feature.flatten()
            feature = np.array([stats.entropy(feature)])
        elif aggregate == 'normentropy':
            feature = feature.flatten()
            feature = np.array([stats.entropy(feature) / np.log(feature.size)])
        elif aggregate == 'information':
            feature = - np.log(feature)
    return feature
def __get_pd_mean(data, c=1.):
    """Get the mean and the standard deviation of data

    Args:
        data (numpy.ndarray): the data

    Returns:
        float, float: the mean and the standard deviation
    """
    p = np.nanmean(data)
    d = np.nanstd(data) / c
    return p, d
def calculate_residual_distributions(self):
    """
    This function ...
    :return:
    """
    # Inform the user
    log.info("Calculating distributions of residual pixel values ...")

    # Loop over the different colours
    for colour_name in self.observed_colours:
        # Debugging
        log.debug("Calculating the distribution for the pixels of the " + colour_name + " residual map ...")

        # Get an 1D array of the valid pixel values
        pixel_values = None

        # Create the distribution
        distribution = Distribution.from_values(pixel_values)

        # Debugging
        #log.debug("Median " + colour_name + " residual: " + str(np.nanmedian(np.abs(residual))))
        #log.debug("Standard deviation of " + colour_name + " residual: " + str(np.nanstd(residual)))

        # Add the distribution to the dictionary
        self.residual_distributions[colour_name] = distribution

# -----------------------------------------------------------------
def scaleParamsFromReference(img, reference):
    # saving startup time:
    from scipy.optimize import curve_fit

    def ff(arr):
        arr = imread(arr, 'gray')
        if arr.size > 300000:
            arr = arr[::10, ::10]
        m = np.nanmean(arr)
        s = np.nanstd(arr)
        r = m - 3 * s, m + 3 * s
        b = (r[1] - r[0]) / 5
        return arr, r, b

    img, imgr, imgb = ff(img)
    reference, refr, refb = ff(reference)

    nbins = np.clip(15, max(imgb, refb), 50)

    refh = np.histogram(reference, bins=nbins, range=refr)[0].astype(np.float32)
    imgh = np.histogram(img, bins=nbins, range=imgr)[0].astype(np.float32)

    import pylab as plt
    plt.figure(1)
    plt.plot(refh)
    plt.figure(2)
    plt.plot(imgh)
    plt.show()

    def fn(x, offs, div):
        return (x - offs) / div

    params, fitCovariances = curve_fit(fn, refh, imgh, p0=(0, 1))
    perr = np.sqrt(np.diag(fitCovariances))

    print('error scaling to reference image: %s' % perr[0])
    # if perr[0] < 0.1:
    return params[0], params[1]
def _update(self, limits=None):
    curves = self.display.widget.curves
    x = self.display.stack.values
    y = []
    s = self.pStd.value()
    yStd = []
    for curve in curves:
        if limits:
            b1 = np.argmax(curve.xData >= limits[0])
            b2 = np.argmax(curve.xData >= limits[1])
            if b2 == 0:
                b2 = -1
        else:
            b1, b2 = None, None
        y.append(np.nanmean(curve.yData[b1:b2]))
        if s:
            yStd.append(np.nanstd(curve.yData[b1:b2]))

    if self.out is None or self.out.isClosed():
        self.out = self.display.workspace.addDisplay(
            origin=self.display,
            axes=self.display.axes.copy(('stack', 1)),
            title='ROI',
            names=['ROI'],
            data=[(x, y)])
    else:
        self.out.widget.curves[0].setData(x, y)

    if s:
        if self.outStd is None or self.outStd.isClosed():
            self.outStd = self.display.workspace.addDisplay(
                origin=self.display,
                axes=self.display.axes.copy(('stack', 1)),
                title='ROI - std',
                names=['ROI - std'],
                data=[(x, yStd)])
        else:
            self.outStd.widget.curves[0].setData(x, yStd)
def get_mat_movingstd(tsmat, periods):
    mstd = np.empty(shape=tsmat.shape)
    mstd.fill(np.nan)  # np.NAN alias was removed in NumPy 2.0
    for i in range(tsmat.shape[0]):  # xrange is Python 2 only
        j = i - periods + 1
        if j < 0:
            j = 0
        mstd[i, :] = np.nanstd(tsmat[j:i+1, :], 0)
    return mstd
def calc_cumulative_dist(momlim=0.3, region_list=['L1688', 'NGC1333', 'B18', 'OrionA'],
                         file_extension='DR1_rebase3'):
    projDir = '/media/DATAPART/projects/GAS/testing/'
    min_bin = 20.5  # log 10 N(H2) - set by B18
    max_bin = 26.   # log 10 N(H2) - set by Orion A
    bin_size = 0.05
    nbins = int((max_bin - min_bin) / bin_size)  # np.int was removed in NumPy 1.24; the builtin is equivalent
    data_bin_vals = [min_bin + x * bin_size for x in range(nbins + 1)]
    # Loop over regions
    for region_i in range(len(region_list)):
        region = region_list[region_i]
        nh3ImFits = projDir + '{0}/{0}_NH3_11_{1}_mom0_QA_trim.fits'.format(region, file_extension)
        herColFits = 'nh2_regridded/{0}_NH2_regrid.fits'.format(region)
        nh3_fits = fits.open(nh3ImFits)
        nh2_regrid_fits = fits.open(herColFits)
        h2_data = np.log10(nh2_regrid_fits[0].data)
        nh3_data = nh3_fits[0].data
        h2_mean_array = np.zeros(nbins)
        h2_std_array = np.zeros(nbins)
        nh3_frac_array = np.zeros(nbins)
        for bin_i in range(nbins - 1):
            bin_h2_indices = np.where(np.logical_and(h2_data >= data_bin_vals[bin_i],
                                                     h2_data < data_bin_vals[bin_i + 1]))
            bin_h2_data = h2_data[bin_h2_indices]
            bin_nh3_data = nh3_data[bin_h2_indices]
            if np.count_nonzero(bin_nh3_data) != 0:
                frac_above_mom = np.count_nonzero(bin_nh3_data > momlim) / (1.0 * np.count_nonzero(bin_nh3_data))
            else:
                frac_above_mom = 0
            bin_mean_h2 = np.nanmean(bin_h2_data)
            bin_std_h2 = np.nanstd(bin_h2_data)
            h2_mean_array[bin_i] = bin_mean_h2
            nh3_frac_array[bin_i] = frac_above_mom
            h2_std_array[bin_i] = bin_std_h2
        # Write out bins
        np.savetxt('cumulative/{0}_cumulative.txt'.format(region),
                   np.transpose([h2_mean_array, nh3_frac_array, h2_std_array]))
def corr(data):
    ns = data.shape[0]
    nt = data.shape[1]
    mean = np.nanmean(data, axis=0)
    std = np.nanstd(data - mean, axis=0)
    c = np.zeros((nt, nt))
    for t1 in range(nt):
        # for t2 in range(nt):
        #     c[t1, t2] = np.nanmean((data[:, t1] - mean[t1]) * (data[:, t2] - mean[t2]), axis=0)  # / (std[t1] * std[t2])
        c[t1, :] = np.nanmean((data[:, :].T * data[:, t1]).T, axis=0)
    return c
def downstddv(x):
    if x.size < 4:
        return np.nan
    median = np.nanpercentile(x, 50)
    return np.nanstd(x[x < median])