Python numpy 模块,e 常量实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用numpy.e(自然常数 e,是常量而不是函数)。
def rf(train_sample, validation_sample, features, seed):
    """Fit a random forest on log-scaled volume and predict the validation set.

    The regression target is ``log1p(volume)`` divided by ``log(log_base)``
    (``log_base`` is ``np.e`` here); predictions are mapped back with
    ``log_base ** prediction - 1``.  The result is handed to ``print_mape``
    under the label ``'RF'`` before being returned.

    :param train_sample: table indexable by ``features`` and ``'volume'``.
    :param validation_sample: table indexable by ``features`` and ``'volume'``.
    :param features: column selector used for the model inputs.
    :param seed: forwarded to the estimator as ``random_state``.
    :returns: back-transformed predictions for ``validation_sample``.
    """
    log_base = np.e
    forest = RandomForestRegressor(
        n_estimators=500,
        criterion='mse',
        max_features=4,
        max_depth=None,
        bootstrap=True,
        min_samples_split=4,
        min_samples_leaf=1,
        min_weight_fraction_leaf=0,
        max_leaf_nodes=None,
        random_state=seed,
    )
    train_inputs = train_sample[features]
    train_target = np.log1p(train_sample['volume']) / np.log(log_base)
    rf_est = forest.fit(train_inputs, train_target)

    # Undo the log1p transform applied to the training target.
    raw_prediction = rf_est.predict(validation_sample[features])
    rf_prob = np.power(log_base, raw_prediction) - 1
    print_mape(validation_sample['volume'], rf_prob, 'RF')
    return rf_prob
def test_closing_fid(self):
    """Check that issue #1517 (too many opened files) remains closed.

    The test may be "weak": it was not triggered on e.g. Debian sid of
    2012 Jul 05, but was reported to fail on Ubuntu 10.04, see
    http://projects.scipy.org/numpy/ticket/1517#comment:2
    """
    with temppath(suffix='.npz') as npz_path:
        np.savez(npz_path, data='LOVELY LOAD')
        # The npz object returned by np.load is deliberately left for the
        # garbage collector to close.  Python 3 in debug mode emits a
        # ResourceWarning in that situation; since that category does not
        # exist on Python < 3.x, every warning is silenced instead.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            for _ in range(1024):
                try:
                    np.load(npz_path)["data"]
                except Exception as err:
                    raise AssertionError(
                        "Failed to load data from a file: %s" % err)
def test_invalid_raise(self):
    """Check ``invalid_raise`` handling of rows with a wrong column count."""
    rows = ["1, 1, 1, 1, 1"] * 50
    # Rows 0, 10, 20, 30 and 40 lose one delimiter and become malformed.
    for bad_row in range(0, 50, 10):
        rows[bad_row] = "2, 2, 2, 2 2"
    rows.insert(0, "a, b, c, d, e")
    mdata = TextIO("\n".join(rows))

    kwargs = dict(delimiter=",", dtype=None, names=True)
    # assert_warns does not hand back the callable's return value, so the
    # parsed array is stashed in a dict owned by this test.
    captured = {}

    def load_lenient():
        captured['mtest'] = np.ndfromtxt(mdata, invalid_raise=False, **kwargs)

    assert_warns(ConversionWarning, load_lenient)
    mtest = captured['mtest']
    assert_equal(len(mtest), 45)
    expected = np.ones(45, dtype=[(name, int) for name in 'abcde'])
    assert_equal(mtest, expected)

    # With the default settings the malformed rows must raise.
    mdata.seek(0)
    assert_raises(ValueError, np.ndfromtxt, mdata,
                  delimiter=",", names=True)
def result_pretty(self, number_of_runs=0, time_str=None,
                  fbestever=None):
    """Print a readable summary of the current result.

    Prints one line per entry of ``self.stop()``, the final and best-ever
    f-values, and the incumbent solution with its standard deviations
    (abbreviated to the first eight entries when ``self.N >= 9``).

    :param number_of_runs: restart count mentioned in the output when truthy.
    :param time_str: optional text appended in parentheses to each
        termination line.
    :param fbestever: best-ever f-value to report; defaults to ``self.best.f``.
    :returns: ``self.result()``
    """
    if fbestever is None:
        fbestever = self.best.f

    if number_of_runs:
        plural = 's' if number_of_runs > 1 else ''
        restart_note = (' after %i restart' + plural) % number_of_runs
    else:
        restart_note = ''
    time_note = ' (%s)' % time_str if time_str else ''

    for reason, value in self.stop().items():
        print('termination on %s=%s%s'
              % (reason, str(value), restart_note + time_note))
    print('final/bestever f-value = %e %e' % (self.best.last.f, fbestever))

    show_everything = self.N < 9
    incumbent = self.gp.pheno(self.mean,
                              into_bounds=self.boundary_handler.repair)
    if show_everything:
        print('incumbent solution: ' + str(list(incumbent)))
    else:
        # Drop the closing bracket of the truncated repr and mark the cut.
        print('incumbent solution: %s ...]' % (str(incumbent[:8])[:-1]))

    std_devs = self.sigma * self.sigma_vec * sqrt(self.dC) * self.gp.scales
    if show_everything:
        print('std deviation: ' + str(list(std_devs)))
    else:
        print('std deviations: %s ...]' % (str(std_devs[:8])[:-1]))
    return self.result()
def result_pretty(self, number_of_runs=0, time_str=None,
                  fbestever=None):
    """Print a readable summary of the current result.

    Prints one line per entry of ``self.stop()``, the final and best-ever
    f-values, and the incumbent solution with its standard deviations
    (abbreviated to the first eight entries when ``self.N >= 9``).

    :param number_of_runs: restart count mentioned in the output when truthy.
    :param time_str: optional text appended in parentheses to each
        termination line.
    :param fbestever: best-ever f-value to report; defaults to ``self.best.f``.
    :returns: ``self.result()``
    """
    if fbestever is None:
        fbestever = self.best.f

    if number_of_runs:
        plural = 's' if number_of_runs > 1 else ''
        restart_note = (' after %i restart' + plural) % number_of_runs
    else:
        restart_note = ''
    time_note = ' (%s)' % time_str if time_str else ''

    for reason, value in self.stop().items():
        print('termination on %s=%s%s'
              % (reason, str(value), restart_note + time_note))
    print('final/bestever f-value = %e %e' % (self.best.last.f, fbestever))

    show_everything = self.N < 9
    incumbent = self.gp.pheno(self.mean,
                              into_bounds=self.boundary_handler.repair)
    if show_everything:
        print('incumbent solution: ' + str(list(incumbent)))
    else:
        # Drop the closing bracket of the truncated repr and mark the cut.
        print('incumbent solution: %s ...]' % (str(incumbent[:8])[:-1]))

    std_devs = self.sigma * self.sigma_vec * sqrt(self.dC) * self.gp.scales
    if show_everything:
        print('std deviation: ' + str(list(std_devs)))
    else:
        print('std deviations: %s ...]' % (str(std_devs[:8])[:-1]))
    return self.result()
