The following 50 code examples, extracted from open-source Python projects, illustrate how to use keras.backend.variable().
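Before the examples, here is a minimal sketch of the pattern nearly all of them share (the array values below are illustrative only): keras.backend.variable() wraps a NumPy array or scalar in a backend tensor variable, which can then be combined with other backend ops and evaluated back to NumPy via K.eval().

import numpy as np
from keras import backend as K

# Wrap a NumPy array in a backend variable (dtype defaults to K.floatx()).
w = K.variable(np.array([[1., 2.], [3., 4.]]), name='w')

# Variables compose with other backend ops like ordinary tensors.
out = K.dot(w, K.variable(np.ones((2, 1))))

# Evaluate the symbolic result back to a NumPy array.
print(K.eval(out))  # [[3.], [7.]]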
def tensor_swirl(image, center=None, strength=1, radius=100, rotation=0, cval=0.0, **kwargs):
    # **kwargs is for unsupported options (ignored)
    cval = tf.fill(K.shape(image)[0:1], cval)
    shape = K.int_shape(image)[1:3]
    if center is None:
        center = np.array(shape) / 2
    ys = np.expand_dims(np.repeat(np.arange(shape[0]), shape[1]), -1)
    xs = np.expand_dims(np.tile(np.arange(shape[1]), shape[0]), -1)
    map_xs, map_ys = swirl_mapping(xs, ys, center, rotation, strength, radius)

    mapping = np.zeros((*shape, *shape))
    for map_x, map_y, x, y in zip(map_xs, map_ys, xs, ys):
        results = tensor_linear_interpolation(image, map_x, map_y, cval)
        for _y, _x, w in results:
            # mapping[int(y),int(x),int(_y),int(_x),] = w
            mapping[int(_y), int(_x), int(y), int(x),] = w

    results = tf.tensordot(image, K.variable(mapping), [[1, 2], [0, 1]])
    # results = K.reshape(results, K.shape(image))
    return results

def generate_gpu(configs, **kwargs):
    configs = np.array(configs)
    import math
    size = int(math.sqrt(len(configs[0])))
    base = panels.shape[1]
    dim = base*size

    def build():
        P = 2
        configs = Input(shape=(size*size,))
        _configs = 1 - K.round((configs/2)+0.5)  # from -1/1 to 1/0
        configs_one_hot = K.one_hot(K.cast(_configs, 'int32'), P)
        configs_one_hot = K.reshape(configs_one_hot, [-1, P])
        _panels = K.variable(panels)
        _panels = K.reshape(_panels, [P, base*base])
        states = tf.matmul(configs_one_hot, _panels)
        states = K.reshape(states, [-1, size, size, base, base])
        states = K.permute_dimensions(states, [0, 1, 3, 2, 4])
        states = K.reshape(states, [-1, size*base, size*base, 1])
        states = K.spatial_2d_padding(states, padding=((pad, pad), (pad, pad)))
        states = K.squeeze(states, -1)
        return Model(configs, wrap(configs, states))

    return preprocess(batch_swirl(build().predict(configs, **kwargs)))

def generate_gpu2(configs, **kwargs):
    configs = np.array(configs)
    import math
    size = int(math.sqrt(len(configs[0])))
    base = panels.shape[1]
    dim = base*size

    def build():
        P = 2
        configs = Input(shape=(size*size,))
        _configs = 1 - K.round((configs/2)+0.5)  # from -1/1 to 1/0
        configs_one_hot = K.one_hot(K.cast(_configs, 'int32'), P)
        configs_one_hot = K.reshape(configs_one_hot, [-1, P])
        _panels = K.variable(panels)
        _panels = K.reshape(_panels, [P, base*base])
        states = tf.matmul(configs_one_hot, _panels)
        states = K.reshape(states, [-1, size, size, base, base])
        states = K.permute_dimensions(states, [0, 1, 3, 2, 4])
        states = K.reshape(states, [-1, size*base, size*base, 1])
        states = K.spatial_2d_padding(states, padding=((pad, pad), (pad, pad)))
        states = K.squeeze(states, -1)
        states = tensor_swirl(states, radius=dim+2*pad * relative_swirl_radius, **swirl_args)
        return Model(configs, wrap(configs, states))

    return preprocess(build().predict(configs, **kwargs))

def generate_gpu(configs, **kwargs):
    import math
    size = int(math.sqrt(len(configs[0])))
    base = panels.shape[1]
    dim = base*size

    def build():
        P = 2
        configs = Input(shape=(size*size,))
        _configs = 1 - K.round((configs/2)+0.5)  # from -1/1 to 1/0
        configs_one_hot = K.one_hot(K.cast(_configs, 'int32'), P)
        configs_one_hot = K.reshape(configs_one_hot, [-1, P])
        _panels = K.variable(panels)
        _panels = K.reshape(_panels, [P, base*base])
        states = tf.matmul(configs_one_hot, _panels)
        states = K.reshape(states, [-1, size, size, base, base])
        states = K.permute_dimensions(states, [0, 1, 3, 2, 4])
        states = K.reshape(states, [-1, size*base, size*base])
        return Model(configs, wrap(configs, states))

    return build().predict(np.array(configs), **kwargs)

def style_loss(style_image, target_image, style_masks, target_masks):
    '''Calculate style loss between style_image and target_image,
    in all regions.
    '''
    assert 3 == K.ndim(style_image) == K.ndim(target_image)
    assert 3 == K.ndim(style_masks) == K.ndim(target_masks)
    loss = K.variable(0)
    for i in range(nb_labels):
        if K.image_dim_ordering() == 'th':
            style_mask = style_masks[i, :, :]
            target_mask = target_masks[i, :, :]
        else:
            style_mask = style_masks[:, :, i]
            target_mask = target_masks[:, :, i]
        loss += region_style_loss(style_image,
                                  target_image, style_mask, target_mask)
    return loss

def test_jaccard_distance():
    # all_right, almost_right, half_right, all_wrong
    y_true = np.array([[0, 0, 1, 0],
                       [0, 0, 1, 0],
                       [0, 0, 1, 0],
                       [0, 0, 1., 0.]])
    y_pred = np.array([[0, 0, 1, 0],
                       [0, 0, 0.9, 0],
                       [0, 0, 0.1, 0],
                       [1, 1, 0.1, 1.]])

    r = jaccard_distance(
        K.variable(y_true),
        K.variable(y_pred), )
    if K.is_keras_tensor(r):
        assert K.int_shape(r) == (4, )

    all_right, almost_right, half_right, all_wrong = K.eval(r)
    assert all_right == 0, 'should converge on zero'
    assert all_right < almost_right
    assert almost_right < half_right
    assert half_right < all_wrong

def test_sub_pixel_upscaling():
    num_samples = 2
    num_row = 16
    num_col = 16
    input_dtype = K.floatx()

    for scale_factor in [2, 3, 4]:
        input_data = np.random.random((num_samples, 4 * (scale_factor ** 2), num_row, num_col))
        input_data = input_data.astype(input_dtype)

        if K.image_data_format() == 'channels_last':
            input_data = input_data.transpose((0, 2, 3, 1))

        input_tensor = K.variable(input_data)
        expected_output = K.eval(KC.depth_to_space(input_tensor,
                                                   scale=scale_factor))

        layer_test(convolutional.SubPixelUpscaling,
                   kwargs={'scale_factor': scale_factor},
                   input_data=input_data,
                   expected_output=expected_output,
                   expected_output_dtype=K.floatx())

def test_regularizer(layer_class):
    layer = layer_class(output_dim, return_sequences=False, weights=None,
                        batch_input_shape=(nb_samples, timesteps, embedding_dim),
                        W_regularizer=regularizers.WeightRegularizer(l1=0.01),
                        U_regularizer=regularizers.WeightRegularizer(l1=0.01),
                        b_regularizer='l2')
    shape = (nb_samples, timesteps, embedding_dim)
    layer.build(shape)
    output = layer(K.variable(np.ones(shape)))
    K.eval(output)
    if layer_class == recurrent.SimpleRNN:
        assert len(layer.losses) == 3
    if layer_class == recurrent.GRU:
        assert len(layer.losses) == 9
    if layer_class == recurrent.LSTM:
        assert len(layer.losses) == 12

def __init__(self, mdl, x):
    self.loss_value = None
    self.grad_values = None
    self.mdl = mdl
    loss = K.variable(0.)
    layer_dict = dict([(layer.name, layer) for layer in mdl.layers])
    inp = layer_dict['face'].output
    out = layer_dict['conf'].output
    loss -= K.sum(out)
    # Might want to add some L2-loss in here, depending on output
    # loss += 0.0005 * K.sum(K.square(inp - x))
    grads = K.gradients(loss, inp)

    outputs = [loss]
    if type(grads) in {list, tuple}:
        outputs += grads
    else:
        outputs.append(grads)

    self.f_outputs = K.function([inp, K.learning_phase()], outputs)

def style_loss(style_image, target_image, style_masks, target_masks):
    '''Calculate style loss between style_image and target_image,
    in all regions.
    '''
    assert 3 == K.ndim(style_image) == K.ndim(target_image)
    assert 3 == K.ndim(style_masks) == K.ndim(target_masks)
    loss = K.variable(0)
    for i in range(num_labels):
        if K.image_data_format() == 'channels_first':
            style_mask = style_masks[i, :, :]
            target_mask = target_masks[i, :, :]
        else:
            style_mask = style_masks[:, :, i]
            target_mask = target_masks[:, :, i]
        loss += region_style_loss(style_image,
                                  target_image, style_mask, target_mask)
    return loss

def get_gradcam(image, model, layer_name, mode):
    layer = model.get_layer(layer_name)
    image = np.expand_dims(image, 0)
    loss = K.variable(0.)
    if mode == "abnormal":
        loss += K.sum(model.output)
    elif mode == "normal":
        loss += K.sum(1 - model.output)
    else:
        raise ValueError("mode must be normal or abnormal")

    # gradients of prediction wrt the conv layer of choice are used
    upstream_grads = K.gradients(loss, layer.output)[0]
    feature_weights = K.mean(upstream_grads, axis=[1, 2])  # spatial global avg pool

    heatmap = K.relu(K.dot(layer.output, K.transpose(feature_weights)))
    fetch_heatmap = K.function([model.input, K.learning_phase()], [heatmap])
    return fetch_heatmap([image, 0])[0]

def checkScale(targetSample, outputSample, scale, nIters=3, batchSize=1000):
    mmd_TT = np.zeros(nIters)
    mmd_OT = np.zeros(nIters)
    #ratios = np.zeros(nIters)
    for i in range(nIters):
        T = targetSample[np.random.randint(targetSample.shape[0], size=batchSize), :]
        T1 = targetSample[np.random.randint(targetSample.shape[0], size=batchSize), :]
        T2 = targetSample[np.random.randint(targetSample.shape[0], size=batchSize), :]
        O = outputSample[np.random.randint(outputSample.shape[0], size=batchSize), :]
        mmd_TT[i] = K.eval(cf.MMD(T1, T2, scales=[scale]).cost(K.variable(value=T1), K.variable(value=T2)))
        mmd_OT[i] = K.eval(cf.MMD(T, O, scales=[scale]).cost(K.variable(value=T), K.variable(value=O)))
        #ratios[i] = (mmd_OT[i] - mmd_TT[i])/ mmd_OT[i]

    print('scale: ' + str(scale))
    print('mmd_TT: ' + str(np.mean(mmd_TT)))
    print('mmd_OT: ' + str(np.mean(mmd_OT)))
    ratio = (np.mean(mmd_OT) - np.mean(mmd_TT)) / np.mean(mmd_OT)
    print('ratio: ' + str(ratio))
    return np.mean(mmd_TT), np.mean(mmd_OT), ratio

def build(self, input_shape):
    super().build(input_shape)
    self.mask = np.ones(self.W_shape)
    assert self.mask.shape[0] == self.mask.shape[1]  # the filter must be square
    filter_size = self.mask.shape[0]
    filter_center = filter_size / 2

    self.mask[math.ceil(filter_center):] = 0
    self.mask[math.floor(filter_center):, math.ceil(filter_center):] = 0

    if self.mono:
        if self.mask_type == 'A':
            self.mask[math.floor(filter_center), math.floor(filter_center)] = 0
    else:
        op = np.greater_equal if self.mask_type == 'A' else np.greater
        for i in range(self.n_channels):
            for j in range(self.n_channels):
                if op(i, j):
                    self.mask[math.floor(filter_center), math.floor(filter_center), i::self.n_channels, j::self.n_channels] = 0

    self.mask = K.variable(self.mask)

def get_total_loss(content_losses, style_losses, total_var_loss,
                   content_weights, style_weights, tv_weights, class_targets):
    total_loss = K.variable(0.)
    weighted_content_losses = []
    weighted_style_losses = []

    # Compute content losses
    for loss in content_losses:
        weighted_loss = K.mean(K.gather(content_weights, class_targets) * loss)
        weighted_content_losses.append(weighted_loss)
        total_loss += weighted_loss

    # Compute style losses
    for loss in style_losses:
        weighted_loss = K.mean(K.gather(style_weights, class_targets) * loss)
        weighted_style_losses.append(weighted_loss)
        total_loss += weighted_loss

    # Compute tv loss
    weighted_tv_loss = K.mean(K.gather(tv_weights, class_targets) *
                              total_var_loss)
    total_loss += weighted_tv_loss

    return (total_loss, weighted_content_losses, weighted_style_losses,
            weighted_tv_loss)

def build(self, input_shape):
    alpha_shape = input_shape[self.axis]
    self.alpha = self.init((alpha_shape,), name='{}_alpha_pos'.format(self.name))
    self.rho = K.variable(self.power_init * np.ones(alpha_shape),
                          name='{}_rho_pos'.format(self.name))
    if self.fit:
        self.trainable_weights = [self.alpha, self.rho]
    self.input_spec = [InputSpec(dtype=K.floatx(), shape=input_shape)]

    if self.initial_weights is not None:
        self.set_weights(self.initial_weights)
        del self.initial_weights

def test_downsample_model_features():
    """ Test creates a toy numpy array, and checks that the method
    correctly downsamples the array into a hand-checked tensor
    """
    # Create the spliced and averaged tensor via downsampling function
    array = np.array([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                      [11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
                      [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
                      ])
    tensor = K.variable(array)
    x = _downsample_model_features(tensor, 5)

    # Create the spliced and averaged tensor by hand
    check_array = np.array([[1.5, 3.5, 5.5, 7.5, 9.5],
                            [11.5, 13.5, 15.5, 17.5, 19.5],
                            [21.5, 23.5, 25.5, 27.5, 29.5]
                            ])
    check_tensor = K.variable(check_array)

    # Check that they are equal: that it returns the correct tensor
    assert np.allclose(K.eval(check_tensor), K.eval(x), atol=ATOL)

def style_loss(style_image, target_image, style_masks, target_masks):
    '''Calculate style loss between style_image and target_image,
    in all regions.
    '''
    assert 3 == K.ndim(style_image) == K.ndim(target_image)
    assert 3 == K.ndim(style_masks) == K.ndim(target_masks)
    loss = K.variable(0)
    for i in range(nb_labels):
        if K.image_dim_ordering() == 'th':
            style_mask = style_masks[i, :, :]
            target_mask = target_masks[i, :, :]
        else:
            style_mask = style_masks[:, :, i]
            target_mask = target_masks[:, :, i]
        loss += region_style_weight * region_style_loss(style_image,
                                                        target_image, style_mask, target_mask)
    return loss

def make_soft(y_true, fragment_length, nb_output_bins, train_with_soft_target_stdev, with_prints=False):
    receptive_field, _ = compute_receptive_field()
    n_outputs = fragment_length - receptive_field + 1

    # Make a gaussian kernel.
    kernel_v = scipy.signal.gaussian(9, std=train_with_soft_target_stdev)
    print(kernel_v)
    kernel_v = np.reshape(kernel_v, [1, 1, -1, 1])
    kernel = K.variable(kernel_v)

    if with_prints:
        y_true = print_t(y_true, 'y_true initial')

    # y_true: [batch, timesteps, input_dim]
    y_true = K.reshape(y_true, (-1, 1, nb_output_bins, 1))  # Same filter for all output; combine with batch.
    # y_true: [batch*timesteps, n_channels=1, input_dim, dummy]
    y_true = K.conv2d(y_true, kernel, border_mode='same')
    y_true = K.reshape(y_true, (-1, n_outputs, nb_output_bins))  # Same filter for all output; combine with batch.
    # y_true: [batch, timesteps, input_dim]
    y_true /= K.sum(y_true, axis=-1, keepdims=True)

    if with_prints:
        y_true = print_t(y_true, 'y_true after')
    return y_true

def check_two_tensor_operation(function_name, x_input_shape,
                               y_input_shape, **kwargs):
    xval = np.random.random(x_input_shape) - 0.5
    xth = KTH.variable(xval)
    xtf = KTF.variable(xval)

    yval = np.random.random(y_input_shape) - 0.5
    yth = KTH.variable(yval)
    ytf = KTF.variable(yval)

    zth = KTH.eval(getattr(KTH, function_name)(xth, yth, **kwargs))
    ztf = KTF.eval(getattr(KTF, function_name)(xtf, ytf, **kwargs))

    assert zth.shape == ztf.shape
    assert_allclose(zth, ztf, atol=1e-05)

def check_composed_tensor_operations(first_function_name, first_function_args,
                                     second_function_name, second_function_args,
                                     input_shape):
    ''' Creates a random tensor t0 with shape input_shape and compute
        t1 = first_function_name(t0, **first_function_args)
        t2 = second_function_name(t1, **second_function_args)
        with both Theano and TensorFlow backends and ensures the answers match.
    '''
    val = np.random.random(input_shape) - 0.5
    xth = KTH.variable(val)
    xtf = KTF.variable(val)

    yth = getattr(KTH, first_function_name)(xth, **first_function_args)
    ytf = getattr(KTF, first_function_name)(xtf, **first_function_args)

    zth = KTH.eval(getattr(KTH, second_function_name)(yth, **second_function_args))
    ztf = KTF.eval(getattr(KTF, second_function_name)(ytf, **second_function_args))

    assert zth.shape == ztf.shape
    assert_allclose(zth, ztf, atol=1e-05)

def test_shape_operations(self):
    # concatenate
    xval = np.random.random((4, 3))
    xth = KTH.variable(xval)
    xtf = KTF.variable(xval)
    yval = np.random.random((4, 2))
    yth = KTH.variable(yval)
    ytf = KTF.variable(yval)
    zth = KTH.eval(KTH.concatenate([xth, yth], axis=-1))
    ztf = KTF.eval(KTF.concatenate([xtf, ytf], axis=-1))
    assert zth.shape == ztf.shape
    assert_allclose(zth, ztf, atol=1e-05)

    check_single_tensor_operation('reshape', (4, 2), shape=(8, 1))
    check_single_tensor_operation('permute_dimensions', (4, 2, 3),
                                  pattern=(2, 0, 1))
    check_single_tensor_operation('repeat', (4, 1), n=3)
    check_single_tensor_operation('flatten', (4, 1))
    check_single_tensor_operation('expand_dims', (4, 3), dim=-1)
    check_single_tensor_operation('expand_dims', (4, 3, 2), dim=1)
    check_single_tensor_operation('squeeze', (4, 3, 1), axis=2)
    check_single_tensor_operation('squeeze', (4, 1, 1), axis=1)
    check_composed_tensor_operations('reshape', {'shape': (4, 3, 1, 1)},
                                     'squeeze', {'axis': 2},
                                     (4, 3, 1, 1))

def test_repeat_elements(self):
    reps = 3
    for ndims in [1, 2, 3]:
        shape = np.arange(2, 2 + ndims)
        arr = np.arange(np.prod(shape)).reshape(shape)
        arr_th = KTH.variable(arr)
        arr_tf = KTF.variable(arr)

        for rep_axis in range(ndims):
            np_rep = np.repeat(arr, reps, axis=rep_axis)
            th_rep = KTH.eval(
                KTH.repeat_elements(arr_th, reps, axis=rep_axis))
            tf_rep = KTF.eval(
                KTF.repeat_elements(arr_tf, reps, axis=rep_axis))

            assert th_rep.shape == np_rep.shape
            assert tf_rep.shape == np_rep.shape
            assert_allclose(np_rep, th_rep, atol=1e-05)
            assert_allclose(np_rep, tf_rep, atol=1e-05)

def test_sparse_dot(self):
    x_d = np.array([0, 7, 2, 3], dtype=np.float32)
    x_r = np.array([0, 2, 2, 3], dtype=np.int64)
    x_c = np.array([4, 3, 2, 3], dtype=np.int64)

    x_sparse = sparse.csr_matrix((x_d, (x_r, x_c)), shape=(4, 5))
    x_dense = x_sparse.toarray()

    W = np.random.random((5, 4))

    backends = [KTF]
    if KTH.th_sparse_module:
        # Theano has some dependency issues for sparse
        backends.append(KTH)

    for K in backends:
        t_W = K.variable(W)
        k_s = K.eval(K.dot(K.variable(x_sparse), t_W))
        k_d = K.eval(K.dot(K.variable(x_dense), t_W))

        assert k_s.shape == k_d.shape
        assert_allclose(k_s, k_d, atol=1e-05)

def test_set_floatx(self):
    """
    Make sure that changes to the global floatx are effectively
    taken into account by the backend.
    """
    # Keep track of the old value
    old_floatx = floatx()

    set_floatx('float16')
    var = variable([10])
    check_dtype(var, 'float16')

    set_floatx('float64')
    var = variable([10])
    check_dtype(var, 'float64')

    # Restore old value
    set_floatx(old_floatx)

def build(self, input_shape):
    # construct the clockwork structures
    # basically: every n units the period changes;
    # `period` is for flagging this; `mask` is for enforcing it
    n = self.output_dim // len(self.period_spec)
    mask = np.zeros((self.output_dim, self.output_dim), K.floatx())
    period = np.zeros((self.output_dim,), np.int16)
    for i, t in enumerate(self.period_spec):
        mask[i*n:(i+1)*n, i*n:] = 1
        period[i*n:(i+1)*n] = t

    self.mask = K.variable(mask, name='clockwork_mask')
    self.period = K.variable(period, dtype='int16', name='clockwork_period')

    super(ClockworkRNN, self).build(input_shape)

    self.U = self.U * self.mask  # old implementation did this at run time...

    # simple rnn initializes the wrong size self.states
    # we want to also keep the time step in the state.
    if self.stateful:
        self.reset_states()
    else:
        self.states = [None, None]

def reset_states(self):
    assert self.stateful, 'Layer must be stateful.'
    input_shape = self.input_spec[0].shape
    if not input_shape[0]:
        raise Exception('If a RNN is stateful, a complete ' +
                        'input_shape must be provided (including batch size).')

    if self.go_backwards:
        initial_time = self.input_spec[0].shape[1]
    else:
        initial_time = 0.

    if hasattr(self, 'states'):
        K.set_value(self.states[0],
                    np.zeros((input_shape[0], self.output_dim)))
        K.set_value(self.states[1], initial_time)
    else:
        self.states = [K.zeros((input_shape[0], self.output_dim)),
                       K.variable(initial_time)]

def TEST_lossfun():
    ''' Test the hinge-rank loss function in model.py '''
    # data
    # 1 correct word, 2 wrong ones
    image_vectors = np.random.rand(3, WORD_DIM)
    word_vectors = np.random.rand(3, WORD_DIM)
    image_vectors = K.variable(image_vectors)
    word_vectors = K.variable(word_vectors)

    # import module
    from model import hinge_rank_loss

    # test
    out = K.eval(hinge_rank_loss(word_vectors, image_vectors, TESTING=True))
    print(out)

    print("Completed TEST_lossfun")

def get_initial_state(self, X):
    #if not self.stateful:
    #    self.controller.reset_states()

    init_old_ntm_output = K.ones((self.batch_size, self.output_dim), name="init_old_ntm_output")*0.42
    init_M = K.ones((self.batch_size, self.n_slots, self.m_depth), name='main_memory')*0.042

    init_wr = np.zeros((self.batch_size, self.read_heads, self.n_slots))
    init_wr[:, :, 0] = 1
    init_wr = K.variable(init_wr, name="init_weights_read")

    init_ww = np.zeros((self.batch_size, self.write_heads, self.n_slots))
    init_ww[:, :, 0] = 1
    init_ww = K.variable(init_ww, name="init_weights_write")

    return [init_old_ntm_output, init_M, init_wr, init_ww]

# See chapter 3.1

def build(self, input_shape):
    self.input_spec = [InputSpec(shape=input_shape[0])]
    self.input_dim = input_shape[0][2]
    self.embedding_dim = input_shape[1][1]
    self.states = [None]

    self.W = self.init((self.input_dim, 3 * self.output_dim),
                       name='{}_W'.format(self.name))
    self.U = self.inner_init((self.output_dim, 3 * self.output_dim),
                             name='{}_U'.format(self.name))
    self.C = self.inner_init((self.embedding_dim, 3 * self.output_dim),
                             name='{}_C'.format(self.name))
    self.V = self.init((self.embedding_dim, self.output_dim),
                       name='{}_V'.format(self.name))
    self.b = K.variable(np.hstack((np.zeros(self.output_dim),
                                   np.zeros(self.output_dim),
                                   np.zeros(self.output_dim))),
                        name='{}_b'.format(self.name))

    self.trainable_weights = [self.W, self.U, self.C, self.V, self.b]

def find_analogy_patches(a, a_prime, b, patch_size=3, patch_stride=1):
    '''This is for precalculating the analogy_loss

    Since A, A', and B never change we only need to calculate the patch matches once.
    '''
    # extract patches from feature maps
    a_patches, a_patches_norm = patches.make_patches(K.variable(a), patch_size, patch_stride)
    a_prime_patches, a_prime_patches_norm = patches.make_patches(K.variable(a_prime), patch_size, patch_stride)
    b_patches, b_patches_norm = patches.make_patches(K.variable(b), patch_size, patch_stride)

    # find best patches and calculate loss
    p = patches.find_patch_matches(b_patches, b_patches_norm, a_patches / a_patches_norm)
    #best_patches = a_prime_patches[p]
    best_patches = K.reshape(a_prime_patches[p], K.shape(b_patches))
    f = K.function([], best_patches)
    best_patches = f([])
    return best_patches

def __init__(self, width, global_p=0.5, deepest=False):
    self.global_p = global_p
    self.width = width
    self.switch_seed = np.random.randint(1, 10e6)
    self.path_seed = np.random.randint(1, 10e6)
    self.deepest = deepest
    if deepest:
        self.is_global = K.variable(1.)
        self.path_array = K.variable([1.] + [.0 for _ in range(width-1)])
    else:
        self.is_global = self._build_global_switch()
        self.path_array = self._build_global_path_arr()

def bilinear_interpolation(shape, name=None):
    w = np.asarray([[[0.125, 0.25, 0.125],
                     [0.25,  0.5,  0.25],
                     [0.125, 0.25, 0.125]],
                    [[0.25, 0.5, 0.25],
                     [0.5,  1,   0.5],
                     [0.25, 0.5, 0.25]],
                    [[0.125, 0.25, 0.125],
                     [0.25,  0.5,  0.25],
                     [0.125, 0.25, 0.125]]])
    # np.zeros(shape)
    return K.variable(w, name=name)

def loss_net(self) -> Model:
    """Returns the network that yields a loss given both input spectrograms and labels. Used for training."""
    input_batch = self._input_batch_input
    label_batch = Input(name=Wav2Letter.InputNames.label_batch, shape=(None,), dtype='int32')
    label_lengths = Input(name=Wav2Letter.InputNames.label_lengths, shape=(1,), dtype='int64')

    asg_transition_probabilities_variable = backend.variable(value=self.asg_transition_probabilities,
                                                             name="asg_transition_probabilities")
    asg_initial_probabilities_variable = backend.variable(value=self.asg_initial_probabilities,
                                                          name="asg_initial_probabilities")

    # Since Keras doesn't currently support loss functions with extra parameters,
    # we define a custom lambda layer yielding one single real-valued CTC loss given the grapheme probabilities:
    loss_layer = Lambda(Wav2Letter._asg_lambda if self.use_asg else Wav2Letter._ctc_lambda,
                        name='asg_loss' if self.use_asg else 'ctc_loss',
                        output_shape=(1,),
                        arguments={"transition_probabilities": asg_transition_probabilities_variable,
                                   "initial_probabilities": asg_initial_probabilities_variable} if self.use_asg else None)
    # ([asg_transition_probabilities_variable, asg_initial_probabilities_variable] if self.use_asg else [])

    # This loss layer is placed atop the predictive network and provided with additional arguments,
    # namely the label batch and prediction/label sequence lengths:
    loss = loss_layer(
        [self.predictive_net(input_batch), label_batch, self._prediction_lengths_input, label_lengths])

    loss_net = Model(inputs=[input_batch, label_batch, self._prediction_lengths_input, label_lengths],
                     outputs=[loss])
    # Since loss is already calculated in the last layer of the net, we just pass through the results here.
    # The loss dummy labels have to be given to satisfy the Keras API.
    loss_net.compile(loss=lambda dummy_labels, ctc_loss: ctc_loss, optimizer=self.optimizer)
    return loss_net

def contractive_loss(model, lam=1e-4):
    def loss(y_true, y_pred):
        ent_loss = K.mean(K.binary_crossentropy(y_pred, y_true), axis=-1)

        W = K.variable(value=model.encoder.get_weights()[0])  # N x N_hidden
        W = K.transpose(W)  # N_hidden x N
        h = model.encoder.output
        dh = h * (1 - h)  # N_batch x N_hidden

        # N_batch x N_hidden * N_hidden x 1 = N_batch x 1
        contractive = lam * K.sum(dh**2 * K.sum(W**2, axis=1), axis=1)

        return ent_loss + contractive
    return loss

def weighted_binary_crossentropy(feature_weights):
    def loss(y_true, y_pred):
        # try:
        #     x = K.binary_crossentropy(y_pred, y_true)
        #     # y = tf.Variable(feature_weights.astype('float32'))
        #     # z = K.dot(x, y)
        #     y_true = tf.pow(y_true + 1e-5, .75)
        #     y2 = tf.div(y_true, tf.reshape(K.sum(y_true, 1), [-1, 1]))
        #     z = K.sum(tf.mul(x, y2), 1)
        # except Exception as e:
        #     print e
        #     import pdb; pdb.set_trace()
        # return z
        return K.dot(K.binary_crossentropy(y_pred, y_true),
                     K.variable(feature_weights.astype('float32')))
    return loss

def generate(configs, width, height, **kwargs):
    assert width*height <= 9
    load(width, height)
    from keras.layers import Input, Reshape
    from keras.models import Model
    from keras import backend as K
    import tensorflow as tf

    def build():
        base = setting['base']
        P = len(setting['panels'])
        configs = Input(shape=(P,))
        configs_one_hot = K.one_hot(K.cast(configs, 'int32'), width*height)
        matches = K.permute_dimensions(configs_one_hot, [0, 2, 1])
        matches = K.reshape(matches, [-1, P])
        panels = K.variable(setting['panels'])
        panels = K.reshape(panels, [P, base*base])
        states = tf.matmul(matches, panels)
        states = K.reshape(states, [-1, height, width, base, base])
        states = K.permute_dimensions(states, [0, 1, 3, 2, 4])
        states = K.reshape(states, [-1, height*base, width*base])
        return Model(configs, wrap(configs, states))

    model = build()
    return model.predict(configs, **kwargs)

def build_error(s, height, width, base):
    P = len(setting['panels'])
    s = K.reshape(s, [-1, height, base, width, base])
    s = K.permute_dimensions(s, [0, 1, 3, 2, 4])
    s = K.reshape(s, [-1, height, width, 1, base, base])
    s = K.tile(s, [1, 1, 1, P, 1, 1,])

    allpanels = K.variable(np.array(setting['panels']))
    allpanels = K.reshape(allpanels, [1, 1, 1, P, base, base])
    allpanels = K.tile(allpanels, [K.shape(s)[0], height, width, 1, 1, 1])

    def hash(x):
        ## 2x2 average hashing
        x = K.reshape(x, [-1, height, width, P, base//2, 2, base//2, 2])
        x = K.mean(x, axis=(5, 7))
        return K.round(x)
        ## diff hashing (horizontal diff)
        # x1 = x[:,:,:,:,:,:-1]
        # x2 = x[:,:,:,:,:,1:]
        # d = x1 - x2
        # return K.round(d)
        ## just rounding
        # return K.round(x)
        ## do nothing
        # return x

    # s         = hash(s)
    # allpanels = hash(allpanels)
    # error = K.binary_crossentropy(s, allpanels)
    error = K.abs(s - allpanels)
    error = hash(error)
    error = K.mean(error, axis=(4, 5))
    return error

def build_errors(states, base, dim, size):
    s = K.reshape(states, [-1, size, base, size, base])
    s = K.permute_dimensions(s, [0, 1, 3, 2, 4])
    s = K.reshape(s, [-1, size, size, 1, base, base])
    s = K.tile(s, [1, 1, 1, 2, 1, 1,])  # number of panels : 2

    allpanels = K.variable(panels)
    allpanels = K.reshape(allpanels, [1, 1, 1, 2, base, base])
    allpanels = K.tile(allpanels, [K.shape(s)[0], size, size, 1, 1, 1])

    def hash(x):
        ## 2x2 average hashing
        # x = K.reshape(x, [-1, size, size, 2, base//2, 2, base//2, 2])
        # x = K.mean(x, axis=(5, 7))
        # return K.round(x)
        ## diff hashing (horizontal diff)
        # x1 = x[:,:,:,:,:,:-1]
        # x2 = x[:,:,:,:,:,1:]
        # d = x1 - x2
        # return K.round(d)
        ## just rounding
        return K.round(x)
        ## do nothing
        # return x

    # s         = hash(s)
    # allpanels = hash(allpanels)
    # error = K.binary_crossentropy(s, allpanels)
    error = K.abs(s - allpanels)
    error = hash(error)
    error = K.mean(error, axis=(4, 5))
    return error

def build_error(s, disks, towers, tower_width, panels):
    s = K.reshape(s, [-1, disks, disk_height, towers, tower_width])
    s = K.permute_dimensions(s, [0, 1, 3, 2, 4])
    s = K.reshape(s, [-1, disks, towers, 1, disk_height, tower_width])
    s = K.tile(s, [1, 1, 1, disks+1, 1, 1,])

    allpanels = K.variable(panels)
    allpanels = K.reshape(allpanels, [1, 1, 1, disks+1, disk_height, tower_width])
    allpanels = K.tile(allpanels, [K.shape(s)[0], disks, towers, 1, 1, 1])

    def hash(x):
        ## 2x2 average hashing (now it does not work since disks have 1 pixel height)
        # x = K.reshape(x, [-1, disks, towers, disks+1, disk_height, tower_width//2, 2])
        # x = K.mean(x, axis=(4,))
        # return K.round(x)
        ## diff hashing (horizontal diff)
        # x1 = x[:,:,:,:,:,:-1]
        # x2 = x[:,:,:,:,:,1:]
        # d = x1 - x2
        # return K.round(d)
        ## just rounding
        return K.round(x)
        ## do nothing
        # return x

    s         = hash(s)
    allpanels = hash(allpanels)

    # error = K.binary_crossentropy(s, allpanels)
    error = K.abs(s - allpanels)
    error = K.mean(error, axis=(4, 5))
    return error

def __init__(self, N, M, min, max, anneal_rate):
    self.N = N
    self.M = M
    self.layers = Sequential([
        # Dense(N * M),
        Reshape((N, M))])
    self.min = min
    self.max = max
    self.anneal_rate = anneal_rate
    self.tau = K.variable(max, name="temperature")

def _build(self, input_shape):
    super()._build(input_shape)
    c = K.variable(0, name="c")
    self.c = c
    x = Input(shape=input_shape)
    s = self.net(x)
    y2 = wrap(s, s / c)
    self.pu = Model(x, y2)

def w_categorical_crossentropy(weights):
    """
    A weighted version of keras.objectives.categorical_crossentropy

    Variables:
        weights: numpy array of shape (C,) where C is the number of classes

    Usage:
        weights = np.array([0.5, 2, 10])  # Class one at 0.5, class 2 twice the normal weights, class 3 10x.
        loss = w_categorical_crossentropy(weights)
        model.compile(loss=loss, optimizer='adam')

    Credit to: @wassname (github)
    https://gist.github.com/wassname/ce364fddfc8a025bfab4348cf5de852d
    """
    weights = K.variable(weights)

    def loss(y_true, y_pred):
        # scale predictions so that the class probas of each sample sum to 1
        y_pred /= K.sum(y_pred, axis=-1, keepdims=True)
        # clip to prevent NaN's and Inf's
        y_pred = K.clip(y_pred, K.epsilon(), 1 - K.epsilon())
        # calc
        loss = y_true * K.log(y_pred) * weights
        loss = -K.sum(loss, -1)
        return loss

    return loss

def build(self, input_shape):
    self.input_spec = [InputSpec(shape=input_shape)]
    shape = (input_shape[self.axis],)

    self.gamma = self.add_weight(shape,
                                 initializer=self.gamma_init,
                                 regularizer=self.gamma_regularizer,
                                 name='{}_gamma'.format(self.name))
    self.beta = self.add_weight(shape,
                                initializer=self.beta_init,
                                regularizer=self.beta_regularizer,
                                name='{}_beta'.format(self.name))
    self.running_mean = self.add_weight(shape, initializer='zero',
                                        name='{}_running_mean'.format(self.name),
                                        trainable=False)
    # Note: running_std actually holds the running variance, not the running std.
    self.running_std = self.add_weight(shape, initializer='one',
                                       name='{}_running_std'.format(self.name),
                                       trainable=False)

    self.r_max = K.variable(np.ones((1,)), name='{}_r_max'.format(self.name))
    self.d_max = K.variable(np.zeros((1,)), name='{}_d_max'.format(self.name))
    self.t = K.variable(np.zeros((1,)), name='{}_t'.format(self.name))

    if self.initial_weights is not None:
        self.set_weights(self.initial_weights)
        del self.initial_weights

    self.built = True

def test_loss_masking():
    weighted_loss = weighted_objective(objectives.get('mae'))
    shape = (3, 4, 2)
    X = np.arange(24).reshape(shape)
    Y = 2 * X

    # Normally the trailing 1 is added by standardize_weights
    weights = np.ones((3,))
    mask = np.ones((3, 4))
    mask[1, 0] = 0

    out = K.eval(weighted_loss(K.variable(X),
                               K.variable(Y),
                               K.variable(weights),
                               K.variable(mask)))