We extracted the following 50 code examples from open-source Python projects to illustrate how to use keras.backend.int_shape().
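Before the project examples, a minimal sketch of what K.int_shape() gives you (assuming the TensorFlow backend; the toy shape below is illustrative):

import keras.backend as K
from keras.layers import Input

x = Input(shape=(32, 64))   # batch dimension is unknown at graph-build time
print(K.int_shape(x))       # -> (None, 32, 64): a plain Python tuple,
                            #    with None for any undefined axis

Unlike K.shape(), which returns a symbolic tensor, K.int_shape() returns ordinary Python ints (or None), so the result can drive regular Python control flow, as the stride and kernel-size computations in the examples below demonstrate.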
def distance_layer(x1, x2):
    """Distance and angle of two inputs.

    Compute the concatenation of element-wise subtraction and
    multiplication of two inputs.
    """
    def _distance(args):
        x1 = args[0]
        x2 = args[1]
        x = K.abs(x1 - x2)
        return x

    def _multiply(args):
        x1 = args[0]
        x2 = args[1]
        return x1 * x2

    distance = Lambda(_distance, output_shape=(K.int_shape(x1)[-1],))([x1, x2])
    multiply = Lambda(_multiply, output_shape=(K.int_shape(x1)[-1],))([x1, x2])
    return concatenate([distance, multiply])
def _shortcut(input, residual):  # shortcut connection: y = F(x) + x
    """Adds a shortcut between input and residual block and merges them with "sum"
    """
    # Expand channels of shortcut to match residual.
    # Stride appropriately to match residual (width, height)
    # Should be int if network architecture is correctly configured.
    input_shape = K.int_shape(input)
    residual_shape = K.int_shape(residual)
    stride_width = int(round(input_shape[ROW_AXIS] / residual_shape[ROW_AXIS]))
    stride_height = int(round(input_shape[COL_AXIS] / residual_shape[COL_AXIS]))
    equal_channels = input_shape[CHANNEL_AXIS] == residual_shape[CHANNEL_AXIS]

    shortcut = input
    # 1 X 1 conv if shape is different. Else identity.
    if stride_width > 1 or stride_height > 1 or not equal_channels:
        shortcut = Conv2D(filters=residual_shape[CHANNEL_AXIS],
                          kernel_size=(1, 1),
                          strides=(stride_width, stride_height),
                          padding="valid",
                          kernel_initializer="he_normal",
                          kernel_regularizer=l2(0.0001))(input)

    return add([shortcut, residual])
def _shortcut(inputs, x):
    # shortcut path
    _, inputs_w, inputs_h, inputs_ch = K.int_shape(inputs)
    _, x_w, x_h, x_ch = K.int_shape(x)
    stride_w = int(round(inputs_w / x_w))
    stride_h = int(round(inputs_h / x_h))
    equal_ch = inputs_ch == x_ch

    if stride_w > 1 or stride_h > 1 or not equal_ch:
        shortcut = Conv2D(x_ch, (1, 1), strides=(stride_w, stride_h),
                          kernel_initializer=init, padding='valid')(inputs)
    else:
        shortcut = inputs

    merged = Add()([shortcut, x])
    return merged
def step_with_training(self, training=None):

    def step(inputs, states):
        input_shape = K.int_shape(inputs)
        y_tm1 = self.layer.preprocess_input(
            K.expand_dims(states[0], axis=1),
            training
        )
        y_tm1 = K.reshape(y_tm1, (-1, input_shape[-1]))

        inputs_sum = tf.reduce_sum(inputs)

        def inputs_f():
            return inputs

        def output_f():
            return y_tm1

        current_inputs = tf.case(
            [(tf.equal(inputs_sum, 0.0), output_f)],
            default=inputs_f
        )

        return self.layer.step(
            current_inputs,
            states
        )

    return step
def call(self, inputs, mask=None):
    input_shape = K.int_shape(inputs)
    outputs = self.layer.call(inputs)
    outputs = K.permute_dimensions(
        outputs,
        self.permute_pattern + [len(input_shape) - 1]
    )
    outputs_shape = self.compute_output_shape(input_shape)
    outputs = K.reshape(
        outputs,
        (-1, outputs_shape[1], outputs_shape[2])
    )

    mask_tensor = self.compute_mask(
        inputs,
        mask
    )
    mask_tensor = K.cast(mask_tensor, K.floatx())
    mask_tensor = K.expand_dims(mask_tensor)
    mask_output = K.repeat_elements(
        mask_tensor,
        outputs_shape[2],
        2
    )
    return outputs * mask_output
def preprocess_input(self, inputs, training=None):
    # if self.consume_less == 'cpu':
    #     input_shape = K.int_shape(x)
    #     input_dim = input_shape[2]
    #     timesteps = input_shape[1]
    #     x_z = time_distributed_dense(x, self.W_z, self.b_z, self.dropout_W,
    #                                  input_dim, self.units, timesteps)
    #     x_r = time_distributed_dense(x, self.W_r, self.b_r, self.dropout_W,
    #                                  input_dim, self.units, timesteps)
    #     x_h = time_distributed_dense(x, self.W_h, self.b_h, self.dropout_W,
    #                                  input_dim, self.units, timesteps)
    #     return K.concatenate([x_z, x_r, x_h], axis=2)
    # else:
    #     return x
    self.ha = time_distributed_dense(self.h, self.Ua)
    return inputs
def get_constants(self, inputs, training=None):
    constants = []
    '''if 0 < self.dropout_U < 1:
        ones = K.ones_like(K.reshape(x[:, 0, 0], (-1, 1)))
        ones = K.tile(ones, (1, self.units))
        B_U = [K.in_train_phase(K.dropout(ones, self.dropout_U), ones)
               for _ in range(3)]
        constants.append(B_U)
    else:
        constants.append([K.cast_to_floatx(1.) for _ in range(3)])

    if 0 < self.dropout_W < 1:
        input_shape = K.int_shape(x)
        input_dim = input_shape[-1]
        ones = K.ones_like(K.reshape(x[:, 0, 0], (-1, 1)))
        ones = K.tile(ones, (1, int(input_dim)))
        B_W = [K.in_train_phase(K.dropout(ones, self.dropout_W), ones)
               for _ in range(3)]
        constants.append(B_W)
    else:'''
    constants.append([K.cast_to_floatx(1.) for _ in range(3)])
    return constants
def preprocess_input(self, inputs, training=None):
    # if self.consume_less == 'cpu':
    #     input_shape = K.int_shape(x)
    #     input_dim = input_shape[2]
    #     timesteps = input_shape[1]
    #     x_z = time_distributed_dense(x, self.W_z, self.b_z, self.dropout_W,
    #                                  input_dim, self.units, timesteps)
    #     x_r = time_distributed_dense(x, self.W_r, self.b_r, self.dropout_W,
    #                                  input_dim, self.units, timesteps)
    #     x_h = time_distributed_dense(x, self.W_h, self.b_h, self.dropout_W,
    #                                  input_dim, self.units, timesteps)
    #     return K.concatenate([x_z, x_r, x_h], axis=2)
    # else:
    #     return x
    self.ha = K.dot(self.h, self.Ua)  # time_distributed_dense(self.h, self.Ua)
    return inputs
def get_constants(self, x):
    constants = []
    if 0 < self.dropout_U < 1:
        ones = K.ones_like(K.reshape(x[:, 0, 0], (-1, 1)))
        ones = K.tile(ones, (1, self.output_dim))
        B_U = [K.in_train_phase(K.dropout(ones, self.dropout_U), ones)
               for _ in range(3)]
        constants.append(B_U)
    else:
        constants.append([K.cast_to_floatx(1.) for _ in range(3)])

    if 0 < self.dropout_W < 1:
        input_shape = K.int_shape(x)
        input_dim = input_shape[-1]
        ones = K.ones_like(K.reshape(x[:, 0, 0], (-1, 1)))
        ones = K.tile(ones, (1, int(input_dim)))
        B_W = [K.in_train_phase(K.dropout(ones, self.dropout_W), ones)
               for _ in range(3)]
        constants.append(B_W)
    else:
        constants.append([K.cast_to_floatx(1.) for _ in range(3)])
    return constants
def tensor_linear_interpolation(image, x, y, cval):
    # image: batch tensor; x, y: numbers
    import math
    x0, x1 = math.floor(x), math.ceil(x)
    y0, y1 = math.floor(y), math.ceil(y)
    dx0, dx1 = x - x0, x1 - x
    dy0, dy1 = y - y0, y1 - y

    shape = K.int_shape(image)[1:]
    results = []
    if 0 <= y0 < shape[0] and 0 <= x0 < shape[1]:
        results.append((y0, x0, dy1 * dx1))
    if 0 <= y0 < shape[0] and 0 <= x1 < shape[1]:
        results.append((y0, x1, dy1 * dx0))
    if 0 <= y1 < shape[0] and 0 <= x0 < shape[1]:
        results.append((y1, x0, dy0 * dx1))
    if 0 <= y1 < shape[0] and 0 <= x1 < shape[1]:
        results.append((y1, x1, dy0 * dx0))
    return results
def tensor_swirl(image, center=None, strength=1, radius=100, rotation=0, cval=0.0, **kwargs):
    # **kwargs is for unsupported options (ignored)
    cval = tf.fill(K.shape(image)[0:1], cval)
    shape = K.int_shape(image)[1:3]
    if center is None:
        center = np.array(shape) / 2
    ys = np.expand_dims(np.repeat(np.arange(shape[0]), shape[1]), -1)
    xs = np.expand_dims(np.tile(np.arange(shape[1]), shape[0]), -1)
    map_xs, map_ys = swirl_mapping(xs, ys, center, rotation, strength, radius)

    mapping = np.zeros((*shape, *shape))
    for map_x, map_y, x, y in zip(map_xs, map_ys, xs, ys):
        results = tensor_linear_interpolation(image, map_x, map_y, cval)
        for _y, _x, w in results:
            # mapping[int(y), int(x), int(_y), int(_x)] = w
            mapping[int(_y), int(_x), int(y), int(x)] = w

    results = tf.tensordot(image, K.variable(mapping), [[1, 2], [0, 1]])
    # results = K.reshape(results, K.shape(image))
    return results
def test_jaccard_distance():
    # all_right, almost_right, half_right, all_wrong
    y_true = np.array([[0, 0, 1, 0],
                       [0, 0, 1, 0],
                       [0, 0, 1, 0],
                       [0, 0, 1., 0.]])
    y_pred = np.array([[0, 0, 1, 0],
                       [0, 0, 0.9, 0],
                       [0, 0, 0.1, 0],
                       [1, 1, 0.1, 1.]])

    r = jaccard_distance(
        K.variable(y_true),
        K.variable(y_pred),
    )
    if K.is_keras_tensor(r):
        assert K.int_shape(r) == (4,)

    all_right, almost_right, half_right, all_wrong = K.eval(r)
    assert all_right == 0, 'should converge on zero'
    assert all_right < almost_right
    assert almost_right < half_right
    assert half_right < all_wrong
def _shortcut(input, residual):
    """Adds a shortcut between input and residual block and merges them with "sum"
    """
    # Expand channels of shortcut to match residual.
    # Stride appropriately to match residual (width, height)
    # Should be int if network architecture is correctly configured.
    input_shape = K.int_shape(input)
    residual_shape = K.int_shape(residual)
    stride_width = int(round(input_shape[ROW_AXIS] / residual_shape[ROW_AXIS]))
    stride_height = int(round(input_shape[COL_AXIS] / residual_shape[COL_AXIS]))
    equal_channels = input_shape[CHANNEL_AXIS] == residual_shape[CHANNEL_AXIS]

    shortcut = input
    # 1 X 1 conv if shape is different. Else identity.
    if stride_width > 1 or stride_height > 1 or not equal_channels:
        shortcut = Conv2D(filters=residual_shape[CHANNEL_AXIS],
                          kernel_size=(1, 1),
                          strides=(stride_width, stride_height),
                          padding="valid",
                          kernel_initializer="he_normal",
                          kernel_regularizer=l2(0.0001))(input)

    return add([shortcut, residual])
def block17(input, scale=1.0, activation_fn='relu'):
    if K.image_dim_ordering() == "th":
        channel_axis = 1
    else:
        channel_axis = -1

    shortcut = input
    tower_conv = conv2d_bn(input, 192, 1, 1, activ_fn=activation_fn)
    tower_conv1_0 = conv2d_bn(input, 128, 1, 1, activ_fn=activation_fn)
    tower_conv1_1 = conv2d_bn(tower_conv1_0, 160, 1, 7, activ_fn=activation_fn)
    tower_conv1_2 = conv2d_bn(tower_conv1_1, 192, 7, 1, activ_fn=activation_fn)
    mixed = merge([tower_conv, tower_conv1_2], mode='concat', concat_axis=channel_axis)
    up = conv2d_bn(mixed, 1088, 1, 1, activ_fn=False, normalize=False)

    up = Lambda(do_scale, output_shape=K.int_shape(up)[1:], arguments={'scale': scale})(up)

    net = merge([shortcut, up], mode='sum')

    if activation_fn:
        net = Activation(activation_fn)(net)
    return net
def block8(input, scale=1.0, activation_fn='relu'):
    if K.image_dim_ordering() == "th":
        channel_axis = 1
    else:
        channel_axis = -1

    shortcut = input
    tower_conv = conv2d_bn(input, 192, 1, 1, activ_fn=activation_fn)
    tower_conv1_0 = conv2d_bn(input, 192, 1, 1, activ_fn=activation_fn)
    tower_conv1_1 = conv2d_bn(tower_conv1_0, 224, 1, 3, activ_fn=activation_fn)
    tower_conv1_2 = conv2d_bn(tower_conv1_1, 256, 3, 1, activ_fn=activation_fn)
    mixed = merge([tower_conv, tower_conv1_2], mode='concat', concat_axis=channel_axis)
    up = conv2d_bn(mixed, 2080, 1, 1, activ_fn=False, normalize=False)

    up = Lambda(do_scale, output_shape=K.int_shape(up)[1:], arguments={'scale': scale})(up)

    net = merge([shortcut, up], mode='sum')

    if activation_fn:
        net = Activation(activation_fn)(net)
    return net
def __call__(self, model):
    if self.crop_right:
        model = Lambda(lambda x: x[:, :, :K.int_shape(x)[2] - 1, :])(model)

    if self.v is not None:
        model = Merge(mode='sum')([model, self.v])

    if self.h is not None:
        hV = Dense(output_dim=2 * self.filters)(self.h)
        hV = Reshape((1, 1, 2 * self.filters))(hV)
        model = Lambda(lambda x: x[0] + x[1])([model, hV])

    model_f = Lambda(lambda x: x[:, :, :, :self.filters])(model)
    model_g = Lambda(lambda x: x[:, :, :, self.filters:])(model)

    model_f = Lambda(lambda x: K.tanh(x))(model_f)
    model_g = Lambda(lambda x: K.sigmoid(x))(model_g)

    res = Merge(mode='mul')([model_f, model_g])
    return res
def parametric_conv(value, weight_source, filter_size, output_dim, padding):
    assert False, "Halp, how to make separate convs per sample"
    n_timesteps, input_dim = K.int_shape(value)[-2:]
    kernel_shape = (filter_size, input_dim, output_dim)
    kernel = dense(
        weight_source,
        # total number of kernel weights: filter_size * input_dim * output_dim
        dim=kernel_shape[0] * kernel_shape[1] * kernel_shape[2],
        activation="linear")
    kernel = Reshape(kernel_shape)(kernel)
    return K.conv1d(
        value,
        kernel,
        padding=padding)
def highway_layer(value, activation="tanh", gate_bias=-3):
    dims = K.int_shape(value)
    if len(dims) != 2:
        raise ValueError(
            "Expected 2d value (batch_size, dims) but got shape %s" % (dims,))
    dim = dims[-1]
    gate_bias_initializer = keras.initializers.Constant(gate_bias)
    gate = Dense(units=dim, bias_initializer=gate_bias_initializer)(value)
    gate = Activation("sigmoid")(gate)
    negated_gate = Lambda(
        lambda x: 1.0 - x,
        output_shape=(dim,))(gate)
    transformed = Dense(units=dim)(value)
    transformed = Activation(activation)(transformed)
    transformed_gated = Multiply()([gate, transformed])
    pass_through_gated = Multiply()([negated_gate, value])
    value = Add()([transformed_gated, pass_through_gated])
    return value
def _create_layers(self, input_layer):
    """ Create the encoding and the decoding layers of the autoencoder.
    :return: self
    """
    encode_layer = Dense(name='encoder',
                         units=self.n_hidden,
                         activation=self.enc_activation,
                         kernel_regularizer=l1_l2(self.l1_reg, self.l2_reg),
                         bias_regularizer=l1_l2(self.l1_reg, self.l2_reg))(input_layer)

    n_inputs = K.int_shape(input_layer)[-1]
    self._decode_layer = Dense(name='decoder',
                               units=n_inputs,
                               activation=self.dec_activation)(encode_layer)
def _create_layers(self, input_layer):
    """ Create the encoding and the decoding layers of the deep autoencoder.
    :param input_layer: Input size.
    :return: self
    """
    encode_layer = input_layer
    for i, l in enumerate(self.n_hidden):
        encode_layer = Dense(units=l,
                             name='encoder_%d' % i,
                             activation=self.enc_activation[i],
                             kernel_regularizer=l1_l2(self.l1_reg[i], self.l2_reg[i]),
                             bias_regularizer=l1_l2(self.l1_reg[i], self.l2_reg[i]))(encode_layer)

    self._decode_layer = encode_layer
    for i, l in enumerate(self.n_hidden[-2:-(len(self.n_hidden) + 1):-1] +
                          [K.int_shape(input_layer)[1]]):
        self._decode_layer = Dense(units=l,
                                   name='decoder_%d' % i,
                                   activation=self.dec_activation[i])(self._decode_layer)
def _create_layers(self, input_layer):
    """ Create the encoding and the decoding layers of the variational autoencoder.
    :return: self
    """
    n_inputs = K.int_shape(input_layer)[1]

    # Encode layers
    encode_layer = Dense(units=self.n_hidden,
                         activation=self.enc_activation)(input_layer)

    z_mean = Dense(name='z_mean', units=self.n_latent)(encode_layer)
    z_log_var = Dense(name='z_log_var', units=self.n_latent)(encode_layer)

    z = Lambda(self._sampling, output_shape=(self.n_latent,))([z_mean, z_log_var])

    # Decode layers
    self._decode_layer = Dense(units=self.n_hidden,
                               activation=self.dec_activation)(z)

    self._decode_layer = Dense(units=n_inputs,
                               activation='linear')(self._decode_layer)
def _shortcut(input, residual):
    """Adds a shortcut between input and residual block and merges them with "sum"
    """
    # Expand channels of shortcut to match residual.
    # Stride appropriately to match residual (width, height)
    # Should be int if network architecture is correctly configured.
    input_shape = K.int_shape(input)
    residual_shape = K.int_shape(residual)
    stride_width = int(round(input_shape[ROW_AXIS] / residual_shape[ROW_AXIS]))
    stride_height = int(round(input_shape[COL_AXIS] / residual_shape[COL_AXIS]))
    equal_channels = input_shape[CHANNEL_AXIS] == residual_shape[CHANNEL_AXIS]

    shortcut = input
    # 1 X 1 conv if shape is different. Else identity.
    if stride_width > 1 or stride_height > 1 or not equal_channels:
        shortcut = Convolution2D(nb_filter=residual_shape[CHANNEL_AXIS],
                                 nb_row=1, nb_col=1,
                                 subsample=(stride_width, stride_height),
                                 init="he_normal", border_mode="valid",
                                 W_regularizer=l2(0.0001))(input)

    return merge([shortcut, residual], mode="sum")
def content_loss(base, combination):
    channel_dim = 0 if K.image_dim_ordering() == "th" else -1
    channels = K.int_shape(base)[channel_dim]
    size = img_width * img_height

    if args.content_loss_type == 1:
        multiplier = 1. / (2. * (channels ** 0.5) * (size ** 0.5))
    elif args.content_loss_type == 2:
        multiplier = 1. / (channels * size)
    else:
        multiplier = 1.

    return multiplier * K.sum(K.square(combination - base))


# the 3rd loss function, total variation loss,
# designed to keep the generated image locally coherent
def deflating_convolution(inputs, n_deflation_layers, n_filters_init=32, noise=None, name_prefix=None):
    def add_linear_noise(x, eps, ind):
        flattened_deflated = Reshape((-1,), name=name_prefix + '_conv_flatten_{}'.format(ind))(x)
        deflated_shape = ker.int_shape(x)
        deflated_size = deflated_shape[1] * deflated_shape[2] * deflated_shape[3]
        noise_transformed = Dense(deflated_size, activation=None,
                                  name=name_prefix + '_conv_noise_dense_{}'.format(ind))(eps)
        added_noise = Add(name=name_prefix + '_conv_add_noise_{}'.format(ind))(
            [noise_transformed, flattened_deflated])
        x = Reshape((deflated_shape[1], deflated_shape[2], deflated_shape[3]),
                    name=name_prefix + '_conv_backreshape_{}'.format(ind))(added_noise)
        return x

    deflated = Conv2D(filters=n_filters_init, kernel_size=(5, 5), strides=(2, 2),
                      padding='same', activation='relu',
                      name=name_prefix + '_conv_0')(inputs)
    if noise is not None:
        deflated = add_linear_noise(deflated, noise, 0)
    for i in range(1, n_deflation_layers):
        deflated = Conv2D(filters=n_filters_init * (2 ** i), kernel_size=(5, 5), strides=(2, 2),
                          padding='same', activation='relu',
                          name=name_prefix + '_conv_{}'.format(i))(deflated)
        # if noise is not None:
        #     deflated = add_linear_noise(deflated, noise, i)
    return deflated
def call(self, inputs, mask=None):
    # In case the target shape is not fully defined,
    # we need access to the shape of x.
    # solution:
    # 1) rely on x._keras_shape
    # 2) fallback: K.int_shape
    target_shape = self.target_shape
    target_mask_shape = self.target_mask_shape
    if -1 in target_shape:
        # target shape not fully defined
        input_shape = None
        try:
            input_shape = K.int_shape(inputs)
        except TypeError:
            pass
        if input_shape is not None:
            target_shape = self.compute_output_shape(input_shape)[1:]

    _result = K.reshape(inputs, (-1,) + target_shape)
    reshaped_mask = K.reshape(mask, (-1,) + target_mask_shape + (1,))
    result = _result * K.cast(reshaped_mask, K.floatx())
    return result
def call(self, x, mask=None):
    X = x
    half_n = self.n // 2
    input_sqr = K.square(X)
    if K._BACKEND == 'theano':
        b, ch, r, c = X.shape
        extra_channels = T.alloc(0., b, ch + 2 * half_n, r, c)
        input_sqr = T.set_subtensor(
            extra_channels[:, half_n:half_n + ch, :, :], input_sqr)
    elif K._BACKEND == 'tensorflow':
        b, ch, r, c = K.int_shape(X)
        up_dims = tf.pack([tf.shape(X)[0], half_n, r, c])
        up = tf.fill(up_dims, 0.0)
        middle = input_sqr
        down_dims = tf.pack([tf.shape(X)[0], half_n, r, c])
        down = tf.fill(down_dims, 0.0)
        input_sqr = K.concatenate([up, middle, down], axis=1)

    scale = self.k
    norm_alpha = self.alpha / self.n
    for i in range(self.n):
        scale += norm_alpha * input_sqr[:, i:i + ch, :, :]
    scale = scale ** self.beta
    result = X / scale
    return result
def call(self, inputs, mask=None, initial_state=None, training=None):
    input_shape = K.int_shape(inputs)
    constants = self.layer.get_constants(inputs, training=None)
    preprocessed_input = self.layer.preprocess_input(inputs, training=None)
    initial_states = self.layer.get_initial_states(inputs)

    last_output, outputs, states = K.rnn(self.step,
                                         preprocessed_input,
                                         initial_states,
                                         go_backwards=self.layer.go_backwards,
                                         mask=mask,
                                         constants=constants,
                                         unroll=self.layer.unroll,
                                         input_length=input_shape[1])
    return outputs if self.layer.return_sequences else last_output
def lin_interpolation_2d(inp, dim):
    num_rows, num_cols, num_filters = K.int_shape(inp)[1:]
    conv = SeparableConv2D(num_filters, (num_rows, num_cols), use_bias=False)
    x = conv(inp)

    w = conv.get_weights()
    w[0].fill(0)
    w[1].fill(0)
    linspace = linspace_2d(num_rows, num_cols, dim=dim)

    for i in range(num_filters):
        w[0][:, :, i, 0] = linspace[:, :]
        w[1][0, 0, i, i] = 1.

    conv.set_weights(w)
    conv.trainable = False

    x = Lambda(lambda x: K.squeeze(x, axis=1))(x)
    x = Lambda(lambda x: K.squeeze(x, axis=1))(x)
    x = Lambda(lambda x: K.expand_dims(x, axis=-1))(x)

    return x
def weighted_dice_loss(y_true, y_pred):
    y_true = K.cast(y_true, 'float32')
    y_pred = K.cast(y_pred, 'float32')
    # if we want to get same size of output, kernel size must be odd number
    if K.int_shape(y_pred)[1] == 128:
        kernel_size = 11
    elif K.int_shape(y_pred)[1] == 256:
        kernel_size = 21
    elif K.int_shape(y_pred)[1] == 512:
        kernel_size = 21
    elif K.int_shape(y_pred)[1] == 1024:
        kernel_size = 41
    else:
        raise ValueError('Unexpected image size')
    averaged_mask = K.pool2d(
        y_true, pool_size=(kernel_size, kernel_size), strides=(1, 1),
        padding='same', pool_mode='avg')
    border = K.cast(K.greater(averaged_mask, 0.005), 'float32') * \
        K.cast(K.less(averaged_mask, 0.995), 'float32')
    weight = K.ones_like(averaged_mask)
    w0 = K.sum(weight)
    weight += border * 2
    w1 = K.sum(weight)
    weight *= (w0 / w1)
    loss = 1 - weighted_dice_coeff(y_true, y_pred, weight)
    return loss
def weighted_bce_dice_loss(y_true, y_pred):
    y_true = K.cast(y_true, 'float32')
    y_pred = K.cast(y_pred, 'float32')
    # if we want to get same size of output, kernel size must be odd number
    if K.int_shape(y_pred)[1] == 128:
        kernel_size = 11
    elif K.int_shape(y_pred)[1] == 256:
        kernel_size = 21
    elif K.int_shape(y_pred)[1] == 512:
        kernel_size = 21
    elif K.int_shape(y_pred)[1] == 1024:
        kernel_size = 41
    else:
        raise ValueError('Unexpected image size')
    averaged_mask = K.pool2d(
        y_true, pool_size=(kernel_size, kernel_size), strides=(1, 1),
        padding='same', pool_mode='avg')
    border = K.cast(K.greater(averaged_mask, 0.005), 'float32') * \
        K.cast(K.less(averaged_mask, 0.995), 'float32')
    weight = K.ones_like(averaged_mask)
    w0 = K.sum(weight)
    weight += border * 2
    w1 = K.sum(weight)
    weight *= (w0 / w1)
    loss = weighted_bce_loss(y_true, y_pred, weight) + \
        (1 - weighted_dice_coeff(y_true, y_pred, weight))
    return loss
def get_constants(self, x):
    constants = []
    if 0 < self.dropout_U < 1:
        ones = K.ones_like(K.reshape(x[:, 0, 0], (-1, 1)))
        ones = K.tile(ones, (1, self.output_dim))
        B_U = [K.in_train_phase(K.dropout(ones, self.dropout_U), ones)
               for _ in range(2)]
        constants.append(B_U)
    else:
        constants.append([K.cast_to_floatx(1.) for _ in range(2)])

    if 0 < self.dropout_W < 1:
        input_shape = K.int_shape(x)
        input_dim = input_shape[-1]
        ones = K.ones_like(K.reshape(x[:, 0, 0], (-1, 1)))
        ones = K.tile(ones, (1, int(input_dim)))
        B_W = [K.in_train_phase(K.dropout(ones, self.dropout_W), ones)
               for _ in range(2)]
        constants.append(B_W)
    else:
        constants.append([K.cast_to_floatx(1.) for _ in range(2)])
    return constants
def get_constants(self, x):
    constants = []
    if 0 < self.dropout_U < 1:
        ones = K.ones_like(K.reshape(x[:, 0, 0], (-1, 1)))
        ones = K.tile(ones, (1, self.input_dim))
        B_U = [K.in_train_phase(K.dropout(ones, self.dropout_U), ones)
               for _ in range(4)]
        constants.append(B_U)
    else:
        constants.append([K.cast_to_floatx(1.) for _ in range(4)])

    if 0 < self.dropout_W < 1:
        input_shape = K.int_shape(x)
        input_dim = input_shape[-1]
        ones = K.ones_like(K.reshape(x[:, 0, 0], (-1, 1)))
        ones = K.tile(ones, (1, int(input_dim)))
        B_W = [K.in_train_phase(K.dropout(ones, self.dropout_W), ones)
               for _ in range(4)]
        constants.append(B_W)
    else:
        constants.append([K.cast_to_floatx(1.) for _ in range(4)])
    return constants
def softmax_sparse_crossentropy_ignoring_last_label(y_true, y_pred):
    y_pred = K.reshape(y_pred, (-1, K.int_shape(y_pred)[-1]))
    log_softmax = tf.nn.log_softmax(y_pred)

    y_true = K.one_hot(tf.to_int32(K.flatten(y_true)),
                       K.int_shape(y_pred)[-1] + 1)
    unpacked = tf.unstack(y_true, axis=-1)
    y_true = tf.stack(unpacked[:-1], axis=-1)

    cross_entropy = -K.sum(y_true * log_softmax, axis=1)
    cross_entropy_mean = K.mean(cross_entropy)

    return cross_entropy_mean


# Softmax cross-entropy loss function for coco segmentation
# and models which expect but do not apply sigmoid on each entry
# tensorflow only
def test_vgg_conv():
    if K.image_data_format() == 'channels_first':
        x = Input(shape=(3, 224, 224))
        y1_shape = (None, 64, 112, 112)
        y2_shape = (None, 128, 56, 56)
    else:
        x = Input(shape=(224, 224, 3))
        y1_shape = (None, 112, 112, 64)
        y2_shape = (None, 56, 56, 128)

    block1 = vgg_conv(filters=64, convs=2, block_name='block1')
    y = block1(x)
    assert K.int_shape(y) == y1_shape

    block2 = vgg_conv(filters=128, convs=2, block_name='block2')
    y = block2(y)
    assert K.int_shape(y) == y2_shape
def test_vgg_decoder():
    if K.image_data_format() == 'channels_last':
        inputs = Input(shape=(500, 500, 3))
        pool3 = Input(shape=(88, 88, 256))
        pool4 = Input(shape=(44, 44, 512))
        drop7 = Input(shape=(16, 16, 4096))
        score_shape = (None, 500, 500, 21)
    else:
        inputs = Input(shape=(3, 500, 500))
        pool3 = Input(shape=(256, 88, 88))
        pool4 = Input(shape=(512, 44, 44))
        drop7 = Input(shape=(4096, 16, 16))
        score_shape = (None, 21, 500, 500)
    pyramid = [drop7, pool4, pool3, inputs]
    scales = [1., 1e-2, 1e-4]
    score = VGGDecoder(pyramid, scales, classes=21)
    assert K.int_shape(score) == score_shape
def test_vgg_upsampler():
    if K.image_data_format() == 'channels_last':
        inputs = Input(shape=(500, 500, 3))
        pool3 = Input(shape=(63, 63, 256))
        pool4 = Input(shape=(32, 32, 512))
        drop7 = Input(shape=(16, 16, 4096))
        score_shape = (None, 500, 500, 21)
    else:
        inputs = Input(shape=(3, 500, 500))
        pool3 = Input(shape=(256, 63, 63))
        pool4 = Input(shape=(512, 32, 32))
        drop7 = Input(shape=(4096, 16, 16))
        score_shape = (None, 21, 500, 500)
    pyramid = [drop7, pool4, pool3, inputs]
    scales = [1., 1e-2, 1e-4]
    score = VGGUpsampler(pyramid, scales, classes=21)
    assert K.int_shape(score) == score_shape
def VGGUpsampler(pyramid, scales, classes, weight_decay=0.):
    """A Functional upsampler for the VGG Nets.

    :param pyramid: A list of features in the pyramid, scaling from large
        receptive field to small receptive field. The bottom of the pyramid
        is the input image.
    :param scales: A list of weights for each of the feature maps in the
        pyramid, sorted in the same order as the pyramid.
    :param classes: Integer, number of classes.
    """
    if len(scales) != len(pyramid) - 1:
        raise ValueError('`scales` needs to match the length of '
                         '`pyramid` - 1.')
    blocks = []

    for i in range(len(pyramid) - 1):
        block_name = 'feat{}'.format(i + 1)
        block = vgg_upsampling(classes=classes,
                               target_shape=K.int_shape(pyramid[i + 1]),
                               scale=scales[i],
                               weight_decay=weight_decay,
                               block_name=block_name)
        blocks.append(block)

    return Decoder(pyramid=pyramid[:-1], blocks=blocks)
def preprocess_input(self, inputs, training=None):
    if self.implementation == 0:
        input_shape = K.int_shape(inputs)
        input_dim = input_shape[2]
        timesteps = input_shape[1]

        x_i = _time_distributed_dense(inputs, self.kernel_i, self.bias_i,
                                      self.dropout, input_dim, self.units,
                                      timesteps, training=training)
        x_f = _time_distributed_dense(inputs, self.kernel_f, self.bias_f,
                                      self.dropout, input_dim, self.units,
                                      timesteps, training=training)
        x_c = _time_distributed_dense(inputs, self.kernel_c, self.bias_c,
                                      self.dropout, input_dim, self.units,
                                      timesteps, training=training)
        x_o = _time_distributed_dense(inputs, self.kernel_o, self.bias_o,
                                      self.dropout, input_dim, self.units,
                                      timesteps, training=training)
        return K.concatenate([x_i, x_f, x_c, x_o], axis=2)
    else:
        return inputs
def build(input_shape, num_outputs, block_fn, repetitions):
    inputs = Input(shape=input_shape)
    conv1 = Conv2D(64, (7, 7), strides=(2, 2), padding='same')(inputs)
    conv1 = BatchNormalization()(conv1)
    conv1 = Activation('relu')(conv1)
    pool1 = MaxPooling2D(pool_size=(3, 3), strides=(2, 2), padding='same')(conv1)

    x = pool1
    filters = 64
    first_layer = True
    for i, r in enumerate(repetitions):
        x = _residual_block(block_fn, filters=filters,
                            repetitions=r, is_first_layer=first_layer)(x)
        filters *= 2
        if first_layer:
            first_layer = False

    # last activation <- unnecessary???
    # x = BatchNormalization()(x)
    # x = Activation('relu')(x)

    _, w, h, ch = K.int_shape(x)
    pool2 = AveragePooling2D(pool_size=(w, h), strides=(1, 1))(x)
    flat1 = Flatten()(pool2)
    outputs = Dense(num_outputs, kernel_initializer=init,
                    activation='softmax')(flat1)

    model = Model(inputs=inputs, outputs=outputs)
    return model
def categorical_crossentropy_3d_SW(y_true_sw, y_predicted):
    """
    Computes categorical cross-entropy loss for a softmax distribution in a
    hot-encoded 3D array with shape (num_samples, num_classes, dim1, dim2, dim3)

    Parameters
    ----------
    y_true : keras.placeholder [batches, dim0, dim1, dim2]
        Placeholder for data holding the ground-truth labels encoded in a
        one-hot representation
    y_predicted : keras.placeholder [batches, channels, dim0, dim1, dim2]
        Placeholder for data holding the softmax distribution over classes

    Returns
    -------
    scalar
        Categorical cross-entropy loss value
    """
    sw = y_true_sw[:, :, :, :, K.int_shape(y_predicted)[-1]:]
    y_true = y_true_sw[:, :, :, :, :K.int_shape(y_predicted)[-1]]

    y_true_flatten = K.flatten(y_true * sw)
    y_pred_flatten = K.flatten(y_predicted)
    y_pred_flatten_log = -K.log(y_pred_flatten + K.epsilon())
    num_total_elements = K.sum(y_true_flatten)
    # cross_entropy = K.dot(y_true_flatten, K.transpose(y_pred_flatten_log))
    cross_entropy = tf.reduce_sum(tf.multiply(y_true_flatten, y_pred_flatten_log))
    mean_cross_entropy = cross_entropy / (num_total_elements + K.epsilon())
    return mean_cross_entropy
def wasserstein_disagreement_map(prediction, ground_truth, M):
    """
    Function to calculate the pixel-wise Wasserstein distance between the
    flattened pred_proba and the flattened labels (ground_truth) with respect
    to the distance matrix on the label space M.

    :param prediction: the logits after softmax
    :param ground_truth: segmentation ground_truth
    :param M: distance matrix on the label space
    :return: the pixelwise distance map (wass_dis_map)
    """
    # pixel-wise Wasserstein distance (W) between flat_pred_proba and
    # flat_labels wrt the distance matrix on the label space M
    n_classes = K.int_shape(prediction)[-1]
    # unstack_labels = tf.unstack(ground_truth, axis=-1)
    ground_truth = tf.cast(ground_truth, dtype=tf.float64)
    # unstack_pred = tf.unstack(prediction, axis=-1)
    prediction = tf.cast(prediction, dtype=tf.float64)
    # print("shape of M", M.shape, "unstacked labels", unstack_labels,
    #       "unstacked pred", unstack_pred)
    # W is a weighted sum of all pairwise correlations (pred_ci x labels_cj)
    pairwise_correlations = []
    for i in range(n_classes):
        for j in range(n_classes):
            pairwise_correlations.append(
                M[i, j] * tf.multiply(prediction[:, i], ground_truth[:, j]))
    wass_dis_map = tf.add_n(pairwise_correlations)
    return wass_dis_map
def generalised_wasserstein_dice_loss(y_true, y_predicted):
    """
    Function to calculate the Generalised Wasserstein Dice Loss defined in
    Fidon, L. et al. (2017) Generalised Wasserstein Dice Score for Imbalanced
    Multi-class Segmentation using Holistic Convolutional Networks.
    MICCAI 2017 (BrainLes)

    :param y_true: the segmentation ground_truth
    :param y_predicted: the logits (before softmax)
    :return: the loss
    """
    # apply softmax to pred scores
    n_classes = K.int_shape(y_predicted)[-1]

    ground_truth = tf.cast(tf.reshape(y_true, (-1, n_classes)), dtype=tf.int64)
    pred_proba = tf.cast(tf.reshape(y_predicted, (-1, n_classes)), dtype=tf.float64)

    # M = tf.cast(M, dtype=tf.float64)
    # compute disagreement map (delta)
    M = M_tree_4
    # print("M shape is ", M.shape, pred_proba, one_hot)
    delta = wasserstein_disagreement_map(pred_proba, ground_truth, M)
    # compute generalisation of all error for multi-class seg
    all_error = tf.reduce_sum(delta)
    # compute generalisation of true positives for multi-class seg
    one_hot = tf.cast(ground_truth, dtype=tf.float64)
    true_pos = tf.reduce_sum(
        tf.multiply(tf.constant(M[0, :n_classes], dtype=tf.float64), one_hot),
        axis=1)
    true_pos = tf.reduce_sum(tf.multiply(true_pos, 1. - delta), axis=0)
    WGDL = 1. - (2. * true_pos) / (2. * true_pos + all_error)

    return tf.cast(WGDL, dtype=tf.float32)
def mask_tensor(x):
    tensor, mask = x
    rep = K.int_shape(tensor)[4]
    return tensor * K.repeat_elements(mask, rep, axis=4)
def fill_background_mask(x):
    tensor, mask = x
    rep = K.int_shape(tensor)[4] - 1
    full_mask = K.ones_like(mask) - mask
    # note: the result of this call is not assigned, so it has no effect;
    # only the inverted single-channel mask is added to the tensor below
    K.repeat_elements(mask, rep, axis=4)
    return tensor + full_mask