def build_graph(self, actor, critic, cfg):
    """Create the placeholders and the policy / value loss nodes.

    Sets ``self.ph_action``, ``self.ph_advantage``,
    ``self.ph_discounted_reward``, ``self.entropy``, ``self.policy_loss``
    and ``self.value_loss``.

    :param actor: provides ``action_size`` and ``node``; ``node`` is
        unpacked into ``(mu, sigma2)``.
    :param critic: provides ``node``, compared to the discounted reward.
    :param cfg: provides ``entropy_beta`` and ``critic_scale``.

    NOTE(review): ``sigma2`` is used both under ``tf.log`` as if it were a
    standard deviation and squared in the denominator -- confirm which
    quantity the actor really emits.
    """
    self.ph_action = graph.Placeholder(
        np.float32, shape=(None, actor.action_size), name="ph_action")
    self.ph_advantage = graph.Placeholder(
        np.float32, shape=(None,), name="ph_adv")
    self.ph_discounted_reward = graph.Placeholder(
        np.float32, shape=(None,), name="ph_edr")

    mu, sigma2 = actor.node
    sigma2 += tf.constant(1e-8)  # keep the logarithm finite

    log_std_dev = tf.log(sigma2)
    entropy_offset = tf.constant(0.5 * np.log(2. * np.pi * np.e), tf.float32)
    self.entropy = tf.reduce_mean(log_std_dev + entropy_offset)

    l2_dist = tf.square(self.ph_action.node - mu)
    sqr_std_dev = tf.constant(2.) * tf.square(sigma2) + tf.constant(1e-6)
    # Same expression as above; rebuilt here so the graph ops are created
    # in the same sequence as before.
    log_std_dev = tf.log(sigma2)
    quadratic_term = -l2_dist / sqr_std_dev
    normaliser = tf.constant(.5) * tf.log(tf.constant(2 * np.pi))
    log_prob = quadratic_term - normaliser - log_std_dev

    weighted_log_prob = tf.reduce_sum(log_prob, axis=1) * self.ph_advantage.node
    expected_log_prob = tf.reduce_mean(weighted_log_prob)
    entropy_bonus = cfg.entropy_beta * self.entropy
    self.policy_loss = -(expected_log_prob + entropy_bonus)

    # Learning rate for the Critic is sized by critic_scale parameter
    td_error = self.ph_discounted_reward.node - critic.node
    self.value_loss = cfg.critic_scale * tf.reduce_mean(tf.square(td_error))
def result_pretty(self, number_of_runs=0, time_str=None,
                  fbestever=None):
    """Print a readable summary of the current result.

    Prints one line per entry of ``self.stop()``, the final and best-ever
    f-values, and the incumbent solution with its standard deviations
    (abbreviated to the first eight entries when ``self.N >= 9``).

    :param number_of_runs: restart count mentioned in the output when truthy.
    :param time_str: optional text appended in parentheses to each
        termination line.
    :param fbestever: best-ever f-value to report; defaults to ``self.best.f``.
    :returns: ``self.result()``
    """
    if fbestever is None:
        fbestever = self.best.f

    if number_of_runs:
        plural = 's' if number_of_runs > 1 else ''
        restart_note = (' after %i restart' + plural) % number_of_runs
    else:
        restart_note = ''
    time_note = ' (%s)' % time_str if time_str else ''

    for reason, value in self.stop().items():
        print('termination on %s=%s%s'
              % (reason, str(value), restart_note + time_note))
    print('final/bestever f-value = %e %e' % (self.best.last.f, fbestever))

    show_everything = self.N < 9
    incumbent = self.gp.pheno(self.mean,
                              into_bounds=self.boundary_handler.repair)
    if show_everything:
        print('incumbent solution: ' + str(list(incumbent)))
    else:
        # Drop the closing bracket of the truncated repr and mark the cut.
        print('incumbent solution: %s ...]' % (str(incumbent[:8])[:-1]))

    std_devs = self.sigma * self.sigma_vec * sqrt(self.dC) * self.gp.scales
    if show_everything:
        print('std deviation: ' + str(list(std_devs)))
    else:
        print('std deviations: %s ...]' % (str(std_devs[:8])[:-1]))
    return self.result()
def test_closing_fid(self):
    """Check that issue #1517 (too many opened files) remains closed.

    The test may be "weak": it was not triggered on e.g. Debian sid of
    2012 Jul 05, but was reported to fail on Ubuntu 10.04, see
    http://projects.scipy.org/numpy/ticket/1517#comment:2
    """
    with temppath(suffix='.npz') as npz_path:
        np.savez(npz_path, data='LOVELY LOAD')
        # The npz object returned by np.load is deliberately left for the
        # garbage collector to close.  Python 3 in debug mode emits a
        # ResourceWarning in that situation; since that category does not
        # exist on Python < 3.x, every warning is silenced instead.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            for _ in range(1024):
                try:
                    np.load(npz_path)["data"]
                except Exception as err:
                    raise AssertionError(
                        "Failed to load data from a file: %s" % err)
def test_invalid_raise(self):
    """Check ``invalid_raise`` handling of rows with a wrong column count."""
    rows = ["1, 1, 1, 1, 1"] * 50
    # Rows 0, 10, 20, 30 and 40 lose one delimiter and become malformed.
    for bad_row in range(0, 50, 10):
        rows[bad_row] = "2, 2, 2, 2 2"
    rows.insert(0, "a, b, c, d, e")
    mdata = TextIO("\n".join(rows))

    kwargs = dict(delimiter=",", dtype=None, names=True)
    # assert_warns does not hand back the callable's return value, so the
    # parsed array is stashed in a dict owned by this test.
    captured = {}

    def load_lenient():
        captured['mtest'] = np.ndfromtxt(mdata, invalid_raise=False, **kwargs)

    assert_warns(ConversionWarning, load_lenient)
    mtest = captured['mtest']
    assert_equal(len(mtest), 45)
    expected = np.ones(45, dtype=[(name, int) for name in 'abcde'])
    assert_equal(mtest, expected)

    # With the default settings the malformed rows must raise.
    mdata.seek(0)
    assert_raises(ValueError, np.ndfromtxt, mdata,
                  delimiter=",", names=True)
def Entropy(self, tau, mean, std, sigman=1.0):
    """
    Predictive entropy acquisition function

    Computes ``0.5 * log(2 * pi * e * (std**2 + sigman))``.

    Parameters
    ----------
    tau: float
        Best observed function evaluation (not used in the computation).
    mean: float
        Point mean of the posterior process (not used in the computation).
    std: float
        Point std of the posterior process.
    sigman: float
        Noise variance, added to ``std**2``.

    Returns
    -------
    float:
        Predictive entropy.
    """
    predictive_variance = std ** 2 + sigman
    gaussian_volume = 2 * np.pi * np.e * predictive_variance
    return 0.5 * np.log(gaussian_volume)
