Python numpy module: trunc() example source code
The following code examples, extracted from open-source Python projects, illustrate how to use numpy.trunc().
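For reference, numpy.trunc rounds every element toward zero, simply discarding the fractional part; unlike numpy.floor it never moves negative values further down. A minimal illustration:

import numpy as np

x = np.array([-1.7, -0.5, 0.5, 1.7, 2.0])
print(np.trunc(x))   # [-1. -0.  0.  1.  2.]
print(np.floor(x))   # [-2. -1.  0.  1.  2.]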
def epoch_to_epoch16(self, epoch):
    """
    Converts a CDF EPOCH to a CDF EPOCH16 value

    Parameters
    ==========
    epoch : double
        EPOCH to convert. Lists and numpy arrays are acceptable.

    Returns
    =======
    out : (double, double)
        EPOCH16 corresponding to epoch
    """
    e = numpy.require(epoch, numpy.float64)
    s = numpy.trunc(e / 1000.0)
    #ugly numpy stuff, probably a better way....
    res = numpy.hstack((s, (e - s * 1000.0) * 1e9))
    if len(res) <= 2:
        return res
    newshape = list(res.shape[0:-2])
    newshape.append(res.shape[-1] // 2)
    newshape.append(2)
    return numpy.rollaxis(res.reshape(newshape), -1, -2)
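The trunc call above splits a millisecond count into whole seconds and a sub-second remainder. A standalone sketch of the same split, assuming (as the 1000.0 and 1e9 factors suggest) that EPOCH counts milliseconds and EPOCH16 stores a (seconds, picoseconds) pair; the helper name is mine:

import numpy as np

def split_epoch(epoch_ms):
    # whole seconds via trunc, remainder scaled from ms to ps (1 ms = 1e9 ps)
    seconds = np.trunc(epoch_ms / 1000.0)
    picoseconds = (epoch_ms - seconds * 1000.0) * 1e9
    return seconds, picoseconds

print(split_epoch(1234.5))   # (1.0, 234500000000.0)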
def encode(self, inp, lengths):
    # input shape: minibatchsize x input_len
    # output shape: minibatchsize x input_len x n_inputnodes
    minibatchsize = inp.shape[0]
    output = np.zeros((minibatchsize, inp.shape[1] + self.ticker_steps, self.n_inputnodes), dtype=float)
    lengths -= (self.ticker_steps - 1)
    for mb in np.arange(minibatchsize):
        scaled_pos = inp[mb, :] / self.node_range
        # equals output[np.arange(len(input)), np.trunc(scaled_pos)+1] except for the last timestep
        output[mb, np.arange(inp.shape[1]), scaled_pos.astype(int) + (scaled_pos.astype(int) < self.data_nodes)] = np.abs(self.max_act * (scaled_pos - np.trunc(scaled_pos)))
        output[mb, np.arange(inp.shape[1]), scaled_pos.astype(int)] = np.abs(self.max_act - output[mb, np.arange(inp.shape[1]), scaled_pos.astype(int) + (scaled_pos.astype(int) < self.data_nodes)])
        output[mb, np.arange(inp.shape[1]), -self.exp - 1:-1] = int_to_binary(inp[mb, :] % self.node_range, self.exp)
        if self.ticker_steps > 0:
            output[mb, lengths[mb]:, :] = 0
            output[mb, lengths[mb]:lengths[mb] + self.ticker_steps, -1] = 1
    return output
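The core trick above is splitting each scaled input into an integer node index (via astype(int)) and a fractional part (scaled_pos - np.trunc(scaled_pos)) that is spread across two adjacent nodes. A minimal sketch of that split with made-up values (node_range and max_act are hypothetical here):

import numpy as np

node_range, max_act = 10.0, 1.0     # hypothetical parameters
values = np.array([3.0, 17.5, 42.0])
scaled = values / node_range        # e.g. [0.3, 1.75, 4.2]
node = scaled.astype(int)           # integer node index: [0, 1, 4]
frac = scaled - np.trunc(scaled)    # fractional part, e.g. [0.3, 0.75, 0.2]
upper = max_act * frac              # activation assigned to the next node
lower = max_act - upper             # remaining activation for this node
print(node, upper, lower)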
def check_out_data_set(self):
    for set in ['train', 'valid', 'test']:
        if self.prm.data[set + "_data_name"] is not None:
            file_name = self.prm.data["data_location"] + self.prm.data[set + "_data_name"]
            try:
                d = klepto.archives.file_archive(file_name, cached=True, serialized=True)
                d.load()
                data_set_x = d['x']
                data_set_y = d['y']
                d.clear()
                self.prm.data[set + "_set_len"] = len(data_set_x)
                if len(data_set_x) != len(data_set_y):
                    raise Warning("x and y " + set + "_data_name do not have the same length")
                self.prm.data["x_size"] = data_set_x[0].shape[1]
                if self.prm.data["x_size"] != int(self.prm.struct["net_size"][0]):
                    raise Warning(set + " data x size and net input size are unequal")
                if self.prm.optimize['CTC'] == False:
                    self.prm.data["y_size"] = data_set_y[0].shape[1]
                    if self.prm.data["y_size"] != int(self.prm.struct["net_size"][-1]):
                        raise Warning(set + " data y size and net output size are unequal")
                else:
                    self.prm.data["y_size"] = self.prm.struct["net_size"][-1]
                del data_set_x
                del data_set_y
                # number of full mini-batches that fit in this data set
                self.prm.data[set + "_batch_quantity"] = int(np.trunc(self.prm.data[set + "_set_len"] / self.prm.data["batch_size"]))
                self.prm.data["checked_data"][set] = True
            except KeyError:
                raise Warning("data_location or " + set + "_data_name wrong")
###### Create mini batches and store them in klepto files
########################################
def test_numpy_method():
    # This type of code is used frequently by PyMC3 users
    x = tt.dmatrix('x')
    data = np.random.rand(5, 5)
    x.tag.test_value = data
    for fct in [np.arccos, np.arccosh, np.arcsin, np.arcsinh,
                np.arctan, np.arctanh, np.ceil, np.cos, np.cosh, np.deg2rad,
                np.exp, np.exp2, np.expm1, np.floor, np.log,
                np.log10, np.log1p, np.log2, np.rad2deg,
                np.sin, np.sinh, np.sqrt, np.tan, np.tanh, np.trunc]:
        y = fct(x)
        f = theano.function([x], y)
        utt.assert_allclose(np.nan_to_num(f(data)),
                            np.nan_to_num(fct(data)))
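As a plain-numpy sanity check of the last entry in that list: trunc is floor for non-negative values and ceil for negative ones. A minimal sketch:

import numpy as np

data = np.random.rand(5, 5) * 2.0 - 1.0   # values in [-1, 1)
expected = np.where(data >= 0, np.floor(data), np.ceil(data))
assert np.allclose(np.trunc(data), expected)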
def __init__(self, points, rho, dimension):
    """Constructor

    Initializes the grid and helper structures using the provided points
    and rho parameter.

    Args:
        points: A numpy array containing the coordinates of the particles.
        rho: Needed to compute the rho-boundary of the system.
        dimension: The dimension of the particle system.
    """
    self.points = points
    self.rho = rho
    self.dimension = dimension
    self.cell_size = 2.0 * rho
    self.aabb_min = np.amin(points, axis=0)
    self.aabb_max = np.amax(points, axis=0)
    self.grid_dims = (self.aabb_max - self.aabb_min) / self.cell_size
    # Regarding the + 3: 1 for left side, 1 for right side, 1 for rounding up
    self.grid_dims = np.trunc(self.grid_dims) + 3
    self.grid_dims = self.grid_dims.astype(int)
    self.grid_min = self.aabb_min - self.cell_size
    self.grid_max = self.grid_min + self.grid_dims * self.cell_size
    self.grid_count = np.zeros(self.grid_dims, dtype=int)
    self.grid_elems = np.empty(self.grid_dims, dtype=object)
    self.update_grid()
    self.tree = NeighborsTree(
        self.points, leaf_size=10, metric='euclidean')
    self.neighbor_cell_list = self.compute_neighbor_cell_list()
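With grid_min and cell_size defined as above, a point can be mapped to its cell by the same trunc idiom. This lookup is not part of the constructor; it is only a hypothetical illustration of how the padded grid would be indexed:

import numpy as np

grid_min, cell_size = np.array([0.0, 0.0]), 0.5   # hypothetical 2-D grid
point = np.array([1.3, 0.2])
cell = np.trunc((point - grid_min) / cell_size).astype(int)
print(cell)   # [2 0]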
def _detect_anoms(data, k=0.49, alpha=0.05, num_obs_per_period=None,
                  use_decomp=True, use_esd=False, direction="pos", verbose=False):
    # validation
    assert num_obs_per_period, "must supply period length for time series decomposition"
    assert direction in ['pos', 'neg', 'both'], 'direction options: pos | neg | both'
    assert data.size >= num_obs_per_period * 2, 'Anomaly detection needs at least 2 periods worth of data'
    assert data[data.isnull()].empty, 'Data contains NA. We suggest replacing NA with interpolated values before detecting anomalies'

    # conversion
    one_tail = direction in ['pos', 'neg']
    upper_tail = direction in ['pos', 'both']
    n = data.size

    # -- Step 1: Decompose data. This returns a univariate remainder which will be
    # used for anomaly detection. Optionally, we might NOT decompose.
    # Note: R uses stl, but here we use a moving average, so results may differ. TODO: needs improvement
    decomposed = sm.tsa.seasonal_decompose(data, freq=num_obs_per_period, two_sided=False)
    smoothed = data - decomposed.resid.fillna(0)
    data = data - decomposed.seasonal - data.mean()

    max_outliers = int(np.trunc(data.size * k))
    assert max_outliers, 'With longterm=TRUE, AnomalyDetection splits the data into 2 week periods by default. You have {0} observations in a period, which is too few. Set a higher piecewise_median_period_weeks.'.format(data.size)

    R_idx = pd.Series()

    # Compute test statistic until r=max_outliers values have been
    # removed from the sample.
    for i in range(1, max_outliers + 1):
        if verbose:
            print(i, '/', max_outliers, ' completed')
        if not data.mad():
            break
        if not one_tail:
            ares = abs(data - data.median())
        elif upper_tail:
            ares = data - data.median()
        else:
            ares = data.median() - data
        ares = ares / data.mad()
        tmp_anom_index = ares[ares.values == ares.max()].index
        cand = pd.Series(data.loc[tmp_anom_index], index=tmp_anom_index)
        data.drop(tmp_anom_index, inplace=True)
        # Compute critical value.
        p = 1 - alpha / (n - i + 1) if one_tail else (1 - alpha / (2 * (n - i + 1)))
        t = sp.stats.t.ppf(p, n - i - 1)
        lam = t * (n - i) / np.sqrt((n - i - 1 + t ** 2) * (n - i + 1))
        if ares.max() > lam:
            R_idx = R_idx.append(cand)
    return {
        'anoms': R_idx,
        'stl': smoothed
    }
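The critical value computed inside the loop is the generalized ESD threshold; pulled out on its own it looks like the sketch below (the function name is mine, not part of the code above):

import numpy as np
import scipy.stats

def esd_critical_value(n, i, alpha=0.05, one_tail=True):
    # threshold lambda_i for the i-th generalized ESD test statistic
    p = 1 - alpha / (n - i + 1) if one_tail else 1 - alpha / (2 * (n - i + 1))
    t = scipy.stats.t.ppf(p, n - i - 1)
    return t * (n - i) / np.sqrt((n - i - 1 + t ** 2) * (n - i + 1))

print(esd_critical_value(1000, 1))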
def is_stationary(self, x):
    """Test whether the time series is stationary.

    Parameters
    ----------
    x : array-like, shape=(n_samples,)
        The time series vector.
    """
    if not self._base_case(x):
        return np.nan, False

    # ensure vector
    x = column_or_1d(check_array(
        x, ensure_2d=False, dtype=DTYPE,
        force_all_finite=True))  # type: np.ndarray
    n = x.shape[0]

    # check on status of null
    null = self.null

    # fit a model on an arange to determine the residuals
    if null == 'trend':
        t = np.arange(n).reshape(n, 1)
        # these numbers came out of the R code.. I've found 0 doc for these
        table = c(0.216, 0.176, 0.146, 0.119)
    elif null == 'level':
        t = np.ones(n).reshape(n, 1)
        # these numbers came out of the R code.. I've found 0 doc for these
        table = c(0.739, 0.574, 0.463, 0.347)
    else:
        raise ValueError("null must be one of %r" % self._valid)

    # fit the model
    lm = LinearRegression().fit(t, x)
    e = x - lm.predict(t)  # residuals

    tablep = c(0.01, 0.025, 0.05, 0.10)
    s = np.cumsum(e)
    eta = (s * s).sum() / (n ** 2)
    s2 = (e * e).sum() / n

    scalar, denom = 10, 14
    if self.lshort:
        scalar, denom = 3, 13
    l = int(np.trunc(scalar * np.sqrt(n) / denom))

    # compute the C subroutine
    s2 = C_tseries_pp_sum(e, n, l, s2)
    stat = eta / s2

    # do approximation
    _, pval = approx(table, tablep, xout=stat, rule=2)

    # R also tests with rule=1, but we skip that here: R only uses it to issue
    # a warning when the p-value lies outside the tabulated range.
    return pval[0], pval[0] < self.alpha
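The trunc expression above selects the lag truncation parameter for the test. Pulled out on its own it looks like the sketch below (the helper name is mine; the constants are copied from the code above):

import numpy as np

def lag_truncation(n, lshort=True):
    # short and long truncation lags used in the stationarity test above
    scalar, denom = (3, 13) if lshort else (10, 14)
    return int(np.trunc(scalar * np.sqrt(n) / denom))

print(lag_truncation(100), lag_truncation(100, lshort=False))   # 2 7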