Python tensorflow module: nn() usage examples
The following 50 code examples, extracted from open-source Python projects, illustrate how to use tensorflow.nn().
def global_pool(inp, kind='avg', keep_dims=False, name=None):
if kind not in ['max', 'avg']:
        raise ValueError('Only global avg or max pool is allowed, but '
                         'you requested {}.'.format(kind))
if name is None:
name = 'global_{}_pool'.format(kind)
h, w = inp.get_shape().as_list()[1:3]
out = getattr(tf.nn, kind + '_pool')(inp,
ksize=[1,h,w,1],
strides=[1,1,1,1],
padding='VALID')
if keep_dims:
output = tf.identity(out, name=name)
else:
output = tf.reshape(out, [out.get_shape().as_list()[0], -1], name=name)
return output
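A minimal usage sketch for global_pool, assuming a TensorFlow 1.x graph and an NHWC feature map; the placeholder shape below is illustrative, not taken from the original project.
import tensorflow as tf

# Hypothetical example: pool an 8x8 feature map down to one value per channel.
feature_map = tf.placeholder(tf.float32, [32, 8, 8, 64])          # [batch, height, width, channels]
pooled = global_pool(feature_map, kind='avg')                      # shape: [32, 64]
pooled_4d = global_pool(feature_map, kind='max', keep_dims=True)   # shape: [32, 1, 1, 64]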
def __init__(self, layers, batch_size, activation_func=tf.nn.sigmoid, saved_graph=None, sess=None, learning_rate=0.0001, batch_norm=False):
"""
@param layers is a list of integers, determining the amount of layers and their size
starting with the input size
"""
if len(layers) < 2:
print("Amount of layers must be greater than 1")
exit(0)
self.batch_size = batch_size
self.learning_rate = learning_rate
self.activation_func = activation_func
self.batch_norm = batch_norm
self.is_training = True
# Use this in data preprocessing
self.layers = layers
self._make_graph(layers)
if saved_graph is not None and sess is not None:
self.import_from_file(sess, saved_graph)
def variable_summaries(var, name, collections=None):
"""Attach a lot of summaries to a Tensor (for TensorBoard visualization).
Args:
- var: Tensor for variable from which we want to log.
- name: Variable name.
- collections: List of collections to save the summary to.
"""
with tf.name_scope(name):
mean = tf.reduce_mean(var)
tf.summary.scalar('mean', mean, collections)
num_params = tf.reduce_prod(tf.shape(var))
tf.summary.scalar('num_params', num_params, collections)
with tf.name_scope('stddev'):
stddev = tf.sqrt(tf.reduce_mean(tf.square(var - mean)))
tf.summary.scalar('stddev', stddev, collections)
tf.summary.scalar('max', tf.reduce_max(var), collections)
tf.summary.scalar('min', tf.reduce_min(var), collections)
tf.summary.histogram('histogram', var, collections)
tf.summary.scalar('sparsity', tf.nn.zero_fraction(var), collections)
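A short usage sketch for variable_summaries, assuming the merged summaries are later evaluated in a session and written with a tf.summary.FileWriter.
# Hypothetical example: log statistics of a fully connected layer's weight matrix.
weights = tf.Variable(tf.truncated_normal([784, 256], stddev=0.1))
variable_summaries(weights, 'fc1_weights')
merged = tf.summary.merge_all()  # run `merged` in a session and add the result to a FileWriter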
def _build_loss(self, readout, labels):
"""Build the layer including the loss and the accuracy.
Args:
readout (tensor): The readout layer. A probability distribution over the classes.
labels (tensor): Labels as integers.
Returns:
tensor: The loss tensor (cross entropy).
"""
with tf.name_scope('loss'):
self.loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=readout, labels=labels))
tf.summary.scalar('cross_entropy', self.loss)
correct_prediction = tf.nn.in_top_k(readout, labels, 1)
self.accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
tf.summary.scalar('accuracy', self.accuracy)
return self.loss
def build_output(self, inputs, inferences):
scores = tf.nn.softmax(inferences, name='scores')
tf.add_to_collection('outputs', scores)
with tf.name_scope('labels'):
label_indices = tf.arg_max(inferences, 1, name='arg_max')
labels = self.classification.output_labels(label_indices)
tf.add_to_collection('outputs', labels)
keys = self.classification.keys(inputs)
if keys:
# Key feature, if it exists, is a passthrough to the output.
# The use of identity is to name the tensor and correspondingly the output field.
keys = tf.identity(keys, name='key')
tf.add_to_collection('outputs', keys)
return {
'label': labels,
'score': scores
}
def _latent(self, x):
if x is None:
mean = None
stddev = None
logits = None
class_predictions = None
z = self.epsilon
else:
enc_output = tf.reshape(x, [-1, self.flags['hidden_size'] * 2])
        mean, stddev = tf.split(enc_output, 2, axis=1)  # Compute latent variables (z): split into mean and stddev halves
stddev = tf.nn.softplus(stddev)
with tf.variable_scope("y_network"):
mlp = Layers(mean)
mlp.fc(self.flags['num_classes'])
logits = mlp.get_output()
class_predictions = tf.nn.softmax(logits)
z = (mean + self.epsilon * stddev) #* tf.cast(y_hat, tf.float32)
return mean, stddev, class_predictions, logits, z
def flatten(self, keep_prob=1):
"""
Flattens 4D Tensor (from Conv Layer) into 2D Tensor (to FC Layer)
        :param keep_prob: float in (0, 1]. Set to 1 for no dropout
"""
self.count['flat'] += 1
scope = 'flat_' + str(self.count['flat'])
with tf.variable_scope(scope):
# Reshape function
input_nodes = tf.Dimension(
self.input.get_shape()[1] * self.input.get_shape()[2] * self.input.get_shape()[3])
output_shape = tf.stack([-1, input_nodes])
self.input = tf.reshape(self.input, output_shape)
# Dropout function
if keep_prob != 1:
self.input = tf.nn.dropout(self.input, keep_prob=keep_prob)
print(scope + ' output: ' + str(self.input.get_shape()))
def batch_norm_layer(inp):
"""As explained in A. Gerón's book, in the default batch_normalization
there is no scaling, i.e. gamma is set to 1. This makes sense for layers
with no activation function or ReLU (like ours), since the next layers
weights can take care of the scaling. In other circumstances, include
scaling
"""
# get the size from input tensor (remember, 1D convolution -> input tensor 3D)
size = int(inp.shape[2])
batch_mean, batch_var = tf.nn.moments(inp,[0])
scale = tf.Variable(tf.ones([size]))
beta = tf.Variable(tf.zeros([size]))
x = tf.nn.batch_normalization(inp,batch_mean,batch_var,beta,scale,
variance_epsilon=1e-3)
return x
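A brief sketch of how batch_norm_layer might be applied after a 1D convolution; the shapes are illustrative, and the 3D input layout follows the original docstring.
# Hypothetical example: normalize a [batch, width, channels] activation tensor.
signal = tf.placeholder(tf.float32, [None, 128, 16])  # 1D conv output: 3D tensor
normalized = batch_norm_layer(signal)                  # same shape, per-channel beta and scale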
def __init__(self, cell, function, reuse=None):
if not isinstance(cell, tf.contrib.rnn.RNNCell):
raise TypeError('The parameter cell is not an RNNCell.')
if isinstance(function, six.string_types):
try:
function = getattr(tf, function)
except AttributeError:
try:
function = getattr(tf.nn, function)
except AttributeError:
raise ValueError('The desired function "%s" was '
'not found.' % function)
self._cell = cell
self._function = function
def multilayer_perceptron(final_output, weights, biases):
"""MLP over output with attention over enc outputs
Args:
final_output: [batch_size x 2*size]
Returns:
logit: [batch_size x target_label_size]
"""
# Layer 1
layer_1 = tf.add(tf.matmul(final_output, weights["h1"]), biases["b1"])
layer_1 = tf.nn.relu(layer_1)
# Layer 2
layer_2 = tf.add(tf.matmul(layer_1, weights["h2"]), biases["b2"])
layer_2 = tf.nn.relu(layer_2)
# output layer
layer_out = tf.add(tf.matmul(layer_2, weights["out"]), biases["out"])
return layer_out
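A minimal sketch of the weight and bias dictionaries this MLP expects; the sizes below are illustrative (they assume 2*size = 512 and target_label_size = 2), not taken from the original project.
# Hypothetical parameter dictionaries for multilayer_perceptron.
weights = {
    "h1": tf.Variable(tf.truncated_normal([512, 256], stddev=0.1)),
    "h2": tf.Variable(tf.truncated_normal([256, 256], stddev=0.1)),
    "out": tf.Variable(tf.truncated_normal([256, 2], stddev=0.1)),
}
biases = {
    "b1": tf.Variable(tf.zeros([256])),
    "b2": tf.Variable(tf.zeros([256])),
    "out": tf.Variable(tf.zeros([2])),
}
final_output = tf.placeholder(tf.float32, [None, 512])          # [batch_size x 2*size]
logits = multilayer_perceptron(final_output, weights, biases)   # [batch_size x target_label_size]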
def simple_rnn(rnn_input, initial_state=None):
"""Implements Simple RNN
Args:
rnn_input: List of tensors of sizes [-1, sentembed_size]
Returns:
encoder_outputs, encoder_state
"""
# Setup cell
cell_enc = get_lstm_cell()
# Setup RNNs
dtype = tf.float16 if FLAGS.use_fp16 else tf.float32
rnn_outputs, rnn_state = tf.nn.rnn(cell_enc, rnn_input, dtype=dtype, initial_state=initial_state)
# print(rnn_outputs)
# print(rnn_state)
return rnn_outputs, rnn_state
def sigmoid_layer(builder, size):
x = builder.tensor()
m = int(x.get_shape()[1])
n = size
w = tf.Variable(tf.random_uniform([m, n], -1.0, 1.0))
b = tf.Variable(tf.random_uniform([n], -1.0, 1.0))
y = tf.nn.sigmoid(tf.matmul(x, w) + b)
return y.builder()
def register_layer_functions(name, f):
explanation = """and the keyword argument `activation_fn` is set to `tf.nn.{0}`.""".format(name)
@TensorBuilder.Register1("tf.contrib.layers", name + "_layer", wrapped=fully_connected, explanation=explanation) #, _return_type=TensorBuilder)
def layer_function(*args, **kwargs):
kwargs['activation_fn'] = f
return fully_connected(*args, **kwargs)
def register_conv_layer_functions(name, f):
explanation = """and the keyword argument `activation_fn` is set to `tf.nn.{0}`.""".format(name)
@TensorBuilder.Register1("tf.contrib.layers", name + "_conv2d_layer", wrapped=convolution2d, explanation=explanation) #, _return_type=TensorBuilder)
def layer_function(*args, **kwargs):
kwargs['activation_fn'] = f
return convolution2d(*args, **kwargs)
def _get_func(self, attr):
custom_func = object.__getattribute__(self, 'CUSTOM_FUNC')
custom_func_names = [f.__name__ for f in custom_func]
if attr in custom_func_names: # is it one of the custom functions?
func = custom_func[custom_func_names.index(attr)]
else:
func = getattr(tf.nn, attr) # ok, so it is a tf.nn function
return func
def parametric_relu(x, name=None):
alphas = tf.get_variable('{}/alpha'.format(name) if name else 'alpha',
x.get_shape()[-1],
initializer=tf.constant_initializer(0.0),
dtype=tf.float32)
return tf.nn.relu(x) + alphas * (x - abs(x)) * 0.5
def selu(x, name=None):
with tf.name_scope('{}/elu'.format(name) if name else 'elu') as _:
alpha = 1.6732632423543772848170429916717
scale = 1.0507009873554804934193349852946
return scale*tf.where(x >= 0.0, x, alpha*tf.nn.elu(x))
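A quick sketch applying the two custom activations above to the same pre-activation tensor; the placeholder shape is illustrative.
# Hypothetical example: PReLU (learnable alpha per feature) vs. SELU (fixed constants).
pre_activation = tf.placeholder(tf.float32, [None, 128])
h_prelu = parametric_relu(pre_activation)  # creates an 'alpha' variable of shape [128]
h_selu = selu(pre_activation)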
# Aliases
def activation_from_string(activation_str):
if activation_str is None:
return tf.identity
return getattr(tf.nn, activation_str)
def get_activation_function(activation_function):
if not activation_function:
return lambda a: a
try:
return getattr(tf.nn, activation_function)
except AttributeError:
raise ValueError(
'Invalid activation function "{}"'.format(activation_function))
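Both lookups above resolve a configuration string to a tf.nn function; a minimal sketch:
# Hypothetical example: configuration-driven activation selection.
act = activation_from_string('relu')     # -> tf.nn.relu
linear = get_activation_function('')     # falsy value -> identity lambda
x = tf.constant([[-1.0, 2.0]])
y = act(x)       # [[0.0, 2.0]]
z = linear(x)    # unchanged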
def create_model(self, model_input, vocab_size, num_frames, **unused_params):
"""Creates a model which uses a logistic classifier over the average of the
frame-level features.
This class is intended to be an example for implementors of frame level
models. If you want to train a model over averaged features it is more
efficient to average them beforehand rather than on the fly.
Args:
model_input: A 'batch_size' x 'max_frames' x 'num_features' matrix of
input features.
vocab_size: The number of classes in the dataset.
num_frames: A vector of length 'batch' which indicates the number of
frames for each video (before padding).
Returns:
A dictionary with a tensor containing the probability predictions of the
model in the 'predictions' key. The dimensions of the tensor are
'batch_size' x 'num_classes'.
"""
num_frames = tf.cast(tf.expand_dims(num_frames, 1), tf.float32)
feature_size = model_input.get_shape().as_list()[2]
denominators = tf.reshape(
tf.tile(num_frames, [1, feature_size]), [-1, feature_size])
avg_pooled = tf.reduce_sum(model_input,
axis=[1]) / denominators
output = slim.fully_connected(
avg_pooled, vocab_size, activation_fn=tf.nn.sigmoid,
weights_regularizer=slim.l2_regularizer(1e-8))
return {"predictions": output}
def create_model(self, model_input, vocab_size, num_frames, **unused_params):
"""Creates a model which uses a stack of LSTMs to represent the video.
Args:
model_input: A 'batch_size' x 'max_frames' x 'num_features' matrix of
input features.
vocab_size: The number of classes in the dataset.
num_frames: A vector of length 'batch' which indicates the number of
frames for each video (before padding).
Returns:
A dictionary with a tensor containing the probability predictions of the
model in the 'predictions' key. The dimensions of the tensor are
'batch_size' x 'num_classes'.
"""
lstm_size = FLAGS.lstm_cells
number_of_layers = FLAGS.lstm_layers
## Batch normalize the input
stacked_lstm = tf.contrib.rnn.MultiRNNCell(
[
tf.contrib.rnn.BasicLSTMCell(
lstm_size, forget_bias=1.0, state_is_tuple=False)
for _ in range(number_of_layers)
],
state_is_tuple=False)
loss = 0.0
with tf.variable_scope("RNN"):
outputs, state = tf.nn.dynamic_rnn(stacked_lstm, model_input,
sequence_length=num_frames,
dtype=tf.float32)
aggregated_model = getattr(video_level_models,
FLAGS.video_level_classifier_model)
return aggregated_model().create_model(
model_input=state,
vocab_size=vocab_size,
**unused_params)
def create_model(self, model_input, vocab_size, num_frames, **unused_params):
"""
Args:
model_input: A 'batch_size' x 'max_frames' x 'num_features' matrix of
input features.
vocab_size: The number of classes in the dataset.
num_frames: A vector of length 'batch' which indicates the number of
frames for each video (before padding).
Returns:
A dictionary with a tensor containing the probability predictions of the
model in the 'predictions' key. The dimensions of the tensor are
'batch_size' x 'num_classes'.
"""
num_frames = tf.cast(tf.expand_dims(num_frames, 1), tf.float32)
feature_size = model_input.get_shape().as_list()[2]
denominators = tf.reshape(
tf.tile(num_frames, [1, feature_size]), [-1, feature_size])
avg_pooled = tf.reduce_sum(model_input,
axis=[1]) / denominators
# top 5 values for each feature coordinate across frames
#top_5_val = tf.nn.top_k(model_input, 5).values
#max_val = tf.nn
# geometric mean
#geom_mean = tf.sqrt(tf.reduce_prod(model_input, axis=[1]))
output = slim.fully_connected(
avg_pooled, vocab_size, activation_fn=tf.nn.sigmoid,
weights_regularizer=slim.l2_regularizer(1e-8))
with open('frame_level.data','a') as f_handle:
np.savetxt(f_handle,avg_pooled)
return {"predictions": output}
def feedforward_layer(self, input_tensor, layer_number):
"""Build a feedforward layer ended with an activation function.
Args:
input_tensor: The output from the layer before.
layer_number (int): The number of the layer in the network.
Returns:
tensor: The activated output.
"""
layer_spec = self.network_spec['layers'][layer_number]
with tf.name_scope('feedforward' + str(layer_number)):
weighted = self._feedforward_step(input_tensor, layer_spec['size'])
activation = getattr(tf.nn, layer_spec['activation_function'])(weighted)
return activation
def conv_layer(self, input_tensor, layer_number):
"""Build a convolution layer ended with an activation function.
Args:
input_tensor: The output from the layer before.
layer_number (int): The number of the layer in the network.
Returns:
tensor: The activated output.
"""
inchannels, input_tensor = self._ensure_2d(input_tensor)
layer_spec = self.network_spec['layers'][layer_number]
filter_shape = (layer_spec['filter']['height'],
layer_spec['filter']['width'],
inchannels,
layer_spec['filter']['outchannels'])
filter_strides = (layer_spec['strides']['inchannels'],
layer_spec['strides']['x'],
layer_spec['strides']['y'],
layer_spec['strides']['batch'])
with tf.name_scope('conv' + str(layer_number)):
w = self._weight_variable(filter_shape, name='W')
b = self._bias_variable([layer_spec['filter']['outchannels']], name='b')
conv = tf.nn.conv2d(input_tensor, w, strides=filter_strides, padding='SAME')
activation = getattr(tf.nn, layer_spec['activation_function'])(conv + b, name='activation')
return activation
def maxpool_layer(self, input_tensor, layer_number):
"""Build a maxpooling layer.
Args:
input_tensor: The output from the layer before.
layer_number (int): The number of the layer in the network.
Returns:
tensor: The max pooled output.
"""
_, input_tensor = self._ensure_2d(input_tensor)
layer_spec = self.network_spec['layers'][layer_number]
kernel_shape = (1, # First number has to be one for ksize of maxpool layer.
layer_spec['kernel']['height'],
layer_spec['kernel']['width'],
layer_spec['kernel']['outchannels'])
kernel_strides = (layer_spec['strides']['inchannels'],
layer_spec['strides']['x'],
layer_spec['strides']['y'],
layer_spec['strides']['batch'])
with tf.name_scope('maxpool' + str(layer_number)):
pool = tf.nn.max_pool(input_tensor, ksize=kernel_shape,
strides=kernel_strides, padding='SAME', name='maxpool')
return pool
def recur_func(self):
func = self._config.get('Functions', 'recur_func')
if func == 'identity':
return tf.identity
else:
return getattr(tf.nn, func)
def mlp_func(self):
func = self._config.get('Functions', 'mlp_func')
if func == 'identity':
return tf.identity
else:
return getattr(tf.nn, func)
def recur_func(self):
func = self._config.get('Functions', 'recur_func')
if func == 'identity':
return tf.identity
elif func == 'leaky_relu':
return lambda x: tf.maximum(.1*x, x)
else:
return getattr(tf.nn, func)
def info_func(self):
func = self._config.get('Functions', 'info_func')
if func == 'identity':
return tf.identity
elif func == 'leaky_relu':
return lambda x: tf.maximum(.1*x, x)
else:
return getattr(tf.nn, func)
def mlp_func(self):
func = self._config.get('Functions', 'mlp_func')
if func == 'identity':
return tf.identity
elif func == 'leaky_relu':
return lambda x: tf.maximum(.1*x, x)
else:
return getattr(tf.nn, func)
def _init_parser(parser):
"""Initializes the parser for feed-forward models.
"""
optimization = parser.add_argument_group(title='Optimization',
description='Arguments determining the optimizer behavior.')
optimization.add_argument('--learning-rate', metavar='rate', type=float, default=0.01,
help='The magnitude of learning to perform at each step.')
nn = parser.add_argument_group(title='Neural Network',
description='Arguments controlling the structure of the neural network.')
nn.add_argument('--hidden-layers', metavar='units', type=int, required=False,
action=parser.var_args_action,
help='The size of each hidden layer to add.')
def build_training(self, global_steps, inputs, inferences):
with tf.name_scope('target'):
label_indices = self.classification.target_label_indices(inputs)
with tf.name_scope('error'):
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=inferences,
labels=label_indices,
name='softmax_cross_entropy')
loss = tf.reduce_mean(cross_entropy, name='loss')
averager = tf.train.ExponentialMovingAverage(0.99, name='loss_averager')
averaging = averager.apply([loss])
with tf.name_scope(''):
tf.summary.scalar('metrics/loss', loss)
tf.summary.scalar('metrics/loss.average', averager.average(loss))
with tf.control_dependencies([averaging]):
with tf.name_scope(self.args.optimizer.get_name()):
gradients = self.args.optimizer.compute_gradients(loss, var_list=tf.trainable_variables())
train = self.args.optimizer.apply_gradients(gradients, global_steps, name='optimize')
with tf.name_scope(''):
for gradient, t in gradients:
if gradient is not None:
tf.summary.histogram(t.op.name + '.gradients', gradient)
return loss, train
def conv2d(x, W):
"""conv2d returns a 2d convolution layer with full stride."""
return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
def max_pool_2x2(x):
"""max_pool_2x2 downsamples a feature map by 2X."""
return tf.nn.max_pool(
x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
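A minimal sketch stacking these two helpers, in the spirit of the MNIST tutorial they come from; the filter size and channel counts are illustrative.
# Hypothetical example: one convolution + pooling block on 28x28 grayscale images.
x_image = tf.placeholder(tf.float32, [None, 28, 28, 1])
W_conv1 = tf.Variable(tf.truncated_normal([5, 5, 1, 32], stddev=0.1))
h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1))  # [None, 28, 28, 32]
h_pool1 = max_pool_2x2(h_conv1)                 # [None, 14, 14, 32]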
def train(config={'activation': 'relu'}, reporter=None):
global FLAGS, status_reporter, activation_fn
status_reporter = reporter
activation_fn = getattr(tf.nn, config['activation'])
parser = argparse.ArgumentParser()
parser.add_argument(
'--data_dir', type=str, default='/tmp/tensorflow/mnist/input_data',
help='Directory for storing input data')
FLAGS, unparsed = parser.parse_known_args()
tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
# !!! Example of using the ray.tune Python API !!!
def conv2d(self, filter_size, output_channels, stride=1, padding='SAME', activation_fn=tf.nn.relu, b_value=0.0, s_value=1.0, bn=True, stoch=False):
"""
:param filter_size: int. assumes square filter
:param output_channels: int
:param stride: int
:param padding: 'VALID' or 'SAME'
:param activation_fn: tf.nn function
:param b_value: float
:param s_value: float
"""
self.count['conv'] += 1
self._layer_count += 1
scope = 'conv_' + str(self.count['conv'])
if stoch is True:
clean = False
else:
clean = True
with tf.variable_scope(scope):
input_channels = self.input.get_shape()[3]
output_shape = [filter_size, filter_size, input_channels, output_channels]
w = self.weight_variable(name='weights', shape=output_shape)
self.input = tf.nn.conv2d(self.input, w, strides=[1, stride, stride, 1], padding=padding)
if bn is True:
self.input = self.conv_batch_norm(self.input, clean=clean, count=self._layer_count)
if stoch is True:
self.input = tf.random_normal(tf.shape(self.input)) + self.input
self._noisy_z_dict[self._layer_count] = self.input
if b_value is not None:
b = self.const_variable(name='bias', shape=[output_channels], value=b_value)
self.input = tf.add(self.input, b)
if s_value is not None:
s = self.const_variable(name='scale', shape=[output_channels], value=s_value)
self.input = tf.multiply(self.input, s)
if activation_fn is not None:
self.input = activation_fn(self.input)
self.print_log(scope + ' output: ' + str(self.input.get_shape()))
def fc(self, output_nodes, keep_prob=1, activation_fn=tf.nn.relu, b_value=0.0, s_value=None, bn=False, stoch=False, ladder=False, clean=False):
self.count['fc'] += 1
self._layer_count += 1
scope = 'fc_' + str(self.count['fc'])
with tf.variable_scope(scope):
input_nodes = self.input.get_shape()[1]
output_shape = [input_nodes, output_nodes]
w = self.weight_variable(name='weights', shape=output_shape)
self.input = tf.matmul(self.input, w)
if bn is True:
self.input = self.batch_norm(self.input, clean=clean, count=self._layer_count)
if ladder is True:
b_value = s_value = None
noisy_z_ind = self.layer_num - self.count['deconv'] - self.count['fc']
noisy_z = self._noisy_z_dict[noisy_z_ind]
z_hat = self.ladder_g_function(noisy_z, self.input)
self._z_hat_bn[noisy_z_ind] = (z_hat - self.clean_batch_dict[noisy_z_ind][0]) / self.clean_batch_dict[noisy_z_ind][1]
if stoch is True:
self.input = tf.random_normal(tf.shape(self.input)) + self.input
self._noisy_z_dict[self._layer_count] = self.input
if b_value is not None:
b = self.const_variable(name='bias', shape=[output_nodes], value=b_value)
self.input = tf.add(self.input, b)
if s_value is not None:
s = self.const_variable(name='scale', shape=[output_nodes], value=s_value)
self.input = tf.multiply(self.input, s)
if activation_fn is not None:
self.input = activation_fn(self.input)
if keep_prob != 1:
self.input = tf.nn.dropout(self.input, keep_prob=keep_prob)
self.print_log(scope + ' output: ' + str(self.input.get_shape()))
def batch_norm(self, x, epsilon=1e-3, clean=False, count=1):
# Calculate batch mean and variance
batch_mean1, batch_var1 = tf.nn.moments(x, [0], keep_dims=True)
# Apply the initial batch normalizing transform
z1_hat = (x - batch_mean1) / tf.sqrt(batch_var1 + epsilon)
if clean is True:
self.clean_batch_dict[count] = (tf.squeeze(batch_mean1), tf.squeeze(batch_var1))
self._clean_z[count] = z1_hat
return z1_hat
def conv_batch_norm(self, x, epsilon=1e-3, clean=False, count=1):
# Calculate batch mean and variance
batch_mean1, batch_var1 = tf.nn.moments(x, [0, 1, 2], keep_dims=True)
# Apply the initial batch normalizing transform
z1_hat = (x - batch_mean1) / tf.sqrt(batch_var1 + epsilon)
if clean is True:
self.clean_batch_dict[count] = (tf.squeeze(batch_mean1), tf.squeeze(batch_var1))
self._clean_z[count] = z1_hat
return z1_hat
def conv2d(self, filter_size, output_channels, stride=1, padding='SAME', bn=True, activation_fn=tf.nn.relu,
b_value=0.0, s_value=1.0, trainable=True):
"""
2D Convolutional Layer.
:param filter_size: int. assumes square filter
:param output_channels: int
:param stride: int
:param padding: 'VALID' or 'SAME'
:param activation_fn: tf.nn function
:param b_value: float
:param s_value: float
"""
self.count['conv'] += 1
scope = 'conv_' + str(self.count['conv'])
with tf.variable_scope(scope):
# Conv function
input_channels = self.input.get_shape()[3]
if filter_size == 0: # outputs a 1x1 feature map; used for FCN
filter_size = self.input.get_shape()[2]
padding = 'VALID'
output_shape = [filter_size, filter_size, input_channels, output_channels]
w = self.weight_variable(name='weights', shape=output_shape, trainable=trainable)
self.input = tf.nn.conv2d(self.input, w, strides=[1, stride, stride, 1], padding=padding)
if bn is True: # batch normalization
self.input = self.batch_norm(self.input)
if b_value is not None: # bias value
b = self.const_variable(name='bias', shape=[output_channels], value=b_value, trainable=trainable)
self.input = tf.add(self.input, b)
if s_value is not None: # scale value
s = self.const_variable(name='scale', shape=[output_channels], value=s_value, trainable=trainable)
self.input = tf.multiply(self.input, s)
if activation_fn is not None: # activation function
self.input = activation_fn(self.input)
print(scope + ' output: ' + str(self.input.get_shape()))
def fc(self, output_nodes, keep_prob=1, activation_fn=tf.nn.relu, b_value=0.0, s_value=1.0, bn=True,
trainable=True):
"""
Fully Connected Layer
:param output_nodes: int
        :param keep_prob: float in (0, 1]. Set to 1 for no dropout
:param activation_fn: tf.nn function
:param b_value: float or None
:param s_value: float or None
:param bn: bool
"""
self.count['fc'] += 1
scope = 'fc_' + str(self.count['fc'])
with tf.variable_scope(scope):
# Flatten if necessary
if len(self.input.get_shape()) == 4:
input_nodes = tf.Dimension(
self.input.get_shape()[1] * self.input.get_shape()[2] * self.input.get_shape()[3])
output_shape = tf.stack([-1, input_nodes])
self.input = tf.reshape(self.input, output_shape)
# Matrix Multiplication Function
input_nodes = self.input.get_shape()[1]
output_shape = [input_nodes, output_nodes]
w = self.weight_variable(name='weights', shape=output_shape, trainable=trainable)
self.input = tf.matmul(self.input, w)
if bn is True: # batch normalization
self.input = self.batch_norm(self.input, 'fc')
if b_value is not None: # bias value
b = self.const_variable(name='bias', shape=[output_nodes], value=b_value, trainable=trainable)
self.input = tf.add(self.input, b)
if s_value is not None: # scale value
s = self.const_variable(name='scale', shape=[output_nodes], value=s_value, trainable=trainable)
self.input = tf.multiply(self.input, s)
if activation_fn is not None: # activation function
self.input = activation_fn(self.input)
if keep_prob != 1: # dropout function
self.input = tf.nn.dropout(self.input, keep_prob=keep_prob)
print(scope + ' output: ' + str(self.input.get_shape()))
def maxpool(self, k=2, s=None, globe=False):
"""
        Takes max value over a k x k area in each input map, or over the entire map (globe=True)
        :param k: int
        :param globe: bool, whether to pool over each feature map in its entirety
"""
self.count['mp'] += 1
scope = 'maxpool_' + str(self.count['mp'])
with tf.variable_scope(scope):
if globe is True: # Global Pool Parameters
k1 = self.input.get_shape()[1]
k2 = self.input.get_shape()[2]
s1 = 1
s2 = 1
padding = 'VALID'
else:
k1 = k
k2 = k
if s is None:
s1 = k
s2 = k
else:
s1 = s
s2 = s
padding = 'SAME'
# Max Pool Function
self.input = tf.nn.max_pool(self.input, ksize=[1, k1, k2, 1], strides=[1, s1, s2, 1], padding=padding)
print(scope + ' output: ' + str(self.input.get_shape()))
def avgpool(self, k=2, s=None, globe=False):
"""
        Averages the values over a k x k area in each input map, or over the entire map (globe=True)
        :param k: int
        :param globe: bool, whether to pool over each feature map in its entirety
"""
self.count['ap'] += 1
        scope = 'avgpool_' + str(self.count['ap'])
with tf.variable_scope(scope):
if globe is True: # Global Pool Parameters
k1 = self.input.get_shape()[1]
k2 = self.input.get_shape()[2]
s1 = 1
s2 = 1
padding = 'VALID'
else:
k1 = k
k2 = k
if s is None:
s1 = k
s2 = k
else:
s1 = s
s2 = s
padding = 'SAME'
# Average Pool Function
self.input = tf.nn.avg_pool(self.input, ksize=[1, k1, k2, 1], strides=[1, s1, s2, 1], padding=padding)
print(scope + ' output: ' + str(self.input.get_shape()))
def noisy_and(self, num_classes, trainable=True):
""" Multiple Instance Learning (MIL), flexible pooling function
:param num_classes: int, determine number of output maps
"""
assert self.input.get_shape()[3] == num_classes # input tensor should have map depth equal to # of classes
scope = 'noisyAND'
with tf.variable_scope(scope):
a = self.const_variable(name='a', shape=[1], value=1.0, trainable=trainable)
b = self.const_variable(name='b', shape=[1, num_classes], value=0.0, trainable=trainable)
mean = tf.reduce_mean(self.input, axis=[1, 2])
self.input = (tf.nn.sigmoid(a * (mean - b)) - tf.nn.sigmoid(-a * b)) / (
tf.sigmoid(a * (1 - b)) - tf.sigmoid(-a * b))
print(scope + ' output: ' + str(self.input.get_shape()))
def weight_variable(name, shape, trainable):
"""
:param name: string
:param shape: 4D array
:return: tf variable
"""
w = tf.get_variable(name=name, shape=shape, initializer=tf.contrib.layers.variance_scaling_initializer(),
trainable=trainable)
weights_norm = tf.reduce_sum(tf.nn.l2_loss(w),
                                 name=name + '_norm')  # collected so the user can apply weight decay if desired
tf.add_to_collection('weight_losses', weights_norm)
return w
def conv1d_layer(inp, filter_shape):
"""This is a 1d conv, so filter_shape = [dim, input_channels, out_channels]"""
W = tf.Variable(tf.truncated_normal(filter_shape, stddev=0.01))
b = tf.Variable(tf.random_normal(shape=[filter_shape[2]]))
# or you could initialize it as constant
# b = tf.Variable(tf.constant(0.1, shape=[filter_shape[3]]))
x = tf.nn.conv1d(inp,W,stride=1,padding="VALID")
x = tf.nn.bias_add(x, b)
x = tf.nn.relu(x)
return x
def max_pool1d_layer(inp, ksize, strides):
"""tf.nn does not have max_pool_1d, so we have to expand the incoming layer
as if we were dealing with a 2D convolution and then squeeze it again.
Again, since this is a 1D conv, the size of the window (ksize) and the stride
of the sliding window must have only one dimension (height) != 1
"""
x = tf.expand_dims(inp, 3)
x = tf.nn.max_pool(x, ksize=ksize, strides=strides, padding="VALID")
x = tf.squeeze(x, [3])
return x
def dense_layer(inp, n_neurons):
# input to a fully connected layer -> 2D [batch_size, n_inputs]
n_inputs = int(inp.shape[1])
W = tf.Variable(tf.truncated_normal((n_inputs,n_neurons), stddev=0.1))
b = tf.Variable(tf.random_normal(shape=[n_neurons]))
# or if you prefer
# b = tf.Variable(tf.zeros([n_neurons]))
x = tf.matmul(inp,W) + b
x = tf.nn.relu(x)
return x
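A sketch chaining the three helpers above on a 1D signal; the shapes, ksize, and strides are illustrative and follow the expand-and-squeeze convention described in max_pool1d_layer.
# Hypothetical example: conv1d -> max-pool over width -> dense layer.
signal = tf.placeholder(tf.float32, [None, 128, 16])                       # [batch, width, channels]
conv = conv1d_layer(signal, filter_shape=[5, 16, 32])                      # [None, 124, 32]
pooled = max_pool1d_layer(conv, ksize=[1, 2, 1, 1], strides=[1, 2, 1, 1])  # [None, 62, 32]
flat = tf.reshape(pooled, [-1, 62 * 32])
out = dense_layer(flat, n_neurons=10)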
def get_discriminator_op(self, r_preds, g_preds, d_weights):
"""Returns an op that updates the discriminator weights correctly.
Args:
r_preds: Tensor with shape (batch_size, num_timesteps, 1), the
disciminator predictions for real data.
g_preds: Tensor with shape (batch_size, num_timesteps, 1), the
discriminator predictions for generated data.
d_weights: a list of trainable tensors representing the weights
associated with the discriminator model.
Returns:
dis_op, the op to run to train the discriminator.
"""
with tf.variable_scope('loss/discriminator'):
discriminator_opt = tf.train.AdamOptimizer(1e-3)
eps = 1e-12
r_loss = -tf.reduce_mean(tf.log(r_preds + eps))
f_loss = -tf.reduce_mean(tf.log(1 + eps - g_preds))
dis_loss = r_loss + f_loss
# dis_loss = tf.reduce_mean(g_preds) - tf.reduce_mean(r_preds)
# tf.summary.scalar('real', r_loss)
# tf.summary.scalar('generated', f_loss)
with tf.variable_scope('regularization'):
dis_reg_loss = sum([tf.nn.l2_loss(w) for w in d_weights]) * 1e-6
tf.summary.scalar('regularization', dis_reg_loss)
total_loss = dis_loss + dis_reg_loss
with tf.variable_scope('discriminator_update'):
dis_op = self.get_updates(total_loss, discriminator_opt,
d_weights)
tf.summary.scalar('total', total_loss)
return dis_op
def __init__(self, n_input, n_hidden, activation_func='softplus',
optimizer_name='AdamOptimizer',
learning_rate=0.001,
logdir='/tmp',
log_every_n=100,
session_kwargs={},
seed=42,
tied_weights=False,
linear_decoder=True,
):
'''
params:
        activation_func (string): the name of an activation function in tf.nn
        optimizer_name (string): the name of an optimizer class in tf.train
'''
self.n_input = n_input
self.n_hidden = n_hidden
self.activation_func = activation_func
self.optimizer_name = optimizer_name
self.learning_rate = learning_rate
self.logdir = logdir
self.log_every_n = log_every_n
self.session_kwargs = session_kwargs
self.seed = seed
self.tied_weights = tied_weights
self.linear_decoder = linear_decoder
self._init_all()
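A hedged instantiation sketch: the class name Autoencoder below is hypothetical (the snippet does not show it), but the constructor arguments match the signature above.
# Hypothetical usage; the class name `Autoencoder` is assumed, not taken from the source.
model = Autoencoder(n_input=784, n_hidden=64,
                    activation_func='softplus',      # looked up in tf.nn
                    optimizer_name='AdamOptimizer',  # looked up in tf.train
                    learning_rate=0.001)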