def load_flux(self, parameters):
    '''
    Read one flux array from the HDF5 grid, optionally truncated by index.

    The dataset name is built with ``self.flux_name.format(**parameters)``
    and looked up under the ``'flux'`` group of ``self.filename``.  When
    ``self.ind`` is not None only ``[self.ind[0]:self.ind[1]]`` is read.

    :param parameters: the stellar parameters
    :type parameters: dict
    :raises GridError: wrapping the KeyError raised when the lookup fails.
    :returns: flux array
    '''
    key = self.flux_name.format(**parameters)
    with h5py.File(self.filename, "r") as hdf5:
        try:
            dataset = hdf5['flux'][key]
            if self.ind is None:
                fl = dataset[:]
            else:
                fl = dataset[self.ind[0]:self.ind[1]]
        except KeyError as e:
            raise GridError(e)
    # NOTE(review): an earlier comment claimed a KeyError is raised when the
    # file itself is missing; that depends on h5py.File -- confirm.
    return fl
def __call__(self, value):
    '''
    Evaluate the interpolator at a parameter.

    :param value: the parameter value to locate in the grid
    :type value: float
    :raises InterpolationError: if ``self.index_interpolator`` raises a
        ValueError for *value* (reported as out of bounds).
    :returns: ((low_val, high_val), (frac_low, frac_high)), the lower and higher bounding points in the grid
        and the fractional distance (0 - 1) between them and the value.
    '''
    try:
        index = self.index_interpolator(value)
    except ValueError as e:
        raise InterpolationError("Requested value {} is out of bounds. {}".format(value, e))
    # np.floor / np.ceil return floating-point values, which are rejected as
    # sequence indices; convert to plain ints before indexing.
    low = int(np.floor(index))
    high = int(np.ceil(index))
    frac_index = index - low
    return ((self.parameter_list[low], self.parameter_list[high]), ((1 - frac_index), frac_index))
def HelCorr(header, observatory="CTIO", idlpath="/Applications/exelis/idl83/bin/idl", debug=False):
    """
    Similar to HelCorr_IRAF, but attempts to use an IDL library.
    See HelCorr_IRAF docstring for details.

    Runs ``barycorr`` through the IDL executable at *idlpath* and parses the
    second-to-last line of its output as a float.

    :param header: mapping providing the 'RA', 'DEC' and 'jd' entries.
    :param observatory: name passed to IDL as ``obsname``.
    :param idlpath: path of the IDL executable.
    :param debug: when true, print the inputs and the raw IDL output.
    :returns: the parsed value as a float.
    :raises subprocess.CalledProcessError: if IDL exits with a non-zero status.
    """
    ra = 15.0 * convert(header['RA'])
    dec = convert(header['DEC'])
    jd = float(header['jd'])

    cmd_list = [idlpath,
                '-e',
                ("print, barycorr({:.8f}, {:.8f}, {:.8f}, 0,"
                 " obsname='{}')".format(jd, ra, dec, observatory)),
                ]
    if debug:
        print("RA: ", ra)
        print("DEC: ", dec)
        print("JD: ", jd)

    # universal_newlines=True makes check_output return text instead of
    # bytes on Python 3, so splitting on "\n" works there as well.
    output = subprocess.check_output(cmd_list, universal_newlines=True).split("\n")
    if debug:
        for line in output:
            print(line)
    # The output is expected to end with a newline, so the value sits on the
    # second-to-last element of the split.
    return float(output[-2])
def FF_Yang_Dou_residual(vbyu, *args):
    """
    The Yang_Dou residual function; to be used by numerical root finder

    :param vbyu: the unknown the root finder varies.
    :param args: ``(Re, rough)``.
    :returns: the residual, zero at the solution.
    """
    (Re, rough) = args
    Rstar = Re / (2 * vbyu * rough)
    theta = np.pi * np.log(Rstar / 1.25) / np.log(100 / 1.25)
    alpha = (1 - np.cos(theta)) / 2
    beta = 1 - (1 - 0.107) * (alpha + theta / np.pi) / 2
    R = Re / (2 * vbyu)

    rt = 1.
    for order in range(1, 5):
        series_term = order / factorial(order) * (67.8 / R) ** (2 * order)
        rt = rt - 1. / np.e * series_term

    # Recurring products of the long closing expression.
    a_r = alpha * Rstar
    ab_r = alpha * beta * Rstar
    log_ratio = 2.5 * np.log((1 + a_r / 5) / (1 + ab_r / 5))
    rough_sq = (5.8 + 1.25) * (a_r / (5 + a_r)) ** 2
    rough_lin = 2.5 * (a_r / (5 + a_r))
    smooth_sq = (5.8 + 1.25) * (ab_r / (5 + ab_r)) ** 2
    smooth_lin = 2.5 * (ab_r / (5 + ab_r))
    correction = log_ratio + rough_sq + rough_lin - smooth_sq - smooth_lin

    turbulent = 2.5 * np.log(R) - 66.69 * R**-0.72 + 1.8 - correction
    return vbyu - (1 - rt) * R / 4. - rt * turbulent
def take_step(self):
    """Propose one random move and accept it with an exponential probability.

    The candidate comes from ``self.random_move(self.node)`` and is scored
    with ``self.utility_function``.  It replaces the current node when a
    uniform draw falls below
    ``e ** ((current_best - score) / temperature)``.  The temperature is
    then multiplied by ``self.alpha``.

    :returns: False when ``self.current_best`` is unchanged by this step,
        True otherwise.
    """
    previous_best = self.current_best
    candidate = self.random_move(self.node)
    candidate_score = self.utility_function(candidate)

    draw = np.random.uniform()
    acceptance = np.e ** ((self.current_best - candidate_score) / self.temperature)
    if draw < acceptance:
        self.node = candidate
        self.current_best = candidate_score
    self.temperature *= self.alpha

    # If no improvement return false
    return not (self.current_best == previous_best)
def test_invalid_raise(self):
    """Check ``invalid_raise`` handling of rows with a wrong column count."""
    rows = ["1, 1, 1, 1, 1"] * 50
    # Rows 0, 10, 20, 30 and 40 lose one delimiter and become malformed.
    for bad_row in range(0, 50, 10):
        rows[bad_row] = "2, 2, 2, 2 2"
    rows.insert(0, "a, b, c, d, e")
    mdata = TextIO("\n".join(rows))

    kwargs = dict(delimiter=",", dtype=None, names=True)
    # assert_warns does not hand back the callable's return value, so the
    # parsed array is stashed in a dict owned by this test.
    captured = {}

    def load_lenient():
        captured['mtest'] = np.ndfromtxt(mdata, invalid_raise=False, **kwargs)

    assert_warns(ConversionWarning, load_lenient)
    mtest = captured['mtest']
    assert_equal(len(mtest), 45)
    expected = np.ones(45, dtype=[(name, int) for name in 'abcde'])
    assert_equal(mtest, expected)

    # With the default settings the malformed rows must raise.
    mdata.seek(0)
    assert_raises(ValueError, np.ndfromtxt, mdata,
                  delimiter=",", names=True)
