The following 50 code examples, extracted from open-source Python projects, illustrate how to use theano.tensor.tanh(). The snippets assume the usual imports, e.g. import theano.tensor as T (some projects use import theano.tensor as tensor).
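As a quick orientation before the project examples (this sketch is not taken from any of the projects below), a minimal way to build and compile a T.tanh expression with Theano might look like this:

import numpy as np
import theano
import theano.tensor as T

x = T.matrix('x')              # symbolic input matrix
y = T.tanh(x)                  # element-wise tanh as a symbolic expression
f = theano.function([x], y)    # compile the graph into a callable

# evaluate on a small batch; the dtype follows theano.config.floatX
print(f(np.asarray([[-2.0, 0.0, 1.0]], dtype=theano.config.floatX)))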
def gelu(x):
    return 0.5 * x * (1 + T.tanh(T.sqrt(2 / np.pi) * (x + 0.044715 * T.pow(x, 3))))
def lyr_lstm(
        self, name_, s_x_, s_cell_, s_hid_,
        idim_, hdim_, axis_=-1, lyr_linear_=None,
        op_act_=T.tanh, op_gate_=T.nnet.sigmoid):
    s_inp = T.join(axis_, s_x_, s_hid_)
    if lyr_linear_ is None:
        lyr_linear_ = self.lyr_linear
    s_gates_lin, s_inp_lin = T.split(
        lyr_linear_(name_+'_rec', s_inp, idim_+hdim_, hdim_*4),
        [hdim_*3, hdim_], 2, axis=axis_)
    s_igate, s_fgate, s_ogate = T.split(
        op_gate_(s_gates_lin), [hdim_]*3, 3, axis=axis_)
    s_cell_tp1 = s_igate*op_act_(s_inp_lin) + s_fgate*s_cell_
    s_hid_tp1 = op_act_(s_cell_tp1)*s_ogate
    return s_cell_tp1, s_hid_tp1
def __VanillaRNNstep(
        name, input_dim, hidden_dim,
        current_inp, last_hidden, weightnorm=True):
    """
    CAUTION: Not for stand-alone usage. It is defined here (instead of
    inside the VanillaRNN function) to not clutter the code.

    :todo:
        - Implement!
        - Test!
    """
    # S_t = tanh(U*X_t + W*S_{t-1})
    raise NotImplementedError
def _step(self, x_, m_, h_, c_):
    preact = tensor.dot(h_, self.U) + x_

    i = tensor.nnet.sigmoid(_slice(preact, 0, self.hidden_dim))
    f = tensor.nnet.sigmoid(_slice(preact, 1, self.hidden_dim) + self.forget_bias)
    o = tensor.nnet.sigmoid(_slice(preact, 2, self.hidden_dim))
    j = tensor.tanh(_slice(preact, 3, self.hidden_dim))

    c = f * c_ + i * j
    c = m_[:, None] * c + (1. - m_)[:, None] * c_

    h = o * tensor.tanh(c)
    if self.recurrent_dropout_layer is not None:
        h = self.recurrent_dropout_layer.connect(h, self.is_train)
    h = m_[:, None] * h + (1. - m_)[:, None] * h_
    return h, c
def _step(self, x_, m_, h_, c_):
    preact = tensor.dot(h_, self.U) + _slice(x_, 0, self.hidden_dim * 5)

    # i: input. f: forget. o: output. t: transform.
    # j: input w/ non-linearity. k: input w/o non-linearity.
    i = tensor.nnet.sigmoid(_slice(preact, 0, self.hidden_dim))
    f = tensor.nnet.sigmoid(_slice(preact, 1, self.hidden_dim) + self.forget_bias)
    o = tensor.nnet.sigmoid(_slice(preact, 2, self.hidden_dim))
    t = tensor.nnet.sigmoid(_slice(preact, 3, self.hidden_dim))
    j = tensor.tanh(_slice(preact, 4, self.hidden_dim))
    k = _slice(x_, 5, self.hidden_dim)

    c = f * c_ + i * j
    c = m_[:, None] * c + (1. - m_)[:, None] * c_

    h = t * o * tensor.tanh(c) + (1. - t) * k
    if self.recurrent_dropout_layer is not None:
        h = self.recurrent_dropout_layer.connect(h, self.is_train)
    h = m_[:, None] * h + (1. - m_)[:, None] * h_
    return h, c
def _step(self, x_, px_, m_, h_, c_):
    preact = tensor.dot(h_, self.U) + px_

    # i: input. f: forget. o: output. t: transform.
    # j: input w/ non-linearity. k: input w/o non-linearity.
    i = tensor.nnet.sigmoid(_slice(preact, 0, self.hidden_dim))
    f = tensor.nnet.sigmoid(_slice(preact, 1, self.hidden_dim) + self.forget_bias)
    o = tensor.nnet.sigmoid(_slice(preact, 2, self.hidden_dim))
    t = tensor.nnet.sigmoid(_slice(preact, 3, self.hidden_dim))
    j = tensor.tanh(_slice(preact, 4, self.hidden_dim))

    c = f * c_ + i * j
    c = m_[:, None] * c + (1. - m_)[:, None] * c_

    h = t * o * tensor.tanh(c) + (1. - t) * x_
    if self.recurrent_dropout_layer is not None:
        h = self.recurrent_dropout_layer.connect(h, self.is_train)
    h = m_[:, None] * h + (1. - m_)[:, None] * h_
    return h, c
def _step(self, x_, px_, m_, h_, c_):
    preact = tensor.dot(h_, self.U) + px_

    i = tensor.nnet.sigmoid(_slice(preact, 0, self.hidden_dim))
    f = tensor.nnet.sigmoid(_slice(preact, 1, self.hidden_dim) + self.forget_bias)
    o = tensor.nnet.sigmoid(_slice(preact, 2, self.hidden_dim))
    j = tensor.tanh(_slice(preact, 3, self.hidden_dim))

    c = f * c_ + i * j
    c = m_[:, None] * c + (1. - m_)[:, None] * c_

    # Residual connection.
    h = o * tensor.tanh(c) + x_
    if self.recurrent_dropout_layer is not None:
        h = self.recurrent_dropout_layer.connect(h, self.is_train)
    h = m_[:, None] * h + (1. - m_)[:, None] * h_
    return h, c
def __init__(self, inputDim=None, nFilters=None, filterDim=None, activation=T.tanh,
             filter_shape=None, image_shape=None, outputDim=None, stride=(1, 1),
             border_mode='valid'):
    """
    :type filter_shape: tuple or list of length 4
    :param filter_shape: (number of filters, num inputVar feature maps,
                          filter height, filter width)

    :type image_shape: tuple or list of length 4
    :param image_shape: (batch size, num inputVar feature maps,
                         image height, image width)

    :type stride: tuple or list of length 2
    :param stride: the downsampling (pooling) factor (#rows, #cols)
    """
    super(ConvLayerParams, self).__init__(inputDim, outputDim)

    self._nFilters = nFilters
    self._filterDim = filterDim
    self._filter_shape = filter_shape
    self._image_shape = image_shape
    self._activation = activation
    self._stride = stride
    self._border_mode = border_mode

    self.update()
def __init__(self, inputDim=None, nFilters=None, filterDim=None, activation=T.tanh,
             poolsize=(1, 1), poolType=0, filter_shape=None, image_shape=None,
             outputDim=None, stride=(1, 1), border_mode='valid'):
    """
    :type filter_shape: tuple or list of length 4
    :param filter_shape: (number of filters, num inputVar feature maps,
                          filter height, filter width)

    :type image_shape: tuple or list of length 4
    :param image_shape: (batch size, num inputVar feature maps,
                         image height, image width)

    :type poolsize: tuple or list of length 2
    :param poolsize: the downsampling (pooling) factor (#rows, #cols)
    """
    super(ConvPoolLayerParams, self).__init__(inputDim, outputDim)

    self._nFilters = nFilters
    self._filterDim = filterDim
    self._poolsize = poolsize
    self._poolType = poolType
    self._filter_shape = filter_shape
    self._image_shape = image_shape
    self._activation = activation
    self._stride = stride
    self._border_mode = border_mode

    self.update()
def lyr_gru_flat(
        self, name_, s_x_, s_state_,
        idim_, hdim_, axis_=-1, lyr_linear_=None,
        op_act_=T.tanh, op_gate_=T.nnet.sigmoid,
        params_group_='params'):
    '''
    GRU layer, flat version

    In order to use, you need to provide the state variable.
    '''
    if lyr_linear_ is None:
        lyr_linear_ = self.lyr_linear
    s_igate = lyr_linear_(name_+'_igate', idim_+hdim_, idim_, params_group_=params_group_)
    s_inp_gated = T.join(axis_, s_x_ * op_gate_(s_igate), s_state_)
    s_gate_lin, s_state_tp1_lin = T.split(
        lyr_linear_(name_+'_gate', s_inp_gated, idim_+hdim_, hdim_*2),
        [hdim_, hdim_], 2, axis_)
    s_gate = op_gate_(s_gate_lin)
    return s_state_*s_gate + op_act_(s_state_tp1_lin)*(1.-s_gate)
def gru_layer(tparams, emb, options):
    hiddenDimSize = options['hiddenDimSize']
    timesteps = emb.shape[0]
    if emb.ndim == 3:
        n_samples = emb.shape[1]
    else:
        n_samples = 1

    def stepFn(wx, h, U_gru):
        uh = T.dot(h, U_gru)
        r = T.nnet.sigmoid(_slice(wx, 0, hiddenDimSize) + _slice(uh, 0, hiddenDimSize))
        z = T.nnet.sigmoid(_slice(wx, 1, hiddenDimSize) + _slice(uh, 1, hiddenDimSize))
        h_tilde = T.tanh(_slice(wx, 2, hiddenDimSize) + r * _slice(uh, 2, hiddenDimSize))
        h_new = z * h + ((1. - z) * h_tilde)
        return h_new

    Wx = T.dot(emb, tparams['W_gru']) + tparams['b_gru']
    results, updates = theano.scan(fn=stepFn, sequences=[Wx],
                                   outputs_info=T.alloc(numpy_floatX(0.0), n_samples, hiddenDimSize),
                                   non_sequences=[tparams['U_gru']], name='gru_layer',
                                   n_steps=timesteps)
    return results
def one_step(self, x, h_tm1, s_tm1):
    """
    Run the forward pass for a single timestep of an LSTM.

    h_tm1: initial h
    s_tm1: initial s (cell state)
    """
    g = T.tanh(T.dot(x, self.W_gx) + T.dot(h_tm1, self.W_gh) + self.b_g)
    i = T.nnet.sigmoid(T.dot(x, self.W_ix) + T.dot(h_tm1, self.W_ih) + self.b_i)
    f = T.nnet.sigmoid(T.dot(x, self.W_fx) + T.dot(h_tm1, self.W_fh) + self.b_f)
    o = T.nnet.sigmoid(T.dot(x, self.W_ox) + T.dot(h_tm1, self.W_oh) + self.b_o)

    s = i * g + s_tm1 * f
    h = T.tanh(s) * o
    return h, s
def set_net_params(self):
    '''Returns MLP parameters for scan.'''
    super(GRU, self).set_net_params()

    if self.input_net_aux is None:
        self.input_net_aux = MLP(
            self.dim_in, 2 * self.dim_h, 2 * self.dim_hs[0], 1,
            rng=self.rng, trng=self.trng,
            h_act='T.nnet.sigmoid', out_act='T.tanh',
            name='input_net_aux')
    else:
        assert self.input_net_aux.dim_in == self.dim_in
        assert self.input_net_aux.dim_out == 2 * self.dim_hs[0]
    self.input_net_aux.name = self.name + '_input_net_aux'
    self.nets.append(self.input_net_aux)

    for i in xrange(self.n_layers - 1):
        n = MLP(self.dim_hs[i], 2 * self.dim_hs[i+1],
                rng=self.rng, trng=self.trng,
                distribution='centered_binomial',
                name='rnn_net_aux%d' % i)
        self.inter_nets.append(n)  # insert(2 * i + 1, n)
def _step(self, m, y, h_, Ur):
    '''Step function for RNN call.

    Args:
        m (T.tensor): masks.
        y (T.tensor): inputs.
        h_ (T.tensor): recurrent state.
        Ur (theano.shared): recurrent connection.

    Returns:
        T.tensor: next recurrent state.
    '''
    preact = T.dot(h_, Ur) + y
    h = T.tanh(preact)
    h = m * h + (1 - m) * h_
    return h
def fullyconnected_layer(tparams, state_below, options, prefix, activ='lambda x: x', **kwargs):
    """
    Compute the forward pass for a fully connected layer.

    Parameters
    ----------
    tparams     : OrderedDict of theano shared variables, {parameter name: value}
    state_below : theano 3d tensor, input data,
                  dimensions: (num of time steps, batch size, dim of vector)
    options     : dictionary, {hyperparameter: value}
    prefix      : string, layer name
    activ       : string, activation function: 'linear', 'tanh', or 'rectifier'

    Returns
    -------
    : theano 3d tensor, output data,
      dimensions: (num of time steps, batch size, dim of vector)
    """
    return eval(activ)(tensor.dot(state_below, tparams[p_name(prefix, 'W')]) +
                       tparams[p_name(prefix, 'b')])
def gate_layer(tparams, X_word, X_char, options, prefix, pretrain_mode, activ='lambda x: x', **kwargs):
    """
    Compute the forward pass for a gate layer.

    Parameters
    ----------
    tparams       : OrderedDict of theano shared variables, {parameter name: value}
    X_word        : theano 3d tensor, word input,
                    dimensions: (num of time steps, batch size, dim of vector)
    X_char        : theano 3d tensor, char input,
                    dimensions: (num of time steps, batch size, dim of vector)
    options       : dictionary, {hyperparameter: value}
    prefix        : string, layer name
    pretrain_mode : theano shared scalar, 0. = word only, 1. = char only, 2. = word & char
    activ         : string, activation function: 'linear', 'tanh', or 'rectifier'

    Returns
    -------
    X : theano 3d tensor, final vector,
        dimensions: (num of time steps, batch size, dim of vector)
    """
    # compute gating values, Eq.(3)
    G = tensor.nnet.sigmoid(tensor.dot(X_word, tparams[p_name(prefix, 'v')]) +
                            tparams[p_name(prefix, 'b')][0])
    X = ifelse(tensor.le(pretrain_mode, numpy.float32(1.)),
               ifelse(tensor.eq(pretrain_mode, numpy.float32(0.)), X_word, X_char),
               G[:, :, None] * X_char + (1. - G)[:, :, None] * X_word)
    return eval(activ)(X)
def concat_layer(tparams, X_word, X_char, options, prefix, pretrain_mode, activ='lambda x: x', **kwargs):
    """
    Compute the forward pass for a concat layer.

    Parameters
    ----------
    tparams       : OrderedDict of theano shared variables, {parameter name: value}
    X_word        : theano 3d tensor, word input,
                    dimensions: (num of time steps, batch size, dim of vector)
    X_char        : theano 3d tensor, char input,
                    dimensions: (num of time steps, batch size, dim of vector)
    options       : dictionary, {hyperparameter: value}
    prefix        : string, layer name
    pretrain_mode : theano shared scalar, 0. = word only, 1. = char only, 2. = word & char
    activ         : string, activation function: 'linear', 'tanh', or 'rectifier'

    Returns
    -------
    X : theano 3d tensor, final vector,
        dimensions: (num of time steps, batch size, dim of vector)
    """
    X = ifelse(tensor.le(pretrain_mode, numpy.float32(1.)),
               ifelse(tensor.eq(pretrain_mode, numpy.float32(0.)), X_word, X_char),
               tensor.dot(tensor.concatenate([X_word, X_char], axis=2),
                          tparams[p_name(prefix, 'W')]) + tparams[p_name(prefix, 'b')])
    return eval(activ)(X)
def __init__(self, n_in, n_out, activation=tanh,
             clip_gradients=False, init_zero=False):
    self.n_in = n_in
    self.n_out = n_out
    self.activation = activation
    self.clip_gradients = clip_gradients

    # self.in_gate = RecurrentLayer(n_in, n_out, sigmoid, clip_gradients, init_zero)
    # self.forget_gate = RecurrentLayer(n_in, n_out, sigmoid, clip_gradients, init_zero)
    # self.out_gate = RecurrentLayer(n_in, n_out, sigmoid, clip_gradients, init_zero)

    self.in_gate = RecurrentLayer(n_in+n_out, n_out, sigmoid, clip_gradients, init_zero)
    self.out_gate = RecurrentLayer(n_in+n_out, n_out, sigmoid, clip_gradients, init_zero)
    self.input_layer = RecurrentLayer(n_in, n_out, activation, clip_gradients, init_zero)

    self.internal_layers = [self.input_layer, self.in_gate, self.out_gate]  # , self.forget_gate]
def __init__(self, n_in, n_out, activation=tanh,
             order=1, clip_gradients=False, BN=False):
    self.n_in = n_in
    self.n_out = n_out
    self.activation = activation
    self.order = order
    self.clip_gradients = clip_gradients

    # batch, in, row, col
    self.input_shape = (None, n_in, 1, None)
    # out, in, row, col
    self.filter_shape = (n_out, n_in, 1, order)
    self.W = create_shared(random_init(self.filter_shape), name="W")
    if not BN:
        self.bias = create_shared(random_init((n_out,)), name="bias")

    self.BNLayer = None
    self.BN = BN
    if BN:
        # calculate appropriate input_shape,
        # (mini_batch_size, # of channel, # row, # column)
        new_shape = list(self.input_shape)
        new_shape[1] = self.filter_shape[0]
        new_shape = tuple(new_shape)
        self.BNLayer = BatchNormalization(new_shape, mode=1)
def gru_layer(tparams, emb, layerIndex, hiddenDimSize, mask=None):
    timesteps = emb.shape[0]
    if emb.ndim == 3:
        n_samples = emb.shape[1]
    else:
        n_samples = 1

    W_rx = T.dot(emb, tparams['W_r_'+layerIndex])
    W_zx = T.dot(emb, tparams['W_z_'+layerIndex])
    Wx = T.dot(emb, tparams['W_'+layerIndex])

    def stepFn(stepMask, wrx, wzx, wx, h):
        r = T.nnet.sigmoid(wrx + T.dot(h, tparams['U_r_'+layerIndex]) + tparams['b_r_'+layerIndex])
        z = T.nnet.sigmoid(wzx + T.dot(h, tparams['U_z_'+layerIndex]) + tparams['b_z_'+layerIndex])
        h_tilde = T.tanh(wx + T.dot(r*h, tparams['U_'+layerIndex]) + tparams['b_'+layerIndex])
        h_new = z * h + ((1. - z) * h_tilde)
        h_new = stepMask[:, None] * h_new + (1. - stepMask)[:, None] * h
        return h_new  # , output, time

    results, updates = theano.scan(fn=stepFn, sequences=[mask, W_rx, W_zx, Wx],
                                   outputs_info=T.alloc(numpy_floatX(0.0), n_samples, hiddenDimSize),
                                   name='gru_layer'+layerIndex, n_steps=timesteps)
    return results
def gru_layer(tparams, emb, layerIndex, hiddenDimSize, mask=None):
    timesteps = emb.shape[0]
    if emb.ndim == 3:
        n_samples = emb.shape[1]
    else:
        n_samples = 1

    W_rx = T.dot(emb, tparams['W_r_'+layerIndex])
    W_zx = T.dot(emb, tparams['W_z_'+layerIndex])
    Wx = T.dot(emb, tparams['W_'+layerIndex])

    def stepFn(stepMask, wrx, wzx, wx, h):
        r = T.nnet.sigmoid(wrx + T.dot(h, tparams['U_r_'+layerIndex]) + tparams['b_r_'+layerIndex])
        z = T.nnet.sigmoid(wzx + T.dot(h, tparams['U_z_'+layerIndex]) + tparams['b_z_'+layerIndex])
        h_tilde = T.tanh(wx + T.dot(r*h, tparams['U_'+layerIndex]) + tparams['b_'+layerIndex])
        h_new = z * h + ((1. - z) * h_tilde)
        h_new = stepMask[:, None] * h_new + (1. - stepMask)[:, None] * h
        return h_new

    results, updates = theano.scan(fn=stepFn, sequences=[mask, W_rx, W_zx, Wx],
                                   outputs_info=T.alloc(numpy_floatX(0.0), n_samples, hiddenDimSize),
                                   name='gru_layer'+layerIndex, n_steps=timesteps)
    return results
def recurrent_as_activation_function(self, Wix, Uix, h_tm1, c_tm1, y_tm1):
    """ Implement the recurrent unit as an activation function. This function is called by self.__init__().

    :param Wix: equals W^{hx}x_{t}; as it does not depend on the recurrent state, it is pre-calculated for fast computation
    :type Wix: matrix
    :param h_tm1: the hidden activation from the previous time step
    :type h_tm1: matrix, each row is the hidden activation vector of a time step
    :param c_tm1: this parameter is not used, it is only kept to make the interface consistent with the LSTM
    :returns: h_t, the hidden activation of the current time step
    """
    h_t = T.tanh(Wix + T.dot(h_tm1, self.W_hi) + T.dot(y_tm1, self.W_yi) + self.b_i)

    # simple recurrent decoder
    # y_t = T.dot(h_t, self.U_hi) + self.b

    # recurrent output and additional input
    y_t = Uix + T.dot(h_t, self.U_hi) + T.dot(y_tm1, self.U_yi) + self.b

    c_t = h_t

    return h_t, c_t, y_t
def recurrent_as_activation_function(self, Wix, Wiy, h_tm1, c_tm1):
    """ Implement the recurrent unit as an activation function. This function is called by self.__init__().

    :param Wix: equals W^{hx}x_{t}; as it does not depend on the recurrent state, it is pre-calculated for fast computation
    :type Wix: matrix
    :param h_tm1: the hidden activation from the previous time step
    :type h_tm1: matrix, each row is the hidden activation vector of a time step
    :param c_tm1: this parameter is not used, it is only kept to make the interface consistent with the LSTM
    :returns: h_t, the hidden activation of the current time step
    """
    h_t = T.tanh(Wix + T.dot(h_tm1, self.W_hi) + Wiy + self.b_i)

    c_t = h_t

    return h_t, c_t
def lstm_as_activation_function(self, Wix, Wfx, Wcx, Wox, h_tm1, c_tm1, y_tm1):
    """ This function treats the LSTM block as an activation function, and implements the standard LSTM activation function.
        The meaning of each input and output parameter can be found in :func:`layers.gating.LstmBase.recurrent_fn`
    """
    i_t = T.nnet.sigmoid(Wix + T.dot(h_tm1, self.W_hi) + self.w_ci * c_tm1 + self.b_i)
    f_t = T.nnet.sigmoid(Wfx + T.dot(h_tm1, self.W_hf) + self.w_cf * c_tm1 + self.b_f)

    c_t = f_t * c_tm1 + i_t * T.tanh(Wcx + T.dot(h_tm1, self.W_hc) + T.dot(y_tm1, self.W_yi) + self.b_c)

    o_t = T.nnet.sigmoid(Wox + T.dot(h_tm1, self.W_ho) + self.w_co * c_t + self.b_o)

    h_t = o_t * T.tanh(c_t)

    y_t = T.dot(h_t, self.U_ho) + self.b

    return h_t, c_t, y_t  # , i_t, f_t, o_t
def lstm_as_activation_function(self, Wix, Wfx, Wcx, Wox, h_tm1, c_tm1): """ This function treats the LSTM block as an activation function, and implements the standard LSTM activation function. The meaning of each input and output parameters can be found in :func:`layers.gating.LstmBase.recurrent_fn` """ i_t = T.nnet.sigmoid(Wix + T.dot(h_tm1, self.W_hi) + self.w_ci * c_tm1 + self.b_i) # f_t = T.nnet.sigmoid(Wfx + T.dot(h_tm1, self.W_hf) + self.w_cf * c_tm1 + self.b_f) # c_t = f_t * c_tm1 + i_t * T.tanh(Wcx + T.dot(h_tm1, self.W_hc) + self.b_c) o_t = T.nnet.sigmoid(Wox + T.dot(h_tm1, self.W_ho) + self.w_co * c_t + self.b_o) h_t = o_t * T.tanh(c_t) return h_t, c_t#, i_t, f_t, o_t
def apply_activation(self, lin_output, activation):
    if activation == 'SIGMOID':
        final_output = T.nnet.sigmoid(lin_output)

    elif activation == 'TANH':
        final_output = T.tanh(lin_output)

    elif activation == 'LINEAR':
        final_output = lin_output

    elif activation == 'ReLU':    # rectified linear unit
        final_output = T.maximum(0.0, lin_output)

    elif activation == 'ReSU':    # rectified smooth unit (softplus); use symbolic ops, not numpy
        final_output = T.log(1.0 + T.exp(lin_output))

    else:
        self.logger.critical('the input activation function: %s is not supported right now. Please modify layers.py to support' % (activation))
        raise

    return final_output
def recurrent_as_activation_function(self, Wix, h_tm1, c_tm1):
    """ Implement the recurrent unit as an activation function. This function is called by self.__init__().

    :param Wix: equals W^{hx}x_{t}; as it does not depend on the recurrent state, it is pre-calculated for fast computation
    :type Wix: matrix
    :param h_tm1: the hidden activation from the previous time step
    :type h_tm1: matrix, each row is the hidden activation vector of a time step
    :param c_tm1: this parameter is not used, it is only kept to make the interface consistent with the LSTM
    :returns: h_t, the hidden activation of the current time step
    """
    h_t = T.tanh(Wix + T.dot(h_tm1, self.W_hi) + self.b_i)

    c_t = h_t

    return h_t, c_t
def lstm_as_activation_function(self, Wix, Wfx, Wcx, Wox, h_tm1, c_tm1): """ This function treats the LSTM block as an activation function, and implements the LSTM (without the output gate) activation function. The meaning of each input and output parameters can be found in :func:`layers.gating.LstmBase.recurrent_fn` """ i_t = T.nnet.sigmoid(Wix + T.dot(h_tm1, self.W_hi) + self.b_i) f_t = T.nnet.sigmoid(Wfx + T.dot(h_tm1, self.W_hf) + self.b_f) c_t = f_t * c_tm1 + i_t * T.tanh(Wcx + T.dot(h_tm1, self.W_hc) + self.b_c) o_t = T.nnet.sigmoid(Wox + T.dot(h_tm1, self.W_ho) + self.b_o) h_t = o_t * T.tanh(c_t) return h_t, c_t
def lstm_as_activation_function(self, Wix, Wfx, Wcx, Wox, h_tm1, c_tm1): """ This function treats the LSTM block as an activation function, and implements the LSTM (simplified LSTM) activation function. The meaning of each input and output parameters can be found in :func:`layers.gating.LstmBase.recurrent_fn` """ ##can_h_t = T.tanh(Whx + r_t * T.dot(h_tm1, self.W_hh) + self.b_h) f_t = T.nnet.sigmoid(Wfx + T.dot(h_tm1, self.W_hf) + self.b_f) #self.w_cf * c_tm1 can_h_t = T.tanh(Wcx + f_t * T.dot(h_tm1, self.W_hc) + self.b_c) h_t = self.w_cf * (1.0 - f_t) * h_tm1 + f_t * can_h_t c_t = h_t # c_t = f_t * c_tm1 + (1 - f_t) * T.tanh(Wcx + T.dot(h_tm1, self.W_hc) + self.b_c) # h_t = T.tanh(c_t) return h_t, c_t
def gru_as_activation_function(self, Wzx, Wrx, Whx, h_tm1, c_tm1=None):
    """ This function treats the GRU block as an activation function, and implements the GRU activation function.
        It is called by :func:`layers.gating.GatedRecurrentUnit.__init__`.
        Wzx, Wrx, Whx have been pre-computed before being passed to this function.

        To keep the same interface as the LSTM, we retain c_tm1 (the cell state of the previous time step), even though the GRU does not maintain a cell state.
    """
    z_t = T.nnet.sigmoid(Wzx + T.dot(h_tm1, self.W_hz) + self.b_z)
    r_t = T.nnet.sigmoid(Wrx + T.dot(h_tm1, self.W_hr) + self.b_r)

    can_h_t = T.tanh(Whx + r_t * T.dot(h_tm1, self.W_hh) + self.b_h)

    h_t = (1 - z_t) * h_tm1 + z_t * can_h_t

    c_t = h_t  # in order to have the same interface as LSTM

    return h_t, c_t
def model(x, embedding_size, n_hidden):
    # hidden and input weights
    U = shared_glorot_uniform((embedding_size, n_hidden), name="U")
    W = shared_glorot_uniform((n_hidden, n_hidden), name="W")
    bh = shared_zeros((n_hidden,), name="bh")

    # output weights
    V = shared_glorot_uniform((n_hidden, embedding_size), name="V")
    by = shared_zeros((embedding_size,), name="by")

    params = [U, V, W, by, bh]

    def step(x_t, h_tm1):
        h_t = T.tanh(U[x_t] + T.dot(h_tm1, W) + bh)
        y_t = T.dot(h_t, V) + by
        return h_t, y_t

    h0 = shared_zeros((n_hidden,), name='h0')
    [h, y_pred], _ = theano.scan(step, sequences=x, outputs_info=[h0, None], truncate_gradient=10)

    model = T.nnet.softmax(y_pred)
    return model, params
def model(inputs, _is_training, params, batch_size, hidden_size, drop_i, drop_s,
          init_scale, init_H_bias, _theano_rng):
    noise_i_for_H = get_dropout_noise((batch_size, hidden_size), drop_i, _theano_rng)
    i_for_H = ifelse(_is_training, inputs * noise_i_for_H, inputs)
    i_for_H = linear.model(i_for_H, params, hidden_size, hidden_size,
                           init_scale, bias_init=init_H_bias)

    # Dropout noise for recurrent hidden state.
    noise_s = get_dropout_noise((batch_size, hidden_size), drop_s, _theano_rng)

    def step(i_for_H_t, y_tm1, noise_s):
        s_lm1_for_H = ifelse(_is_training, y_tm1 * noise_s, y_tm1)
        return T.tanh(i_for_H_t + linear.model(s_lm1_for_H, params, hidden_size,
                                               hidden_size, init_scale))

    y_0 = shared_zeros((batch_size, hidden_size), name='h0')
    y, _ = theano.scan(step, sequences=i_for_H, outputs_info=[y_0], non_sequences=[noise_s])

    y_last = y[-1]
    sticky_state_updates = [(y_0, y_last)]

    return y, y_0, sticky_state_updates
def __init__(self, rng, input, n_in, n_hidden, n_out):
    # Hidden layer: maps the input to the hidden representation with a tanh non-linearity.
    # The logistic regression layer classifies on top of the hidden layer's output.
    self.hiddenLayer = HiddenLayer(rng=rng, input=input, n_in=n_in,
                                   n_out=n_hidden, activation=T.tanh)
    self.logRegressionLayer = LogisticRegression(input=self.hiddenLayer.output,
                                                 n_in=n_hidden, n_out=n_out)

    # L1/L2 regularization terms
    self.L1 = abs(self.hiddenLayer.W).sum() + abs(self.logRegressionLayer.W).sum()
    self.L2_sqr = (self.hiddenLayer.W ** 2).sum() + (self.logRegressionLayer.W ** 2).sum()

    # Cost of the MLP: the negative log likelihood
    # is computed by the logistic regression output layer.
    self.negative_log_likelihood = self.logRegressionLayer.negative_log_likelihood

    # Classification errors
    self.errors = self.logRegressionLayer.errors

    # Parameters of the whole model
    self.params = self.hiddenLayer.params + self.logRegressionLayer.params
def tanh(x):
    r"""Tanh activation function :math:`\varphi(x) = \tanh(x)`

    Parameters
    ----------
    x : symbolic tensor
        Tensor to compute the activation function for.

    Returns
    -------
    symbolic tensor of value in [-1, 1]
        The output of the tanh function applied to the activation `x`.
    """
    return T.tanh(x)
def recurrent_fn(u_t, h_tm1, W_uht, W_hht, b_ht, W_uz, W_hz, b_z,
                 W_ur, W_hr, b_r, W_hy, b_y):
    z_t = T.nnet.sigmoid(T.dot(u_t, W_uz) + T.dot(h_tm1, W_hz) + b_z)
    r_t = T.nnet.sigmoid(T.dot(u_t, W_ur) + T.dot(h_tm1, W_hr) + b_r)
    ht_t = T.tanh(T.dot(u_t, W_uht) + T.dot(r_t*h_tm1, W_hht) + b_ht)
    h_t = (1 - z_t)*h_tm1 + z_t*ht_t
    return h_t

# def fcn2(u_t, h_tm1, s_tm1, h_tm12, s_tm12, W_ug, W_hg, b_g, W_ui, W_hi, b_i, W_uf, W_hf, b_f,
#          W_uo, W_ho, b_o, W_hy, b_hy, W_ug2, W_hg2, b_g2, W_ui2, W_hi2, b_i2, W_uf2, W_hf2, b_f2,
#          W_uo2, W_ho2, b_o2, W_hy2, b_hy2):
#     [h_t, s_t] = recurrent_fn(u_t, h_tm1, s_tm1, W_ug, W_hg, b_g, W_ui, W_hi, b_i, W_uf, W_hf, b_f,
#                               W_uo, W_ho, b_o, W_hy, b_hy)
#     o1 = T.dot(h_tm1, W_hy) + b_hy
#     [h_t2, s_t2] = recurrent_fn(o1, h_tm12, s_tm12, W_ug2, W_hg2, b_g2, W_ui2, W_hi2, b_i2, W_uf2, W_hf2, b_f2,
#                                 W_uo2, W_ho2, b_o2, W_hy2, b_hy2)
#     return [h_t, s_t, h_t2, s_t2]

# use the GRULayer class to define the algebra of the GRU and build the stack and gradient calculation
# one-layer GRU stack for stock price prediction
def get_output(self, train=False):
    input = self.get_input(train)
    proj_input = self.activation(T.tensordot(input, self.att_proj, axes=(3, 0)))
    if self.context == 'word':
        att_scores = T.tensordot(proj_input, self.att_scorer, axes=(3, 0))
    elif self.context == 'clause':
        def step(a_t, h_tm1, W_in, W, sc):
            h_t = T.tanh(T.tensordot(a_t, W_in, axes=(2, 0)) +
                         T.tensordot(h_tm1, W, axes=(2, 0)))
            s_t = T.tensordot(h_t, sc, axes=(2, 0))
            return h_t, s_t
        [_, scores], _ = theano.scan(step,
                                     sequences=[proj_input.dimshuffle(2, 0, 1, 3)],
                                     outputs_info=[T.zeros((proj_input.shape[0], self.td1, self.rec_hid_dim)), None],
                                     non_sequences=[self.rec_in_weights, self.rec_hid_weights, self.att_scorer])
        att_scores = scores.dimshuffle(1, 2, 0)
    elif self.context == 'para':
        att_scores = T.tensordot(proj_input, self.att_scorer, axes=(3, 2)).sum(axis=(1, 2))

    # Nested scans. For shame!
    def get_sample_att(sample_input, sample_att):
        sample_att_inp, _ = theano.scan(fn=lambda s_att_i, s_input_i: T.dot(s_att_i, s_input_i),
                                        sequences=[T.nnet.softmax(sample_att), sample_input])
        return sample_att_inp

    att_input, _ = theano.scan(fn=get_sample_att, sequences=[input, att_scores])
    return att_input
def get_layer(self, x_in):
    assert x_in.ndim == 2
    n_steps = x_in.shape[0]

    def __slice(x_, n, dim):
        return x_[n * dim: (n + 1) * dim]

    def __step(x_, h_, c_):
        preact = T.dot(h_, self._params['U']) + x_ + self._params['b']
        i = T.nnet.sigmoid(__slice(preact, 0, self._ydim))
        f = T.nnet.sigmoid(__slice(preact, 1, self._ydim))
        o = T.nnet.sigmoid(__slice(preact, 2, self._ydim))
        c = T.tanh(__slice(preact, 3, self._ydim))
        c = f * c_ + i * c
        h = o * T.tanh(c)
        return h, c

    x_in = T.dot(x_in, self._params['W']) + self._params['b']
    rval, updates = theano.scan(__step,
                                sequences=x_in,
                                go_backwards=self.go_backwards,
                                outputs_info=[T.alloc(np_floatX(0.), self._ydim),
                                              T.alloc(np_floatX(0.), self._ydim)],
                                name='lstm_layers',
                                n_steps=n_steps)
    return reverse(rval[0]) if self.go_backwards else rval[0]
def __init__(self, rng, x, y, n_x, n_y, activation=T.tanh):
    weight_max = numpy.sqrt(6. / (n_x + n_y))
    if activation == theano.tensor.nnet.sigmoid:
        weight_max *= 4

    self.w = theano.shared(
        value=rng.uniform(low=-weight_max, high=weight_max, size=(n_x, n_y)),
        name='w',
        borrow=True
    )
    self.b = theano.shared(
        value=numpy.zeros((n_y,), dtype=theano.config.floatX),
        name='b',
        borrow=True
    )
    self.params = [self.w, self.b]

    # save x, y
    self.x = x
    self.y = y

    # calculate the output
    self.y_given_x = T.dot(self.x, self.w) + self.b
    if activation is not None:
        self.y_given_x = activation(self.y_given_x)
def convpool(X, W, b, poolsize=(2, 2)):
    conv_out = conv2d(input=X, filters=W)

    # downsample each feature map individually, using maxpooling
    pooled_out = downsample.max_pool_2d(
        input=conv_out,
        ds=poolsize,
        ignore_border=True
    )

    # add the bias term. Since the bias is a vector (1D array), we first
    # reshape it to a tensor of shape (1, n_filters, 1, 1). Each bias will
    # thus be broadcasted across mini-batches and feature map
    # width & height
    # return T.tanh(pooled_out + b.dimshuffle('x', 0, 'x', 'x'))
    return relu(pooled_out + b.dimshuffle('x', 0, 'x', 'x'))
def apply(self, input_v, input_h):
    # Vertical stack
    v_nxn_out = self.vertical_conv_nxn.apply(input_v)
    # Different croppings are used depending on the row we wish to condition on
    v_nxn_out_to_h = v_nxn_out[:, :, :-(self.filter_size//2)-2, :]
    v_nxn_out_to_v = v_nxn_out[:, :, 1:-(self.filter_size//2)-1, :]
    v_1x1_out = self.vertical_conv_1x1.apply(v_nxn_out_to_h)
    output_v = T.tanh(v_nxn_out_to_v[:, :self.num_filters, :, :]) * \
        T.nnet.sigmoid(v_nxn_out_to_v[:, self.num_filters:, :, :])

    # Horizontal stack
    h_1xn_out = self.horizontal_conv_1xn.apply(input_h)
    h_1xn_out = h_1xn_out[:, :, :, :-(self.filter_size//2)]
    h_sum = h_1xn_out + v_1x1_out
    h_activation = T.tanh(h_sum[:, :self.num_filters, :, :]) * \
        T.nnet.sigmoid(h_sum[:, self.num_filters:, :, :])
    h_1x1_out = self.horizontal_conv_1x1.apply(h_activation)

    if self.res:
        # input_h_padded = T.zeros(input_h.shape, dtype=theano.config.floatX)
        # input_h_padded = T.inc_subtensor(input_h_padded[:,:,3:,3:], input_h[:,:,:-3,:-3])
        # input_h = input_h_padded
        output_h = h_1x1_out  # + input_h
    else:
        output_h = h_1x1_out  # h_activation

    return output_v, output_h
def predict(self, new_data, batch_size):
    """
    predict for new data
    """
    img_shape = (batch_size, 1, self.image_shape[2], self.image_shape[3])
    conv_out = conv.conv2d(input=new_data, filters=self.W,
                           filter_shape=self.filter_shape, image_shape=img_shape)
    if self.non_linear == "tanh":
        conv_out_tanh = T.tanh(conv_out + self.b.dimshuffle('x', 0, 'x', 'x'))
        output = downsample.max_pool_2d(input=conv_out_tanh,
                                        ds=self.poolsize, ignore_border=True)
    elif self.non_linear == "relu":
        conv_out_tanh = ReLU(conv_out + self.b.dimshuffle('x', 0, 'x', 'x'))
        output = downsample.max_pool_2d(input=conv_out_tanh,
                                        ds=self.poolsize, ignore_border=True)
    else:
        pooled_out = downsample.max_pool_2d(input=conv_out,
                                            ds=self.poolsize, ignore_border=True)
        output = pooled_out + self.b.dimshuffle('x', 0, 'x', 'x')
    return output
def get_output(self, input_):
    """
    This function overrides the parent's one.
    Tanh is an element-wise operation.

    Math Expression
    -------------------
    y = tanh(x)

    Parameters
    ----------
    input_: TensorVariable

    Returns
    -------
    TensorVariable
    """
    return T.tanh(input_)
def __init__(self, n_in, n_out, activation=tanh,
             order=1, clip_gradients=False):
    self.n_in = n_in
    self.n_out = n_out
    self.activation = activation
    self.order = order
    self.clip_gradients = clip_gradients

    internal_layers = self.internal_layers = []
    for i in range(order):
        input_layer = Layer(n_in, n_out, linear, has_bias=False,
                            clip_gradients=clip_gradients)
        internal_layers.append(input_layer)

    self.bias = create_shared(random_init((n_out,)), name="bias")
def forward(self, x, mask, hc):
    n_in, n_out, activation = self.n_in, self.n_out_t, self.activation

    if hc.ndim > 1:
        c_tm1 = hc[:, :n_out]
        h_tm1 = hc[:, n_out:]
    else:
        c_tm1 = hc[:n_out]
        h_tm1 = hc[n_out:]

    in_t = self.in_gate.forward(x, h_tm1)
    forget_t = self.forget_gate.forward(x, h_tm1)
    out_t = self.out_gate.forward(x, h_tm1)

    c_t = forget_t * c_tm1 + in_t * self.input_layer.forward(x, h_tm1)
    c_t = c_t * mask.dimshuffle(0, 'x')
    c_t = T.cast(c_t, 'float32')
    h_t = out_t * T.tanh(c_t)
    h_t = h_t * mask.dimshuffle(0, 'x')
    h_t = T.cast(h_t, 'float32')

    if hc.ndim > 1:
        return T.concatenate([c_t, h_t], axis=1)
    else:
        return T.concatenate([c_t, h_t])
def backward(self, x, mask, hc):
    n_in, n_out, activation = self.n_in, self.n_out_t, self.activation

    if hc.ndim > 1:
        c_tm1 = hc[:, :n_out]
        h_tm1 = hc[:, n_out:]
    else:
        c_tm1 = hc[:n_out]
        h_tm1 = hc[n_out:]

    in_t = self.in_gate_b.forward(x, h_tm1)
    forget_t = self.forget_gate_b.forward(x, h_tm1)
    out_t = self.out_gate_b.forward(x, h_tm1)

    c_t = forget_t * c_tm1 + in_t * self.input_layer_b.forward(x, h_tm1)
    c_t = c_t * mask.dimshuffle(0, 'x')
    c_t = T.cast(c_t, 'float32')
    h_t = out_t * T.tanh(c_t)
    h_t = h_t * mask.dimshuffle(0, 'x')
    h_t = T.cast(h_t, 'float32')

    if hc.ndim > 1:
        return T.concatenate([c_t, h_t], axis=1)
    else:
        return T.concatenate([c_t, h_t])
def get_parent_state(self, children_states, node_type, use_dropout: bool, iteration_number) -> tuple:
    layer_input = T.flatten(children_states)
    nn_out = self.__compute_layer_output(layer_input, node_type, use_dropout, iteration_number)

    encoder_input = T.flatten(T.concatenate((children_states, nn_out))) * self.__ae_noise
    encoding = T.tanh(T.dot(encoder_input, self.__encoder_weights[node_type]))
    decoded = T.tanh(T.dot(encoding, self.__decoder_weights))
    decoded /= decoded.norm(2) / layer_input.norm(2)

    output_reconstruction = self.__compute_layer_output(decoded, node_type, use_dropout, iteration_number)
    reconstruction_cos = T.dot(nn_out[0], output_reconstruction[0])

    children_reconstruction_cos = T.dot(decoded, layer_input)
    additional_objective = reconstruction_cos + children_reconstruction_cos

    constrain_usage_pct = T.cast(
        1. - T.pow(self.__hyperparameters['constrain_intro_rate'], iteration_number),
        theano.config.floatX)
    return nn_out[0], constrain_usage_pct * additional_objective