Python numpy module: cast() usage examples
The following 50 code examples, collected from open-source Python projects, illustrate how to use numpy.cast().
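Before the examples, a minimal sketch of what numpy.cast does: np.cast is a dict-like table mapping a dtype to a casting function, so np.cast[dtype](value) behaves roughly like np.asarray(value, dtype=dtype). Note that np.cast was deprecated in NumPy 1.25 and removed in NumPy 2.0, so the snippets below only run on older NumPy versions; the array values here are purely illustrative.

import numpy as np

# np.cast maps a dtype name (or dtype object) to a cast function.
x = np.cast['float32']([1, 2, 3])       # array([1., 2., 3.], dtype=float32)
flag = np.cast['int32'](0)              # 0-d int32 array, as used in the Theano `givens` below
same = np.asarray([1, 2, 3], dtype='float32')  # modern replacement for np.cast['float32'](...)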
def glorot_normal(shape, gain=1.0, c01b=False):
    orig_shape = shape
    if c01b:
        if len(shape) != 4:
            raise RuntimeError(
                "If c01b is True, only shapes of length 4 are accepted")
        n1, n2 = shape[0], shape[3]
        receptive_field_size = shape[1] * shape[2]
    else:
        if len(shape) < 2:
            shape = (1,) + tuple(shape)
        n1, n2 = shape[:2]
        receptive_field_size = np.prod(shape[2:])
    std = gain * np.sqrt(2.0 / ((n1 + n2) * receptive_field_size))
    return np.cast[floatX](
        get_rng().normal(0.0, std, size=orig_shape))
def adamax_updates(params, cost, lr=0.001, mom1=0.9, mom2=0.999):
    updates = []
    grads = T.grad(cost, params)
    for p, g in zip(params, grads):
        mg = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        v = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        if mom1 > 0:
            v_t = mom1*v + (1. - mom1)*g
            updates.append((v, v_t))
        else:
            v_t = g
        mg_t = T.maximum(mom2*mg, abs(g))
        g_t = v_t / (mg_t + 1e-6)
        p_t = p - lr * g_t
        updates.append((mg, mg_t))
        updates.append((p, p_t))
    return updates
def adam_updates(params, cost, lr=0.001, mom1=0.9, mom2=0.999):
    updates = []
    grads = T.grad(cost, params)
    t = th.shared(np.cast[th.config.floatX](1.))
    for p, g in zip(params, grads):
        v = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        mg = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        v_t = mom1*v + (1. - mom1)*g
        mg_t = mom2*mg + (1. - mom2)*T.square(g)
        v_hat = v_t / (1. - mom1 ** t)
        mg_hat = mg_t / (1. - mom2 ** t)
        g_t = v_hat / T.sqrt(mg_hat + 1e-8)
        p_t = p - lr * g_t
        updates.append((v, v_t))
        updates.append((mg, mg_t))
        updates.append((p, p_t))
        updates.append((t, t+1))
    return updates
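For context, a hedged sketch of how such an update list is typically wired into a Theano training function; it assumes `th`/`T` are the usual `theano`/`theano.tensor` aliases and that `x`, `y`, `cost`, `params` and the batch arrays come from the surrounding model code (they are illustrative, not part of the snippet above).

# Illustrative only: plug the returned (shared, new_value) pairs into theano.function.
updates = adam_updates(params, cost, lr=0.001)
train_fn = th.function(inputs=[x, y], outputs=cost, updates=updates)
loss = train_fn(x_batch, y_batch)  # one optimization step on a (hypothetical) minibatch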
def adam_updates(params, cost, lr=0.001, mom1=0.9, mom2=0.999):
    updates = []
    grads = T.grad(cost, params)
    t = th.shared(np.cast[th.config.floatX](1.))
    for p, g in zip(params, grads):
        v = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        mg = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        v_t = mom1*v + (1. - mom1)*g
        mg_t = mom2*mg + (1. - mom2)*T.square(g)
        v_hat = v_t / (1. - mom1 ** t)
        mg_hat = mg_t / (1. - mom2 ** t)
        g_t = v_hat / T.sqrt(mg_hat + 1e-8)
        p_t = p - lr * g_t
        updates.append((v, v_t))
        updates.append((mg, mg_t))
        updates.append((p, p_t))
        updates.append((t, t+1))
    return updates
def get_output_for(self, input, deterministic=False, **kwargs):
    if deterministic:
        norm_features = (input-self.avg_batch_mean.dimshuffle(*self.dimshuffle_args)) / T.sqrt(1e-6 + self.avg_batch_var).dimshuffle(*self.dimshuffle_args)
    else:
        batch_mean = T.mean(input,axis=self.axes_to_sum).flatten()
        centered_input = input-batch_mean.dimshuffle(*self.dimshuffle_args)
        batch_var = T.mean(T.square(centered_input),axis=self.axes_to_sum).flatten()
        batch_stdv = T.sqrt(1e-6 + batch_var)
        norm_features = centered_input / batch_stdv.dimshuffle(*self.dimshuffle_args)
        # BN updates
        new_m = 0.9*self.avg_batch_mean + 0.1*batch_mean
        new_v = 0.9*self.avg_batch_var + T.cast((0.1*input.shape[0])/(input.shape[0]-1),th.config.floatX)*batch_var
        self.bn_updates = [(self.avg_batch_mean, new_m), (self.avg_batch_var, new_v)]
    if hasattr(self, 'g'):
        activation = norm_features*self.g.dimshuffle(*self.dimshuffle_args)
    else:
        activation = norm_features
    if hasattr(self, 'b'):
        activation += self.b.dimshuffle(*self.dimshuffle_args)
    return self.nonlinearity(activation)
def get_output_for(self, input, deterministic=False, **kwargs):
    if deterministic:
        norm_features = (input-self.avg_batch_mean.dimshuffle(*self.dimshuffle_args)) / T.sqrt(1e-6 + self.avg_batch_var).dimshuffle(*self.dimshuffle_args)
    else:
        batch_mean = T.mean(input,axis=self.axes_to_sum).flatten()
        centered_input = input-batch_mean.dimshuffle(*self.dimshuffle_args)
        batch_var = T.mean(T.square(centered_input),axis=self.axes_to_sum).flatten()
        batch_stdv = T.sqrt(1e-6 + batch_var)
        norm_features = centered_input / batch_stdv.dimshuffle(*self.dimshuffle_args)
        # BN updates
        new_m = 0.9*self.avg_batch_mean + 0.1*batch_mean
        new_v = 0.9*self.avg_batch_var + T.cast((0.1*input.shape[0])/(input.shape[0]-1),th.config.floatX)*batch_var
        self.bn_updates = [(self.avg_batch_mean, new_m), (self.avg_batch_var, new_v)]
    if hasattr(self, 'g'):
        activation = norm_features*self.g.dimshuffle(*self.dimshuffle_args)
    else:
        activation = norm_features
    if hasattr(self, 'b'):
        activation += self.b.dimshuffle(*self.dimshuffle_args)
    return self.nonlinearity(activation)
def __call__(self, learning_rate):
    """Update the learning rate according to the exponential decay
    schedule.
    """
    if self._count == 0.:
        self._base_lr = learning_rate.get_value()
    self._count += 1
    if not self._min_reached:
        new_lr = self._base_lr * (self.decay_factor ** (-self._count))
        if new_lr <= self.min_lr:
            self._min_reached = True
            new_lr = self.min_lr
    else:
        new_lr = self.min_lr
    learning_rate.set_value(np.cast[theano.config.floatX](new_lr))
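A hedged usage sketch for this scheduler: it expects `learning_rate` to be a Theano shared variable, reads the base rate on the first call, and overwrites the shared value on every subsequent call. `scheduler` below stands for an instance of the (unnamed here) decay class, and the loop bound is illustrative.

# Sketch only: `scheduler` is assumed to be an instance of the decay schedule above.
lr = theano.shared(np.cast[theano.config.floatX](0.1))
for epoch in range(10):
    # ... one epoch of training that reads `lr` goes here ...
    scheduler(lr)  # decays lr in place, bottoming out at scheduler.min_lr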
def as_floatX(variable):
    """
    This code is taken from pylearn2:
    Casts a given variable into dtype config.floatX
    numpy ndarrays will remain numpy ndarrays
    python floats will become 0-D ndarrays
    all other types will be treated as theano tensors
    """
    if isinstance(variable, float):
        return numpy.cast[theano.config.floatX](variable)
    if isinstance(variable, numpy.ndarray):
        return numpy.cast[theano.config.floatX](variable)
    return theano.tensor.cast(variable, theano.config.floatX)
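A small sketch of what each branch returns, assuming theano.config.floatX is 'float32' (the inputs are illustrative):

x0 = as_floatX(3.14)                      # 0-d numpy array, dtype float32
x1 = as_floatX(numpy.arange(4))           # numpy ndarray cast to float32
x2 = as_floatX(theano.tensor.ivector())   # symbolic TensorVariable cast to float32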
def as_floatX(variable):
    """
    This code is taken from pylearn2:
    Casts a given variable into dtype config.floatX
    numpy ndarrays will remain numpy ndarrays
    python floats will become 0-D ndarrays
    all other types will be treated as theano tensors
    """
    if isinstance(variable, float):
        return numpy.cast[theano.config.floatX](variable)
    if isinstance(variable, numpy.ndarray):
        return numpy.cast[theano.config.floatX](variable)
    return theano.tensor.cast(variable, theano.config.floatX)
def parameter_prediction(self, test_set_x):  #, batch_size
    """ This function is to predict the output of NN
    :param test_set_x: input features for a testing sentence
    :type test_set_x: python array variable
    :returns: predicted features
    """
    n_test_set_x = test_set_x.shape[0]
    test_out = theano.function([], self.final_layer.output,
        givens={self.x: test_set_x, self.is_train: np.cast['int32'](0)}, on_unused_input='ignore')
    predict_parameter = test_out()
    return predict_parameter
## the function to output activations at a hidden layer
def generate_hidden_layer(self, test_set_x, bn_layer_index):
    """ This function is to predict the bottleneck features of NN
    :param test_set_x: input features for a testing sentence
    :type test_set_x: python array variable
    :returns: predicted bottleneck features
    """
    n_test_set_x = test_set_x.shape[0]
    test_out = theano.function([], self.rnn_layers[bn_layer_index].output,
        givens={self.x: test_set_x, self.is_train: np.cast['int32'](0)}, on_unused_input='ignore')
    predict_parameter = test_out()
    return predict_parameter
def parameter_prediction(self, test_set_x):  #, batch_size
    """ This function is to predict the output of NN
    :param test_set_x: input features for a testing sentence
    :type test_set_x: python array variable
    :returns: predicted features
    """
    n_test_set_x = test_set_x.shape[0]
    test_out = theano.function([], self.final_layer.output,
        givens={self.x: test_set_x[0:n_test_set_x], self.is_train: np.cast['int32'](0)}, on_unused_input='ignore')
    predict_parameter = test_out()
    return predict_parameter
def parameter_prediction_S2S(self, test_set_x, test_set_d):
    """ This function is to predict the output of NN
    :param test_set_x: input features for a testing sentence
    :param test_set_d: phone durations for a testing sentence
    :type test_set_x: python array variable
    :type test_set_d: python array variable
    :returns: predicted features
    """
    n_test_set_x = test_set_x.shape[0]
    test_out = theano.function([], self.final_layer.output,
        givens={self.x: test_set_x[0:n_test_set_x], self.d: test_set_d[0:n_test_set_x], self.is_train: np.cast['int32'](0)}, on_unused_input='ignore')
    predict_parameter = test_out()
    return predict_parameter
def generate_hidden_layer(self, test_set_x, bn_layer_index):
    """ This function is to predict the bottleneck features of NN
    :param test_set_x: input features for a testing sentence
    :type test_set_x: python array variable
    :returns: predicted bottleneck features
    """
    n_test_set_x = test_set_x.shape[0]
    test_out = theano.function([], self.rnn_layers[bn_layer_index].output,
        givens={self.x: test_set_x, self.is_train: np.cast['int32'](0)}, on_unused_input='ignore')
    predict_parameter = test_out()
    return predict_parameter
def get_training_data(num_samples):
    """Generates some training data."""
    # As (x, y) Cartesian coordinates.
    x = np.random.randint(0, 2, size=(num_samples, 2))
    y = x[:, 0] + 2 * x[:, 1]  # 2-digit binary to integer.
    y = np.cast['int32'](y)
    x = np.cast['float32'](x) * 1.6 - 0.8  # Scales to [-1, 1].
    x += np.random.uniform(-0.1, 0.1, size=x.shape)
    y_ohe = np.cast['float32'](np.eye(4)[y])
    y = np.cast['float32'](np.expand_dims(y, -1))
    return x, y, y_ohe
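A quick check of what this helper returns; the shapes follow directly from the code above, and the sample count is illustrative:

x, y, y_ohe = get_training_data(128)
# x:     float32, shape (128, 2), noisy points near {-0.8, 0.8}^2
# y:     float32, shape (128, 1), class index 0..3 as a column vector
# y_ohe: float32, shape (128, 4), one-hot encoding of y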
def pcnn_norm(x, colorspace="RGB", reverse=False):
    """Normalize the input from and to [-1, 1].
    Args:
        x: input image array (3D or 4D)
        colorspace (str): Source/target colorspace, depending on the value of `reverse`
        reverse (bool, optional): If False, converts the input from the given colorspace to float in the range [-1, 1].
            Otherwise, converts the input to the valid range for the given colorspace. Defaults to False.
    Returns:
        x_norm: normalized input
    """
    if colorspace == "RGB":
        return np.cast[np.uint8](x * 127.5 + 127.5) if reverse else np.cast[np.float32]((x - 127.5) / 127.5)
    elif colorspace == "lab":
        if x.shape[-1] == 1:
            return (x * 50. + 50.) if reverse else np.cast[np.float32]((x - 50.) / 50.)
        else:
            a = np.array([50., +0.5, -0.5], dtype=np.float32)
            b = np.array([50., 127.5, 127.5], dtype=np.float32)
            return np.cast[np.float64](x * b + a) if reverse else np.cast[np.float32]((x - a) / b)
    else:
        raise ValueError("Unknown colorspace: %s" % colorspace)
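A minimal round trip through the RGB branch of pcnn_norm (the image values are illustrative):

img = np.random.randint(0, 256, size=(32, 32, 3)).astype(np.uint8)
x = pcnn_norm(img)                     # float32 in [-1, 1]
img_back = pcnn_norm(x, reverse=True)  # back to uint8 in [0, 255]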
def __init__(self, input, n_in, n_out, prob_drop=0.5, verbose=False):
    self.verbose = verbose
    self.prob_drop = prob_drop
    self.prob_keep = 1.0 - prob_drop
    self.flag_on = theano.shared(np.cast[theano.config.floatX](1.0))
    self.flag_off = 1.0 - self.flag_on
    seed_this = DropoutLayer.seed_common.randint(0, 2**31-1)
    mask_rng = theano.tensor.shared_randomstreams.RandomStreams(seed_this)
    self.mask = mask_rng.binomial(n=1, p=self.prob_keep, size=input.shape)
    self.output = \
        self.flag_on * T.cast(self.mask, theano.config.floatX) * input + \
        self.flag_off * self.prob_keep * input
    DropoutLayer.layers.append(self)
    if self.verbose:
        print('dropout layer with P_drop: ' + str(self.prob_drop))
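Because flag_on is a shared scalar, switching such a layer between training and testing is just a matter of resetting it. The helpers below are hypothetical (the snippet only exposes the shared flag itself); when the flag is 0, the output reduces to prob_keep * input, i.e. the usual test-time scaling.

# Hypothetical helpers: flip every registered dropout layer on/off via its shared flag.
def set_dropout_on():
    for layer in DropoutLayer.layers:
        layer.flag_on.set_value(np.cast[theano.config.floatX](1.0))

def set_dropout_off():
    for layer in DropoutLayer.layers:
        layer.flag_on.set_value(np.cast[theano.config.floatX](0.0))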
def load_data(dataset):
    if dataset.split('.')[-1] == 'gz':
        f = gzip.open(dataset, 'r')
    else:
        f = open(dataset, 'r')
    train_set, valid_set, test_set = pkl.load(f)
    f.close()

    def shared_dataset(data_xy, borrow=True):
        data_x, data_y = data_xy
        shared_x = theano.shared(
            np.asarray(data_x, dtype=theano.config.floatX),
            borrow=borrow)
        shared_y = theano.shared(
            np.asarray(data_y, dtype=theano.config.floatX),
            borrow=borrow)
        return shared_x, T.cast(shared_y, 'int32')

    train_set_x, train_set_y = shared_dataset(train_set)
    valid_set_x, valid_set_y = shared_dataset(valid_set)
    test_set_x, test_set_y = shared_dataset(test_set)
    return [(train_set_x, train_set_y),
            (valid_set_x, valid_set_y),
            (test_set_x, test_set_y)]
def adam(loss, params, learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8):
    grads = T.grad(loss, params)
    updates = OrderedDict()
    t_prev = theano.shared(np.cast[theano.config.floatX](0))
    t = t_prev + 1
    a_t = learning_rate * T.sqrt(1-beta2**t)/(1-beta1**t)
    for param, grad in zip(params, grads):
        value = param.get_value(borrow=True)
        m_prev = theano.shared(
            np.zeros(value.shape, dtype=value.dtype),
            broadcastable=param.broadcastable)
        v_prev = theano.shared(
            np.zeros(value.shape, dtype=value.dtype),
            broadcastable=param.broadcastable)
        m_t = beta1 * m_prev + (1 - beta1) * grad
        v_t = beta2 * v_prev + (1 - beta2) * grad ** 2
        step = a_t * m_t / (T.sqrt(v_t) + epsilon)
        updates[m_prev] = m_t
        updates[v_prev] = v_t
        updates[param] = param - step
    updates[t_prev] = t
    return updates
def get_output_for(self, input, deterministic=False, **kwargs):
    if deterministic:
        norm_features = (input-self.avg_batch_mean.dimshuffle(*self.dimshuffle_args)) / T.sqrt(1e-6 + self.avg_batch_var).dimshuffle(*self.dimshuffle_args)
    else:
        batch_mean = T.mean(input,axis=self.axes_to_sum).flatten()
        centered_input = input-batch_mean.dimshuffle(*self.dimshuffle_args)
        batch_var = T.mean(T.square(centered_input),axis=self.axes_to_sum).flatten()
        batch_stdv = T.sqrt(1e-6 + batch_var)
        norm_features = centered_input / batch_stdv.dimshuffle(*self.dimshuffle_args)
        # BN updates
        new_m = 0.9*self.avg_batch_mean + 0.1*batch_mean
        new_v = 0.9*self.avg_batch_var + T.cast((0.1*input.shape[0])/(input.shape[0]-1),th.config.floatX)*batch_var
        self.bn_updates = [(self.avg_batch_mean, new_m), (self.avg_batch_var, new_v)]
    if hasattr(self, 'g'):
        activation = norm_features*self.g.dimshuffle(*self.dimshuffle_args)
    else:
        activation = norm_features
    if hasattr(self, 'b'):
        activation += self.b.dimshuffle(*self.dimshuffle_args)
    return self.nonlinearity(activation)
def one_hot(labels, num_classes, name='one_hot'):
    """Transform numeric labels into onehot_labels.
    Args:
        labels: [batch_size] target labels.
        num_classes: total number of classes.
        scope: Optional scope for op_scope.
    Returns:
        one hot encoding of the labels.
    """
    with tf.op_scope([labels], name):
        batch_size = labels.get_shape()[0]
        indices = tf.expand_dims(tf.range(0, batch_size), 1)
        labels = tf.cast(tf.expand_dims(labels, 1), indices.dtype)
        concated = tf.concat(1, [indices, labels])
        onehot_labels = tf.sparse_to_dense(
            concated, tf.pack([batch_size, num_classes]), 1.0, 0.0)
        onehot_labels.set_shape([batch_size, num_classes])
        return onehot_labels
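The snippet above targets the pre-1.0 TensorFlow API (tf.concat(axis, values), tf.pack, tf.sparse_to_dense). In current TensorFlow the same encoding is a one-liner, shown here only for comparison and not part of the original project:

# Modern equivalent (TF >= 1.x):
onehot_labels = tf.one_hot(labels, depth=num_classes, on_value=1.0, off_value=0.0)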
def adam_updates(params, cost, lr=0.001, mom1=0.9, mom2=0.999):
    updates = []
    grads = T.grad(cost, params)
    t = th.shared(np.cast[th.config.floatX](1.))
    for p, g in zip(params, grads):
        v = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        mg = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        v_t = mom1*v + (1. - mom1)*g
        mg_t = mom2*mg + (1. - mom2)*T.square(g)
        v_hat = v_t / (1. - mom1 ** t)
        mg_hat = mg_t / (1. - mom2 ** t)
        g_t = v_hat / T.sqrt(mg_hat + 1e-8)
        p_t = p - lr * g_t
        updates.append((v, v_t))
        updates.append((mg, mg_t))
        updates.append((p, p_t))
        updates.append((t, t+1))
    return updates
def get_output_for(self, input, deterministic=False, **kwargs):
    if deterministic:
        norm_features = (input-self.avg_batch_mean.dimshuffle(*self.dimshuffle_args)) / T.sqrt(1e-6 + self.avg_batch_var).dimshuffle(*self.dimshuffle_args)
    else:
        batch_mean = T.mean(input,axis=self.axes_to_sum).flatten()
        centered_input = input-batch_mean.dimshuffle(*self.dimshuffle_args)
        batch_var = T.mean(T.square(centered_input),axis=self.axes_to_sum).flatten()
        batch_stdv = T.sqrt(1e-6 + batch_var)
        norm_features = centered_input / batch_stdv.dimshuffle(*self.dimshuffle_args)
        # BN updates
        new_m = 0.9*self.avg_batch_mean + 0.1*batch_mean
        new_v = 0.9*self.avg_batch_var + T.cast((0.1*input.shape[0])/(input.shape[0]-1),th.config.floatX)*batch_var
        self.bn_updates = [(self.avg_batch_mean, new_m), (self.avg_batch_var, new_v)]
    if hasattr(self, 'g'):
        activation = norm_features*self.g.dimshuffle(*self.dimshuffle_args)
    else:
        activation = norm_features
    if hasattr(self, 'b'):
        activation += self.b.dimshuffle(*self.dimshuffle_args)
    return self.nonlinearity(activation)
def adamax_updates(params, cost, lr=0.001, mom1=0.9, mom2=0.999):
    updates = []
    grads = T.grad(cost, params)
    for p, g in zip(params, grads):
        mg = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        v = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        if mom1 > 0:
            v_t = mom1*v + (1. - mom1)*g
            updates.append((v, v_t))
        else:
            v_t = g
        mg_t = T.maximum(mom2*mg, abs(g))
        g_t = v_t / (mg_t + 1e-6)
        p_t = p - lr * g_t
        updates.append((mg, mg_t))
        updates.append((p, p_t))
    return updates
def adam_updates(params, cost, lr=0.001, mom1=0.9, mom2=0.999):
    updates = []
    grads = T.grad(cost, params)
    t = th.shared(np.cast[th.config.floatX](1.))
    for p, g in zip(params, grads):
        v = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        mg = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        v_t = mom1*v + (1. - mom1)*g
        mg_t = mom2*mg + (1. - mom2)*T.square(g)
        v_hat = v_t / (1. - mom1 ** t)
        mg_hat = mg_t / (1. - mom2 ** t)
        g_t = v_hat / T.sqrt(mg_hat + 1e-8)
        p_t = p - lr * g_t
        updates.append((v, v_t))
        updates.append((mg, mg_t))
        updates.append((p, p_t))
        updates.append((t, t+1))
    return updates
def get_output_for(self, input, deterministic=False, **kwargs):
    if deterministic:
        norm_features = (input-self.avg_batch_mean.dimshuffle(*self.dimshuffle_args)) / T.sqrt(1e-6 + self.avg_batch_var).dimshuffle(*self.dimshuffle_args)
    else:
        batch_mean = T.mean(input,axis=self.axes_to_sum).flatten()
        centered_input = input-batch_mean.dimshuffle(*self.dimshuffle_args)
        batch_var = T.mean(T.square(centered_input),axis=self.axes_to_sum).flatten()
        batch_stdv = T.sqrt(1e-6 + batch_var)
        norm_features = centered_input / batch_stdv.dimshuffle(*self.dimshuffle_args)
        # BN updates
        new_m = 0.9*self.avg_batch_mean + 0.1*batch_mean
        new_v = 0.9*self.avg_batch_var + T.cast((0.1*input.shape[0])/(input.shape[0]-1.), th.config.floatX)*batch_var
        self.bn_updates = [(self.avg_batch_mean, new_m), (self.avg_batch_var, new_v)]
    if hasattr(self, 'g'):
        activation = norm_features*self.g.dimshuffle(*self.dimshuffle_args)
    else:
        activation = norm_features
    if hasattr(self, 'b'):
        activation += self.b.dimshuffle(*self.dimshuffle_args)
    return self.nonlinearity(activation)
def adam_updates(params, cost, lr=0.001, mom1=0.9, mom2=0.999):
    updates = []
    grads = T.grad(cost, params)
    t = th.shared(np.cast[th.config.floatX](1.))
    for p, g in zip(params, grads):
        v = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        mg = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        v_t = mom1*v + (1. - mom1)*g
        mg_t = mom2*mg + (1. - mom2)*T.square(g)
        v_hat = v_t / (1. - mom1 ** t)
        mg_hat = mg_t / (1. - mom2 ** t)
        g_t = v_hat / T.sqrt(mg_hat + 1e-8)
        p_t = p - lr * g_t
        updates.append((v, v_t))
        updates.append((mg, mg_t))
        updates.append((p, p_t))
        updates.append((t, t+1))
    return updates
def adam_conditional_updates(params, cost, mincost, lr=0.001, mom1=0.9, mom2=0.999):  # if cost is less than mincost, don't do update
    updates = []
    grads = T.grad(cost, params)
    t = th.shared(np.cast[th.config.floatX](1.))
    for p, g in zip(params, grads):
        v = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        mg = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        v_t = mom1*v + (1. - mom1)*g
        mg_t = mom2*mg + (1. - mom2)*T.square(g)
        v_hat = v_t / (1. - mom1 ** t)
        mg_hat = mg_t / (1. - mom2 ** t)
        g_t = v_hat / T.sqrt(mg_hat + 1e-8)
        p_t = p - lr * g_t
        updates.append((v, ifelse(cost<mincost, v, v_t)))
        updates.append((mg, ifelse(cost<mincost, mg, mg_t)))
        updates.append((p, ifelse(cost<mincost, p, p_t)))
        updates.append((t, ifelse(cost<mincost, t, t+1)))
    return updates
def get_output_for(self, input, deterministic=False, **kwargs):
    if deterministic:
        norm_features = (input-self.avg_batch_mean.dimshuffle(*self.dimshuffle_args)) / T.sqrt(1e-6 + self.avg_batch_var).dimshuffle(*self.dimshuffle_args)
    else:
        batch_mean = T.mean(input,axis=self.axes_to_sum).flatten()
        centered_input = input-batch_mean.dimshuffle(*self.dimshuffle_args)
        batch_var = T.mean(T.square(centered_input),axis=self.axes_to_sum).flatten()
        batch_stdv = T.sqrt(1e-6 + batch_var)
        norm_features = centered_input / batch_stdv.dimshuffle(*self.dimshuffle_args)
        # BN updates
        new_m = 0.9*self.avg_batch_mean + 0.1*batch_mean
        new_v = 0.9*self.avg_batch_var + T.cast((0.1*input.shape[0])/(input.shape[0]-1),th.config.floatX)*batch_var
        self.bn_updates = [(self.avg_batch_mean, new_m), (self.avg_batch_var, new_v)]
    if hasattr(self, 'g'):
        activation = norm_features*self.g.dimshuffle(*self.dimshuffle_args)
    else:
        activation = norm_features
    if hasattr(self, 'b'):
        activation += self.b.dimshuffle(*self.dimshuffle_args)
    if self.nonlinearity is not None:
        return self.nonlinearity(activation)
    else:
        return activation
def shared_dataset(data_xy, borrow=True):
    """ Function that loads the dataset into shared variables
    The reason we store our dataset in shared variables is to allow
    Theano to copy it into the GPU memory (when code is run on GPU).
    Since copying data into the GPU is slow, copying a minibatch everytime
    is needed (the default behaviour if the data is not in a shared
    variable) would lead to a large decrease in performance.
    """
    data_x, data_y = data_xy
    shared_x = theano.shared(np.asarray(data_x,
                                        dtype=theano.config.floatX),
                             borrow=borrow)
    shared_y = theano.shared(np.asarray(data_y,
                                        dtype=theano.config.floatX),
                             borrow=borrow)
    return shared_x, T.cast(shared_y, 'int32')
def shared_dataset(data_xy, borrow=True):
    """ Function that loads the dataset into shared variables
    The reason we store our dataset in shared variables is to allow
    Theano to copy it into the GPU memory (when code is run on GPU).
    Since copying data into the GPU is slow, copying a minibatch everytime
    is needed (the default behaviour if the data is not in a shared
    variable) would lead to a large decrease in performance.
    """
    data_x, data_y = data_xy
    shared_x = theano.shared(np.asarray(data_x,
                                        dtype=theano.config.floatX),
                             borrow=borrow)
    shared_y = theano.shared(np.asarray(data_y,
                                        dtype=theano.config.floatX),
                             borrow=borrow)
    return shared_x, T.cast(shared_y, 'int32')
def parameter_prediction(self, test_set_x):  #, batch_size
    """ This function is to predict the output of NN
    :param test_set_x: input features for a testing sentence
    :type test_set_x: python array variable
    :returns: predicted features
    """
    n_test_set_x = test_set_x.shape[0]
    test_out = theano.function([], self.final_layer.output,
        givens={self.x: test_set_x[0:n_test_set_x], self.is_train: np.cast['int32'](0)}, on_unused_input='ignore')
    predict_parameter = test_out()
    return predict_parameter
def parameter_prediction_S2S(self, test_set_x, test_set_d):
    """ This function is to predict the output of NN
    :param test_set_x: input features for a testing sentence
    :param test_set_d: phone durations for a testing sentence
    :type test_set_x: python array variable
    :type test_set_d: python array variable
    :returns: predicted features
    """
    n_test_set_x = test_set_x.shape[0]
    test_out = theano.function([], self.final_layer.output,
        givens={self.x: test_set_x[0:n_test_set_x], self.d: test_set_d[0:n_test_set_x], self.is_train: np.cast['int32'](0)}, on_unused_input='ignore')
    predict_parameter = test_out()
    return predict_parameter
def parameter_prediction(self, test_set_x):  #, batch_size
    """ This function is to predict the output of NN
    :param test_set_x: input features for a testing sentence
    :type test_set_x: python array variable
    :returns: predicted features
    """
    n_test_set_x = test_set_x.shape[0]
    test_out = theano.function([], self.final_layer.output,
        givens={self.x: test_set_x[0:n_test_set_x], self.is_train: np.cast['int32'](0)}, on_unused_input='ignore')
    predict_parameter = test_out()
    return predict_parameter
def __init__(self, input, prob_drop=0.5):
    self.prob_drop = prob_drop
    self.prob_keep = 1.0 - prob_drop
    self.flag_on = theano.shared(np.cast[theano.config.floatX](1.0))
    self.flag_off = 1.0 - self.flag_on  # 1 during test
    seed_this = DropoutLayer.seed_common.randint(0, 2**31-1)
    mask_rng = theano.tensor.shared_randomstreams.RandomStreams(seed_this)
    self.mask = mask_rng.binomial(n=1, p=self.prob_keep, size=input.shape)
    self.output = \
        self.flag_on * T.cast(self.mask, theano.config.floatX) * input + \
        self.flag_off * self.prob_keep * input
    DropoutLayer.layers.append(self)
    print('dropout layer with P_drop: ' + str(self.prob_drop))
def parameter_prediction(self, test_set_x):  #, batch_size
    """ This function is to predict the output of NN
    :param test_set_x: input features for a testing sentence
    :type test_set_x: python array variable
    :returns: predicted features
    """
    n_test_set_x = test_set_x.shape[0]
    test_out = theano.function([], self.final_layer.output,
        givens={self.x: test_set_x[0:n_test_set_x], self.is_train: np.cast['int32'](0)}, on_unused_input='ignore')
    predict_parameter = test_out()
    return predict_parameter
def parameter_prediction_S2S(self, test_set_x, test_set_d):
    """ This function is to predict the output of NN
    :param test_set_x: input features for a testing sentence
    :param test_set_d: phone durations for a testing sentence
    :type test_set_x: python array variable
    :type test_set_d: python array variable
    :returns: predicted features
    """
    n_test_set_x = test_set_x.shape[0]
    test_out = theano.function([], self.final_layer.output,
        givens={self.x: test_set_x[0:n_test_set_x], self.d: test_set_d[0:n_test_set_x], self.is_train: np.cast['int32'](0)}, on_unused_input='ignore')
    predict_parameter = test_out()
    return predict_parameter
def parameter_prediction(self, test_set_x):  #, batch_size
    """ This function is to predict the output of NN
    :param test_set_x: input features for a testing sentence
    :type test_set_x: python array variable
    :returns: predicted features
    """
    n_test_set_x = test_set_x.shape[0]
    test_out = theano.function([], self.final_layer.output,
        givens={self.x: test_set_x[0:n_test_set_x], self.is_train: np.cast['int32'](0)}, on_unused_input='ignore')
    predict_parameter = test_out()
    return predict_parameter
def categorical_accuracy(y_pred, y_true, top_k=1, reduction=tf.reduce_mean,
                         name="CategoricalAccuracy"):
    """ Non-differentiable """
    with tf.variable_scope(name):
        if y_true.get_shape().ndims == y_pred.get_shape().ndims:
            y_true = tf.argmax(y_true, axis=-1)
        elif y_true.get_shape().ndims != y_pred.get_shape().ndims - 1:
            raise TypeError('rank mismatch between y_true and y_pred')
        if top_k == 1:
            # standard categorical accuracy
            top = tf.argmax(y_pred, axis=-1)
            y_true = tf.cast(y_true, top.dtype.base_dtype)
            match_values = tf.equal(top, y_true)
        else:
            match_values = tf.nn.in_top_k(y_pred, tf.cast(y_true, 'int32'),
                                          k=top_k)
        match_values = tf.cast(match_values, dtype='float32')
        return reduction(match_values)
def to_llr(x, name="LogLikelihoodRatio"):
    ''' Convert a matrix of probabilities into log-likelihood ratio
    :math:`LLR = log(\\frac{prob(data|target)}{prob(data|non-target)})`
    '''
    if not is_tensor(x):
        x /= np.sum(x, axis=-1, keepdims=True)
        x = np.clip(x, 10e-8, 1. - 10e-8)
        return np.log(x / (np.cast[x.dtype](1.) - x))
    else:
        with tf.variable_scope(name):
            x /= tf.reduce_sum(x, axis=-1, keepdims=True)
            x = tf.clip_by_value(x, 10e-8, 1. - 10e-8)
            return tf.log(x / (tf.cast(1., x.dtype.base_dtype) - x))
# ===========================================================================
# Speech task metrics
# ===========================================================================
def glorot_uniform(shape, gain=1.0, c01b=False):
    orig_shape = shape
    if c01b:
        if len(shape) != 4:
            raise RuntimeError(
                "If c01b is True, only shapes of length 4 are accepted")
        n1, n2 = shape[0], shape[3]
        receptive_field_size = shape[1] * shape[2]
    else:
        if len(shape) < 2:
            shape = (1,) + tuple(shape)
        n1, n2 = shape[:2]
        receptive_field_size = np.prod(shape[2:])
    std = gain * np.sqrt(2.0 / ((n1 + n2) * receptive_field_size))
    a = 0.0 - np.sqrt(3) * std
    b = 0.0 + np.sqrt(3) * std
    return np.cast[floatX](
        get_rng().uniform(low=a, high=b, size=orig_shape))
def he_normal(shape, gain=1.0, c01b=False):
    if gain == 'relu':
        gain = np.sqrt(2)
    if c01b:
        if len(shape) != 4:
            raise RuntimeError(
                "If c01b is True, only shapes of length 4 are accepted")
        fan_in = np.prod(shape[:3])
    else:
        if len(shape) <= 2:
            fan_in = shape[0]
        elif len(shape) > 2:
            fan_in = np.prod(shape[1:])
    std = gain * np.sqrt(1.0 / fan_in)
    return np.cast[floatX](
        get_rng().normal(0.0, std, size=shape))
def orthogonal(shape, gain=1.0):
    if gain == 'relu':
        gain = np.sqrt(2)
    if len(shape) < 2:
        raise RuntimeError("Only shapes of length 2 or more are supported, but "
                           "given shape:%s" % str(shape))
    flat_shape = (shape[0], np.prod(shape[1:]))
    a = get_rng().normal(0.0, 1.0, flat_shape)
    u, _, v = np.linalg.svd(a, full_matrices=False)
    # pick the one with the correct shape
    q = u if u.shape == flat_shape else v
    q = q.reshape(shape)
    return np.cast[floatX](gain * q)
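A quick sanity check for the orthogonal initializer above, assuming floatX and get_rng() are defined as in the surrounding module and gain is left at 1.0 so the scaling does not break orthogonality:

W = orthogonal((256, 128), gain=1.0)
# with shape (256, 128), q comes from the U factor of the SVD, so its columns are orthonormal
assert np.allclose(W.T.dot(W), np.eye(128), atol=1e-4)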
# ===========================================================================
# Fast initialization
# ===========================================================================
def adam_updates(params, cost, lr=0.001, mom1=0.9, mom2=0.999):
    updates = []
    grads = T.grad(cost, params)
    t = th.shared(np.cast[th.config.floatX](1.))
    for p, g in zip(params, grads):
        v = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        mg = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        v_t = mom1*v + (1. - mom1)*g
        mg_t = mom2*mg + (1. - mom2)*T.square(g)
        v_hat = v_t / (1. - mom1 ** t)
        mg_hat = mg_t / (1. - mom2 ** t)
        g_t = v_hat / T.sqrt(mg_hat + 1e-8)
        p_t = p - lr * g_t
        updates.append((v, v_t))
        updates.append((mg, mg_t))
        updates.append((p, p_t))
        updates.append((t, t+1))
    return updates
def get_output_for(self, input, deterministic=False, set_bn_updates=True, **kwargs):
    if deterministic:
        norm_features = (input-self.avg_batch_mean.dimshuffle(*self.dimshuffle_args)) / T.sqrt(1e-6 + self.avg_batch_var).dimshuffle(*self.dimshuffle_args)
    else:
        batch_mean = T.mean(input,axis=self.axes_to_sum).flatten()
        centered_input = input-batch_mean.dimshuffle(*self.dimshuffle_args)
        batch_var = T.mean(T.square(centered_input),axis=self.axes_to_sum).flatten()
        batch_stdv = T.sqrt(1e-6 + batch_var)
        norm_features = centered_input / batch_stdv.dimshuffle(*self.dimshuffle_args)
        # BN updates
        if set_bn_updates:
            new_m = 0.9*self.avg_batch_mean + 0.1*batch_mean
            new_v = 0.9*self.avg_batch_var + T.cast((0.1*input.shape[0])/(input.shape[0]-1),th.config.floatX)*batch_var
            self.bn_updates = [(self.avg_batch_mean, new_m), (self.avg_batch_var, new_v)]
    if hasattr(self, 'g'):
        activation = norm_features*self.g.dimshuffle(*self.dimshuffle_args)
    else:
        activation = norm_features
    if hasattr(self, 'b'):
        activation += self.b.dimshuffle(*self.dimshuffle_args)
    return self.nonlinearity(activation)
def shared_dataset(data_xy, borrow=True):
    """ Function that loads the dataset into shared variables
    The reason we store our dataset in shared variables is to allow
    Theano to copy it into the GPU memory (when code is run on GPU).
    Since copying data into the GPU is slow, copying a minibatch everytime
    is needed (the default behaviour if the data is not in a shared
    variable) would lead to a large decrease in performance.
    """
    data_x, data_y = data_xy
    shared_x = theano.shared(np.asarray(data_x,
                                        dtype=theano.config.floatX),
                             borrow=borrow)
    shared_y = theano.shared(np.asarray(data_y,
                                        dtype=theano.config.floatX),
                             borrow=borrow)
    return shared_x, T.cast(shared_y, 'int32')
def shared_dataset(data_xy, borrow=True):
    """ Function that loads the dataset into shared variables
    The reason we store our dataset in shared variables is to allow
    Theano to copy it into the GPU memory (when code is run on GPU).
    Since copying data into the GPU is slow, copying a minibatch everytime
    is needed (the default behaviour if the data is not in a shared
    variable) would lead to a large decrease in performance.
    """
    data_x, data_y = data_xy
    shared_x = theano.shared(np.asarray(data_x,
                                        dtype=theano.config.floatX),
                             borrow=borrow)
    shared_y = theano.shared(np.asarray(data_y,
                                        dtype=theano.config.floatX),
                             borrow=borrow)
    return shared_x, T.cast(shared_y, 'int32')
def __init__(self,
             init_momentum,
             averaging_coeff=0.95,
             stabilizer=1e-2,
             use_first_order=False,
             bound_inc=False,
             momentum_clipping=None):
    init_momentum = float(init_momentum)
    assert init_momentum >= 0.
    assert init_momentum <= 1.
    averaging_coeff = float(averaging_coeff)
    assert averaging_coeff >= 0.
    assert averaging_coeff <= 1.
    stabilizer = float(stabilizer)
    assert stabilizer >= 0.
    self.__dict__.update(locals())
    del self.self
    self.momentum = sharedX(self.init_momentum)
    self.momentum_clipping = momentum_clipping
    if momentum_clipping is not None:
        self.momentum_clipping = np.cast[config.floatX](momentum_clipping)
def __init__(self,
             init_momentum=0.9,
             averaging_coeff=0.99,
             stabilizer=1e-4,
             update_param_norm_ratio=0.003,
             gradient_clipping=None):
    init_momentum = float(init_momentum)
    assert init_momentum >= 0.
    assert init_momentum <= 1.
    averaging_coeff = float(averaging_coeff)
    assert averaging_coeff >= 0.
    assert averaging_coeff <= 1.
    stabilizer = float(stabilizer)
    assert stabilizer >= 0.
    self.__dict__.update(locals())
    del self.self
    self.momentum = sharedX(self.init_momentum)
    self.update_param_norm_ratio = update_param_norm_ratio
    self.gradient_clipping = gradient_clipping
    if gradient_clipping is not None:
        self.gradient_clipping = np.cast[config.floatX](gradient_clipping)
def as_floatX(variable):
    """
    This code is taken from pylearn2:
    Casts a given variable into dtype config.floatX
    numpy ndarrays will remain numpy ndarrays
    python floats will become 0-D ndarrays
    all other types will be treated as theano tensors
    """
    if isinstance(variable, float):
        return numpy.cast[theano.config.floatX](variable)
    if isinstance(variable, numpy.ndarray):
        return numpy.cast[theano.config.floatX](variable)
    return theano.tensor.cast(variable, theano.config.floatX)