def lccor(rc,bs=0,fs=1,step=1,kind='int'):
    """Compute correlation lengths of bx (along axis 0) and by (along axis 1).

    For every slice index in ``range(bs, fs, step)`` the slice is loaded
    through ``rc.loadslice`` and the autocorrelations of ``rc.bx`` and
    ``rc.by`` are obtained from ``fcorr``.

    :param rc: run object providing ``vars2load``, ``loadslice``, ``time``,
        ``bx``, ``by``, ``dx`` and ``dy``.
    :param bs: first slice index.
    :param fs: end slice index (exclusive).
    :param step: stride between slices.
    :param kind: ``"ie"`` takes the lag where the correlation is closest to
        1/e; ``"int"`` sums the correlation and multiplies by the grid
        spacing.  Any other value leaves the lengths at zero.
    :returns: ``(tt, lxc, lyc, lc)`` -- times, x and y correlation lengths
        and their mean, one entry per processed slice.
    """
    import numpy as np
    from AnalysisFunctions import fcorr
    ie = 1 / np.e
    rc.vars2load(['bx', 'by', 'bz'])

    # Size the outputs by the number of iterations actually performed;
    # (fs-bs)/step is a float on Python 3 and is one too small whenever the
    # span is not a multiple of step.
    slices = range(bs, fs, step)
    nslices = len(slices)
    tt = np.zeros(nslices)
    lxc = np.zeros(nslices)
    lyc = np.zeros(nslices)
    lc = np.zeros(nslices)

    for idx, i in enumerate(slices):
        print(i)
        rc.loadslice(i)
        tt[idx] = rc.time
        rx, bxcor = fcorr(rc.bx, rc.bx, ax=0, dx=rc.dx)
        ry, bycor = fcorr(rc.by, rc.by, ax=1, dx=rc.dy)
        if kind == "ie":
            lxc[idx] = rx[abs(bxcor - ie).argmin()]
            lyc[idx] = ry[abs(bycor - ie).argmin()]
        elif kind == "int":
            lxc[idx] = np.sum(bxcor) * rc.dx
            lyc[idx] = np.sum(bycor) * rc.dy
        lc[idx] = 0.5 * (lxc[idx] + lyc[idx])
        # Single pre-formatted argument prints identically on Python 2 and 3.
        print("%s %s %s %s" % (tt[idx], lxc[idx], lyc[idx], lc[idx]))
    return tt, lxc, lyc, lc
def QFT(self,nqbits):
    """Build the quantum Fourier transform operator on ``nqbits`` qubits.

    Entry (i, j) of the matrix is ``e ** (1j * theta * ((i * j) % N))``
    with ``N = 2 ** nqbits`` and ``theta = 2 * pi / N``; the whole matrix is
    divided by ``sqrt(N)``.

    :returns: ``["QFT(<nqbits>)", matrix]`` where the matrix is a complex
        ``np.matrix`` with N rows and N columns.
    """
    dim = 2 ** nqbits  # number of rows and cols
    theta = 2.0 * np.pi / dim
    rows = [
        [np.e ** (1.j * theta * ((r * c) % dim)) for c in range(dim)]
        for r in range(dim)
    ]
    opmat = np.matrix(rows, dtype=complex) / np.sqrt(dim)
    return ["QFT({:d})".format(nqbits), opmat]
def gain_factor(theta):
    """Evaluate the piecewise gain as a function of ``theta``.

    ``theta`` is passed through ``np.deg2rad`` in the first branch, so it is
    treated as an angle in degrees.  The branches are split at 87.541, 96,
    101 and 103.49; above 103.49 the gain is the constant 6.0e7.

    :param theta: array-like of angles.  It is converted to a float array so
        that integer input is not truncated to integer gains.
    :returns: float array of gains with the same shape as ``theta``.
    """
    theta = np.asarray(theta, dtype=float)
    gain = np.empty_like(theta)

    mask = theta <= 87.541
    gain[mask] = (58 + 4 / np.cos(np.deg2rad(theta[mask]))) / 5

    mask = np.logical_and(theta <= 96, 87.541 < theta)
    gain[mask] = (123 * np.exp(1.06 * (theta[mask] - 89.589)) *
                  ((theta[mask] - 93)**2 / 18 + 0.5))

    mask = np.logical_and(96 < theta, theta <= 101)
    gain[mask] = 123 * np.exp(1.06 * (theta[mask] - 89.589))

    mask = np.logical_and(101 < theta, theta <= 103.49)
    gain[mask] = (123 * np.exp(1.06 * (101 - 89.589)) *
                  np.log(theta[mask] - (101 - np.e)) ** 2)

    gain[theta > 103.49] = 6.0e7
    return gain
def log_bf(p, s):
    """
    log10 of the multi-way Bayes factor, see eq.(18)

    p: separations matrix (NxN matrix of arrays)
    s: errors (list of N arrays)

    Only the entries ``p[i][j]`` with ``i < j`` are read.
    """
    n = len(s)
    # precision parameter w = 1/sigma^2
    # (builtin float: the numpy.float alias no longer exists in current NumPy)
    w = [numpy.asarray(si, dtype=float)**-2. for si in s]
    norm = (n - 1) * log(2) + 2 * (n - 1) * log_arcsec2rad
    wsum = numpy.sum(w, axis=0)
    log_weight_term = numpy.sum(log(w), axis=0) - log(wsum)

    # Sum over every unordered pair of catalogues.
    q = 0
    for i, wi in enumerate(w):
        for j in range(i + 1, n):
            q += wi * w[j] * p[i][j]**2
    exponent = - q / 2 / wsum
    # The terms above are natural logarithms; convert to base 10.
    return (norm + log_weight_term + exponent) * log10(e)
def aggregate_kvis(self):
    """Merge measured and estimated kinematic viscosities by temperature.

    Measured values come from ``self.culled_kvis()``.  When ``self.record``
    has a ``dvis`` attribute, each entry of ``self.non_redundant_dvis()`` is
    converted with ``est.dvis_to_kvis`` using the density at its reference
    temperature.  A measured value replaces a converted one at the same
    reference temperature.

    :returns: ``(kvis_out, estimated)`` -- two parallel tuples sorted by
        reference temperature: ``KVis`` objects, and flags that are True for
        values converted from dynamic viscosity.  Both are empty tuples when
        there is nothing to aggregate.
    """
    kvis_list = [(k.ref_temp_k, (k.m_2_s, False))
                 for k in self.culled_kvis()]

    if hasattr(self.record, 'dvis'):
        dvis_list = [(d.ref_temp_k,
                      (est.dvis_to_kvis(d.kg_ms,
                                        self.density_at_temp(d.ref_temp_k)),
                       True))
                     for d in self.non_redundant_dvis()]
        agg = dict(dvis_list)
        # Measured values win over converted ones at the same temperature.
        agg.update(kvis_list)
    else:
        agg = dict(kvis_list)

    # dict.items() exists on both Python 2 and 3, unlike iteritems().
    out_items = sorted((temp, value[0], value[1])
                       for temp, value in agg.items())
    if not out_items:
        # zip(*[]) yields nothing and could not be unpacked into two names.
        return (), ()

    kvis_out, estimated = zip(*[(KVis(m_2_s=k, ref_temp_k=t), e)
                                for t, k, e in out_items])
    return kvis_out, estimated
def test_closing_fid(self):
    """Check that issue #1517 (too many opened files) remains closed.

    The test may be "weak": it was not triggered on e.g. Debian sid of
    2012 Jul 05, but was reported to fail on Ubuntu 10.04, see
    http://projects.scipy.org/numpy/ticket/1517#comment:2
    """
    with temppath(suffix='.npz') as npz_path:
        np.savez(npz_path, data='LOVELY LOAD')
        # The npz object returned by np.load is deliberately left for the
        # garbage collector to close.  Python 3 in debug mode emits a
        # ResourceWarning in that situation; since that category does not
        # exist on Python < 3.x, every warning is silenced instead.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            for _ in range(1024):
                try:
                    np.load(npz_path)["data"]
                except Exception as err:
                    raise AssertionError(
                        "Failed to load data from a file: %s" % err)
def test_invalid_raise(self):
    """Check ``invalid_raise`` handling of rows with a wrong column count."""
    rows = ["1, 1, 1, 1, 1"] * 50
    # Rows 0, 10, 20, 30 and 40 lose one delimiter and become malformed.
    for bad_row in range(0, 50, 10):
        rows[bad_row] = "2, 2, 2, 2 2"
    rows.insert(0, "a, b, c, d, e")
    mdata = TextIO("\n".join(rows))

    kwargs = dict(delimiter=",", dtype=None, names=True)
    # assert_warns does not hand back the callable's return value, so the
    # parsed array is stashed in a dict owned by this test.
    captured = {}

    def load_lenient():
        captured['mtest'] = np.ndfromtxt(mdata, invalid_raise=False, **kwargs)

    assert_warns(ConversionWarning, load_lenient)
    mtest = captured['mtest']
    assert_equal(len(mtest), 45)
    expected = np.ones(45, dtype=[(name, int) for name in 'abcde'])
    assert_equal(mtest, expected)

    # With the default settings the malformed rows must raise.
    mdata.seek(0)
    assert_raises(ValueError, np.ndfromtxt, mdata,
                  delimiter=",", names=True)
def test_closing_fid(self):
    """Check that issue #1517 (too many opened files) remains closed.

    The test may be "weak": it was not triggered on e.g. Debian sid of
    2012 Jul 05, but was reported to fail on Ubuntu 10.04, see
    http://projects.scipy.org/numpy/ticket/1517#comment:2
    """
    with temppath(suffix='.npz') as npz_path:
        np.savez(npz_path, data='LOVELY LOAD')
        # The npz object returned by np.load is deliberately left for the
        # garbage collector to close.  Python 3 in debug mode emits a
        # ResourceWarning in that situation; since that category does not
        # exist on Python < 3.x, every warning is filtered instead.
        with suppress_warnings() as sup:
            sup.filter(Warning)  # TODO: specify exact message
            for _ in range(1024):
                try:
                    np.load(npz_path)["data"]
                except Exception as err:
                    raise AssertionError(
                        "Failed to load data from a file: %s" % err)
def test_invalid_raise(self):
    """Check ``invalid_raise`` handling of rows with a wrong column count."""
    rows = ["1, 1, 1, 1, 1"] * 50
    # Rows 0, 10, 20, 30 and 40 lose one delimiter and become malformed.
    for bad_row in range(0, 50, 10):
        rows[bad_row] = "2, 2, 2, 2 2"
    rows.insert(0, "a, b, c, d, e")
    mdata = TextIO("\n".join(rows))

    kwargs = dict(delimiter=",", dtype=None, names=True)
    # assert_warns does not hand back the callable's return value, so the
    # parsed array is stashed in a dict owned by this test.
    captured = {}

    def load_lenient():
        captured['mtest'] = np.ndfromtxt(mdata, invalid_raise=False, **kwargs)

    assert_warns(ConversionWarning, load_lenient)
    mtest = captured['mtest']
    assert_equal(len(mtest), 45)
    expected = np.ones(45, dtype=[(name, int) for name in 'abcde'])
    assert_equal(mtest, expected)

    # With the default settings the malformed rows must raise.
    mdata.seek(0)
    assert_raises(ValueError, np.ndfromtxt, mdata,
                  delimiter=",", names=True)
def gaussian_entropy(sigma):
    """Entropy of a multivariate Gaussian whose dimensions are all independent.

    Computes ``0.5 * n_dims * log(2 * pi * e) + sum(log(sigma))`` where
    ``n_dims`` is the product of the static shape of ``sigma``.  See
    eq.(8.7) of http://www.biopsychology.org/norwich/isp/chap8.pdf.

    NOTE:
        Gaussian entropy is independent of its center `mu`.

    NOTE(review): ``n_dims`` is taken from the static shape, so every
    dimension of ``sigma`` presumably has to be known when the graph is
    built -- confirm against callers.

    Args:
        sigma:
            Tensor of standard deviations.

    Returns:
        Scalar.
    """
    static_shape = sigma.get_shape().as_list()
    n_dims = np.prod(static_shape)
    constant_part = 0.5 * n_dims * tf.log(2. * np.pi * np.e)
    scale_part = tf.reduce_sum(tf.log(sigma))
    return constant_part + scale_part
def result_pretty(self, number_of_runs=0, time_str=None,
                  fbestever=None):
    """Print a readable summary of the current result.

    Prints one line per entry of ``self.stop()``, the final and best-ever
    f-values, and the incumbent solution with its standard deviations
    (abbreviated to the first eight entries when ``self.N >= 9``).

    :param number_of_runs: restart count mentioned in the output when truthy.
    :param time_str: optional text appended in parentheses to each
        termination line.
    :param fbestever: best-ever f-value to report; defaults to ``self.best.f``.
    :returns: ``self.result()``
    """
    if fbestever is None:
        fbestever = self.best.f

    if number_of_runs:
        plural = 's' if number_of_runs > 1 else ''
        restart_note = (' after %i restart' + plural) % number_of_runs
    else:
        restart_note = ''
    time_note = ' (%s)' % time_str if time_str else ''

    for reason, value in self.stop().items():
        print('termination on %s=%s%s'
              % (reason, str(value), restart_note + time_note))
    print('final/bestever f-value = %e %e' % (self.best.last.f, fbestever))

    show_everything = self.N < 9
    incumbent = self.gp.pheno(self.mean,
                              into_bounds=self.boundary_handler.repair)
    if show_everything:
        print('incumbent solution: ' + str(list(incumbent)))
    else:
        # Drop the closing bracket of the truncated repr and mark the cut.
        print('incumbent solution: %s ...]' % (str(incumbent[:8])[:-1]))

    std_devs = self.sigma * self.sigma_vec * sqrt(self.dC) * self.gp.scales
    if show_everything:
        print('std deviation: ' + str(list(std_devs)))
    else:
        print('std deviations: %s ...]' % (str(std_devs[:8])[:-1]))
    return self.result()
def xgboost(train_sample, validation_sample, features, model_param):
    """Train a gradient-boosted tree model on log-scaled volume and predict.

    The training label is ``log1p(volume) / log(log_base)`` with
    ``log_base = np.e``; predictions are mapped back with
    ``log_base ** prediction - 1`` and reported through ``print_mape``
    under the label ``'XGBOOST'``.

    :param train_sample: table indexable by ``features`` and ``'volume'``.
    :param validation_sample: table indexable by ``features`` and ``'volume'``.
    :param features: column selector used for the model inputs.
    :param model_param: mapping with the keys ``'depth'``, ``'lr'``,
        ``'sample'``, ``'seed'`` and ``'tree'``.
    :returns: back-transformed predictions for ``validation_sample``.
    """
    log_base = np.e

    def evalmape(preds, dtrain):
        # Custom metric: MAPE between the matrix labels and the
        # back-transformed predictions, returned as (metric_name, result).
        actual = dtrain.get_label()
        restored = np.power(log_base, preds) - 1
        mape = np.abs((actual - restored) / actual).sum() / len(actual)
        return 'mape', mape

    param = {
        'max_depth': model_param['depth'],
        'eta': model_param['lr'],
        'silent': 1,
        'objective': 'reg:linear',
        'booster': 'gbtree',
        'subsample': model_param['sample'],
        'seed': model_param['seed'],
        'colsample_bytree': 1,
        'min_child_weight': 1,
        'gamma': 0,
        'eval_metric': 'mae',
    }
    num_round = model_param['tree']
    plst = param.items()

    train_label = np.log1p(train_sample['volume']) / np.log(log_base)
    dtrain = xgb.DMatrix(train_sample[features], train_label)
    # The evaluation matrix keeps the untransformed volume as its label.
    dtest = xgb.DMatrix(validation_sample[features], validation_sample['volume'])
    watchlist = [(dtest, 'eval'), (dtrain, 'train')]
    bst = xgb.train(plst, dtrain, num_round, watchlist, feval=evalmape)

    xgboost_prob = np.power(log_base, bst.predict(dtest)) - 1
    # MAPE
    print_mape(validation_sample['volume'], xgboost_prob, 'XGBOOST')
    return xgboost_prob
def exrf(train_sample, validation_sample, features, seed):
    """Fit an extra-trees regressor on log-scaled volume and predict.

    The regression target is ``log1p(volume)`` divided by ``log(log_base)``
    (``log_base`` is ``np.e`` here); predictions are mapped back with
    ``log_base ** prediction - 1``.  The result is handed to ``print_mape``
    under the label ``'EXTRA-RF'`` before being returned.

    :param train_sample: table indexable by ``features`` and ``'volume'``.
    :param validation_sample: table indexable by ``features`` and ``'volume'``.
    :param features: column selector used for the model inputs.
    :param seed: forwarded to the estimator as ``random_state``.
    :returns: back-transformed predictions for ``validation_sample``.
    """
    log_base = np.e
    trees = ExtraTreesRegressor(
        n_estimators=1000,
        criterion='mse',
        max_features='auto',
        max_depth=None,
        bootstrap=True,
        min_samples_split=4,
        min_samples_leaf=1,
        min_weight_fraction_leaf=0,
        max_leaf_nodes=None,
        random_state=seed,
    )
    train_inputs = train_sample[features]
    train_target = np.log1p(train_sample['volume']) / np.log(log_base)
    exrf_est = trees.fit(train_inputs, train_target)

    # Undo the log1p transform applied to the training target.
    raw_prediction = exrf_est.predict(validation_sample[features])
    exrf_prob = np.power(log_base, raw_prediction) - 1
    print_mape(validation_sample['volume'], exrf_prob, 'EXTRA-RF')
    return exrf_prob
def result_pretty(self, number_of_runs=0, time_str=None,
                  fbestever=None):
    """Print a readable summary of the current result.

    Prints one line per entry of ``self.stop()``, the final and best-ever
    f-values, and the incumbent solution with its standard deviations
    (abbreviated to the first eight entries when ``self.N >= 9``).

    :param number_of_runs: restart count mentioned in the output when truthy.
    :param time_str: optional text appended in parentheses to each
        termination line.
    :param fbestever: best-ever f-value to report; defaults to ``self.best.f``.
    :returns: ``self.result()``
    """
    if fbestever is None:
        fbestever = self.best.f

    if number_of_runs:
        plural = 's' if number_of_runs > 1 else ''
        restart_note = (' after %i restart' + plural) % number_of_runs
    else:
        restart_note = ''
    time_note = ' (%s)' % time_str if time_str else ''

    for reason, value in self.stop().items():
        print('termination on %s=%s%s'
              % (reason, str(value), restart_note + time_note))
    print('final/bestever f-value = %e %e' % (self.best.last.f, fbestever))

    show_everything = self.N < 9
    incumbent = self.gp.pheno(self.mean,
                              into_bounds=self.boundary_handler.repair)
    if show_everything:
        print('incumbent solution: ' + str(list(incumbent)))
    else:
        # Drop the closing bracket of the truncated repr and mark the cut.
        print('incumbent solution: %s ...]' % (str(incumbent[:8])[:-1]))

    std_devs = self.sigma * self.sigma_vec * sqrt(self.dC) * self.gp.scales
    if show_everything:
        print('std deviation: ' + str(list(std_devs)))
    else:
        print('std deviations: %s ...]' % (str(std_devs[:8])[:-1]))
    return self.result()
def define_assignment_operator(parser):
    """Register identifier tokens and the assignment operator on *parser*.

    A symbol table is stored on the parser as ``calculator_symbol_dict`` and
    pre-populated with ``pi`` and ``e``.  Reading an identifier that has not
    been assigned evaluates to ``0.0``.
    """
    parser.calculator_symbol_dict = {}  # Store symbol dict as a new parser attribute.
    symbols = parser.calculator_symbol_dict
    symbols["pi"] = np.pi  # Predefine pi.
    symbols["e"] = np.e  # Predefine e.

    def read_identifier(tok):
        """Look the identifier up, falling back to 0.0 when undefined."""
        return symbols.get(tok.value, 0.0)

    # on_ties=-1 lets defined function names of the same string length take
    # precedence over generic identifiers, which are only matched by a
    # group regex.
    parser.def_token("k_identifier", r"[a-zA-Z_](?:\w*)", on_ties=-1)
    parser.def_literal("k_identifier", eval_fun=read_identifier)

    def eval_assign(t):
        """Evaluate the right-hand side of `t` and store it under the
        identifier on the left."""
        value = t[1].eval_subtree()
        symbols[t[0].value] = value
        return value

    def follows_identifier(tok, lex):
        """Only treat the operator as assignment right after an identifier."""
        return lex.peek(-1).token_label == "k_identifier"

    parser.def_infix_op("k_equals", 5, "right",
                        precond_fun=follows_identifier,
                        eval_fun=eval_assign)
def test_closing_fid(self):
    """Check that issue #1517 (too many opened files) remains closed.

    The test may be "weak": it was not triggered on e.g. Debian sid of
    2012 Jul 05, but was reported to fail on Ubuntu 10.04, see
    http://projects.scipy.org/numpy/ticket/1517#comment:2
    """
    with temppath(suffix='.npz') as npz_path:
        np.savez(npz_path, data='LOVELY LOAD')
        # The npz object returned by np.load is deliberately left for the
        # garbage collector to close.  Python 3 in debug mode emits a
        # ResourceWarning in that situation; since that category does not
        # exist on Python < 3.x, every warning is silenced instead.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            for _ in range(1024):
                try:
                    np.load(npz_path)["data"]
                except Exception as err:
                    raise AssertionError(
                        "Failed to load data from a file: %s" % err)
def test_invalid_raise(self):
    """Check ``invalid_raise`` handling of rows with a wrong column count."""
    rows = ["1, 1, 1, 1, 1"] * 50
    # Rows 0, 10, 20, 30 and 40 lose one delimiter and become malformed.
    for bad_row in range(0, 50, 10):
        rows[bad_row] = "2, 2, 2, 2 2"
    rows.insert(0, "a, b, c, d, e")
    mdata = TextIO("\n".join(rows))

    kwargs = dict(delimiter=",", dtype=None, names=True)
    # assert_warns does not hand back the callable's return value, so the
    # parsed array is stashed in a dict owned by this test.
    captured = {}

    def load_lenient():
        captured['mtest'] = np.ndfromtxt(mdata, invalid_raise=False, **kwargs)

    assert_warns(ConversionWarning, load_lenient)
    mtest = captured['mtest']
    assert_equal(len(mtest), 45)
    expected = np.ones(45, dtype=[(name, int) for name in 'abcde'])
    assert_equal(mtest, expected)

    # With the default settings the malformed rows must raise.
    mdata.seek(0)
    assert_raises(ValueError, np.ndfromtxt, mdata,
                  delimiter=",", names=True)
def test_real(self):
    """``ng.get_data('const.e')`` yields a one-element float64 array near e."""
    loaded = ng.get_data('const.e')
    assert type(loaded) is np.ndarray
    assert len(loaded) == 1
    assert loaded.dtype == 'float64'
    first = loaded[0]
    assert first == pytest.approx(np.e)
def entropy(self):
    """Sum ``logstd + 0.5 * log(2 * pi * e)`` over the last axis."""
    per_dim_constant = .5 * np.log(2.0 * np.pi * np.e)
    per_dim_entropy = self.logstd + per_dim_constant
    return U.sum(per_dim_entropy, -1)
def test_uniformfloat_to_integer(self):
    """Converting a log-scale uniform float hyperparameter to an integer one."""
    f1 = UniformFloatHyperparameter("param", 1, 10, q=0.1, log=True)
    with warnings.catch_warnings():
        # The filter has to be in place before the call whose warnings it
        # is meant to silence.
        warnings.simplefilter("ignore")
        f2 = f1.to_integer()
    # TODO is this a useful rounding?
    # TODO should there be any rounding, if e.g. lower=0.1
    self.assertEqual("param, Type: UniformInteger, Range: [1, 10], "
                     "Default: 3, on log-scale", str(f2))
def gl_quad3d(fun, n, x_lim=None, y_lim=None, z_lim=None, args=()):
    """Integrate over a box with an ``n``-point Gauss-Legendre product rule.

    :param fun: either a callable invoked as ``fun((x, y, z), *args)`` or a
        constant; a non-callable is simply multiplied by the box volume.
    :param n: number of nodes per axis, passed to ``leggauss``.
    :param x_lim: ``(lower, upper)`` along x; ``(-1, 1)`` when None.
    :param y_lim: ``(lower, upper)`` along y; ``(-1, 1)`` when None.
    :param z_lim: ``(lower, upper)`` along z; ``(-1, 1)`` when None.
    :param args: extra positional arguments forwarded to ``fun``.
    :returns: the approximated integral.
    """
    a, b = (-1, 1) if x_lim is None else (x_lim[0], x_lim[1])
    c, d = (-1, 1) if y_lim is None else (y_lim[0], y_lim[1])
    e, f = (-1, 1) if z_lim is None else (z_lim[0], z_lim[1])

    if not callable(fun):
        return (b-a)*(d-c)*(f-e)*fun

    loc, w = np.polynomial.legendre.leggauss(n)
    # Jacobian of mapping the reference cube [-1, 1]^3 onto the box.
    jacobian = 1/8.*(b-a)*(d-c)*(f-e)
    total = 0
    for i, v1 in enumerate(loc):
        x = (b-a)*v1/2.+(a+b)/2.
        for j, v2 in enumerate(loc):
            y = (d-c)*v2/2.+(c+d)/2.
            for k, v3 in enumerate(loc):
                z = (f-e)*v3/2.+(e+f)/2.
                total = total + jacobian*fun((x, y, z), *args)*w[i]*w[j]*w[k]
    return total
def __iadd__(self, other):
    '''Merge another instance, or one (test, refs) tuple, into this scorer.

    A plain tuple is forwarded to ``cook_append``; anything else is expected
    to carry ``ctest`` and ``crefs`` lists, which are appended to ours.
    Returns ``self``.
    '''
    if type(other) is not tuple:
        self.ctest.extend(other.ctest)
        self.crefs.extend(other.crefs)
        return self
    # Tuple form avoids creating a new scorer instance just to add a pair.
    test, refs = other[0], other[1]
    self.cook_append(test, refs)
    return self
def __iadd__(self, other):
    '''Merge another instance, or one (test, refs) tuple, into this scorer.

    A plain tuple is forwarded to ``cook_append``; anything else is expected
    to carry ``ctest`` and ``crefs`` lists, which are appended to ours.
    Returns ``self``.
    '''
    if type(other) is not tuple:
        self.ctest.extend(other.ctest)
        self.crefs.extend(other.crefs)
        return self
    # Tuple form avoids creating a new scorer instance just to add a pair.
    test, refs = other[0], other[1]
    self.cook_append(test, refs)
    return self
def __iadd__(self, other):
    '''Merge another instance, or one (test, refs) tuple, into this scorer.

    A plain tuple is forwarded to ``cook_append``; anything else is expected
    to carry ``ctest`` and ``crefs`` lists, which are appended to ours.
    Returns ``self``.
    '''
    if type(other) is not tuple:
        self.ctest.extend(other.ctest)
        self.crefs.extend(other.crefs)
        return self
    # Tuple form avoids creating a new scorer instance just to add a pair.
    test, refs = other[0], other[1]
    self.cook_append(test, refs)
    return self
def test_complex_arrays(self):
    """``np.savetxt`` output for a complex array under three ``fmt`` styles."""
    ncols = 2
    nrows = 2
    real_part = np.pi
    imag_part = np.e
    a = np.zeros((ncols, nrows), dtype=np.complex128)
    a[:] = real_part + 1.0j * imag_part

    def saved_lines(fmt):
        """Write ``a`` with the given format and return the lines produced."""
        buf = BytesIO()
        np.savetxt(buf, a, fmt=fmt)
        buf.seek(0)
        return buf.readlines()

    # One format only
    assert_equal(
        saved_lines(' %+.3e'),
        [b' ( +3.142e+00+ +2.718e+00j) ( +3.142e+00+ +2.718e+00j)\n',
         b' ( +3.142e+00+ +2.718e+00j) ( +3.142e+00+ +2.718e+00j)\n'])

    # One format for each real and imaginary part
    assert_equal(
        saved_lines(' %+.3e' * 2 * ncols),
        [b' +3.142e+00 +2.718e+00 +3.142e+00 +2.718e+00\n',
         b' +3.142e+00 +2.718e+00 +3.142e+00 +2.718e+00\n'])

    # One format for each complex number
    assert_equal(
        saved_lines(['(%.3e%+.3ej)'] * ncols),
        [b'(3.142e+00+2.718e+00j) (3.142e+00+2.718e+00j)\n',
         b'(3.142e+00+2.718e+00j) (3.142e+00+2.718e+00j)\n'])
def test_invalid_raise_with_usecols(self):
    """Check ``invalid_raise=False`` combined with ``usecols``."""
    rows = ["1, 1, 1, 1, 1"] * 50
    malformed = list(range(0, 50, 10))
    # Rows 0, 10, 20, 30 and 40 lose their last delimiter.
    for bad_row in malformed:
        rows[bad_row] = "2, 2, 2, 2 2"
    rows.insert(0, "a, b, c, d, e")
    mdata = TextIO("\n".join(rows))
    kwargs = dict(delimiter=",", dtype=None, names=True,
                  invalid_raise=False)

    # assert_warns does not hand back the callable's return value, so the
    # parsed array is stashed in a dict owned by this test.
    captured = {}

    def load_outer_columns():
        captured['mtest'] = np.ndfromtxt(mdata, usecols=(0, 4), **kwargs)

    assert_warns(ConversionWarning, load_outer_columns)
    mtest = captured['mtest']
    assert_equal(len(mtest), 45)
    assert_equal(mtest, np.ones(45, dtype=[(name, int) for name in 'ae']))

    # Restricted to the first two columns, all 50 rows are expected back,
    # the altered ones carrying the value 2.
    mdata.seek(0)
    mtest = np.ndfromtxt(mdata, usecols=(0, 1), **kwargs)
    assert_equal(len(mtest), 50)
    control = np.ones(50, dtype=[(name, int) for name in 'ab'])
    control[malformed] = (2, 2)
    assert_equal(mtest, control)
def __call__(self, samples, x):
    """Mean over ``x`` of a log-mean-exp of Gaussian terms around ``samples``.

    Differences between every sample and every point are scaled by
    ``self.sigma``; the summed ``-0.5 * d**2`` terms are combined with
    ``log_mean_exp`` over the sample axis and the summed
    ``log(sigma * sqrt(2 * pi))`` is subtracted.

    NOTE(review): this looks like a Parzen-window log-likelihood estimate
    with ``samples`` and ``x`` both 2-D (items, features) -- confirm.
    """
    log_normaliser = T.log(self.sigma * T.sqrt(2 * pi)).sum()
    pairwise_diff = samples[:, None, :] - x[None, :, :]
    scaled_diff = pairwise_diff / self.sigma[None, None, :]
    log_kernel = (-.5 * scaled_diff ** 2).sum(axis=2)
    log_density = log_mean_exp(log_kernel, axis=0)
    return (log_density - log_normaliser).mean()
def entropy(self, dist_info):
    """Sum ``log_std + log(sqrt(2 * pi * e))`` over the last axis.

    :param dist_info: mapping whose ``"log_std"`` entry holds the log
        standard deviations.
    :returns: the summed value, with the last axis reduced.
    """
    log_stds = dist_info["log_std"]
    per_dim_constant = np.log(np.sqrt(2 * np.pi * np.e))
    per_dim_entropy = log_stds + per_dim_constant
    return np.sum(per_dim_entropy, axis=-1)
def entropy_sym(self, dist_info_var):
    """Symbolic sum of ``log_std + log(sqrt(2 * pi * e))`` over the last axis.

    :param dist_info_var: mapping whose ``"log_std"`` entry is the symbolic
        log standard deviation.
    :returns: the symbolic sum with the last axis reduced.
    """
    log_std_var = dist_info_var["log_std"]
    per_dim_constant = TT.log(np.sqrt(2 * np.pi * np.e))
    per_dim_entropy = log_std_var + per_dim_constant
    return TT.sum(per_dim_entropy, axis=-1)
def shift_or_mirror_into_invertible_domain(self, solution_genotype):
    """return the reference solution that has the same ``box_constraints_transformation(solution)``
    value, i.e. ``tf.shift_or_mirror_into_invertible_domain(x) = tf.inverse(tf.transform(x))``.
    This is an idempotent mapping (leading to the same result independent how often it is
    repeatedly applied).

    Implemented as ``self.inverse(self(solution_genotype))``.
    """
    # A ``raise NotImplementedError`` used to follow this return; it could
    # never execute and has been dropped.
    return self.inverse(self(solution_genotype))
def repair_genotype(self, x, copy_if_changed=False):
    """make sure that solutions fit to the sample distribution, this interface will probably change.

    In particular the frequency of x - self.mean being long is limited:
    when the Mahalanobis norm of ``x - self.mean`` exceeds
    ``N**0.5 + 2 * N / (N + 2)``, ``x`` is pulled back towards the mean so
    that its norm equals that bound.

    :param x: solution; array-likes are converted to an array.
    :param copy_if_changed: when true a new array is returned if ``x`` has
        to be changed; otherwise an array ``x`` is rescaled in place.
    :returns: the (possibly rescaled) solution as an array.
    """
    # asarray only copies when it has to, whereas ``array(..., copy=False)``
    # raises on NumPy 2 whenever a copy is required (e.g. for a list).
    from numpy import asarray
    x = asarray(x)
    mold = asarray(self.mean)
    if 1 < 3:  # hard clip at upper_length
        upper_length = self.N**0.5 + 2 * self.N / (self.N + 2)  # should become an Option, but how? e.g. [0, 2, 2]
        fac = self.mahalanobis_norm(x - mold) / upper_length

        if fac > 1:
            if copy_if_changed:
                x = (x - mold) / fac + mold
            else:  # should be 25% faster:
                x -= mold
                x /= fac
                x += mold
            # print self.countiter, k, fac, self.mahalanobis_norm(pop[k] - mold)
            # adapt also sigma: which are the trust-worthy/injected solutions?
    else:
        # Disabled alternative, kept as found; never reached while the
        # condition above is constant.
        if 'checktail' not in self.__dict__:  # hasattr(self, 'checktail')
            raise NotImplementedError
            # from check_tail_smooth import CheckTail  # for the time being
            # self.checktail = CheckTail()
            # print('untested feature checktail is on')
        fac = self.checktail.addchin(self.mahalanobis_norm(x - mold))

        if fac < 1:
            x = fac * (x - mold) + mold

    return x