The following 45 code examples, extracted from open-source Python projects, illustrate how to use tensorflow.python.framework.ops.colocate_with().
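Before the extracted examples, a brief note on the API they all share: `ops.colocate_with(op)` is a context manager that constrains every op created inside the block to be placed on the same device as `op`, and `ops.colocate_with(None, ignore_existing=True)` clears any colocation constraints inherited from enclosing scopes. The snippet below is a minimal illustrative sketch written for this listing; the names `a`, `b`, and `c` are placeholders and it is not taken from any of the projects referenced here.

import tensorflow as tf
from tensorflow.python.framework import ops

a = tf.Variable(1.0, name="a")  # placed by the normal device placement rules

with ops.colocate_with(a):
  # Ops created here are constrained to the same device as `a`.
  b = tf.square(a)

with ops.colocate_with(None, ignore_existing=True):
  # Clears inherited colocation constraints; `c` is placed independently.
  c = tf.constant(0.0, name="c")

Several of the examples that follow use exactly these two forms: colocating an update op with the variable it modifies, or clearing constraints before creating a StagingArea.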
def assign_sub(self, delta, name=None):
  """Mimic the updates to the variable.

  Args:
    delta: is pushed into a staging buffer and will be pumped later.
    name: currently ignored; names of ops and the StagingArea are
          computed without using this pass name.

  Returns:
    The actual updates. The colocation constraint will be reapplied.
  """
  # This parameter is ignored: the StagingArea only supports setting
  # the shared name, not the names of individual ops it uses.
  del name

  # colocate_with(None, True) clears the colocation constraints.
  # Push the delta into a staging buffer.
  with ops.colocate_with(None, True), tf.device(self.var_stage_get.device):
    delta_staging_area = data_flow_ops.StagingArea(
        [self.var_stage_get.dtype], shapes=[self.var_stage_get.shape])
    delta_put_op = delta_staging_area.put([delta])
    self.variable_mgr.staging_delta_ops.append(delta_put_op)
    delta_get_op = delta_staging_area.get()[0]
  # Return the actual updates. The colocation constraint will be reapplied.
  return self.real_var.assign_sub(delta_get_op)
def _compute_euclidean_distance(cls, inputs, clusters):
  """Computes Euclidean distance between each input and each cluster center.

  Args:
    inputs: list of input Tensors.
    clusters: cluster Tensor.

  Returns:
    list of Tensors, where each element corresponds to each element in inputs.
    The value is the distance of each row to all the cluster centers.
  """
  output = []
  for inp in inputs:
    with ops.colocate_with(inp):
      # Computes Euclidean distance. Note the first and third terms are
      # broadcast additions.
      squared_distance = (tf.reduce_sum(tf.square(inp), 1, keep_dims=True) -
                          2 * tf.matmul(inp, clusters, transpose_b=True) +
                          tf.transpose(tf.reduce_sum(tf.square(clusters), 1,
                                                     keep_dims=True)))
      output.append(squared_distance)
  return output
def _compute_cosine_distance(cls, inputs, clusters, inputs_normalized=True):
  """Computes cosine distance between each input and each cluster center.

  Args:
    inputs: list of input Tensor.
    clusters: cluster Tensor
    inputs_normalized: if True, it assumes that inp and clusters are
      normalized and computes the dot product which is equivalent to the
      cosine distance. Else it L2 normalizes the inputs first.

  Returns:
    list of Tensors, where each element corresponds to each element in inp.
    The value is the distance of each row to all the cluster centers.
  """
  output = []
  if not inputs_normalized:
    with ops.colocate_with(clusters):
      clusters = tf.nn.l2_normalize(clusters, dim=1)
  for inp in inputs:
    with ops.colocate_with(inp):
      if not inputs_normalized:
        inp = tf.nn.l2_normalize(inp, dim=1)
      output.append(1 - tf.matmul(inp, clusters, transpose_b=True))
  return output
def _prepare_gramian(self, factors, gramian):
  """Helper function to create ops to prepare/calculate gramian.

  Args:
    factors: Variable or list of Variable representing (sharded) factors.
      Used to compute the updated corresponding gramian value.
    gramian: Variable storing the gramian calculated from the factors.

  Returns:
    A op that updates the gramian with the calcuated value from the factors.
  """
  partial_gramians = []
  for f in factors:
    with ops.colocate_with(f):
      partial_gramians.append(tf.matmul(f, f, transpose_a=True))
  with ops.colocate_with(gramian):
    prep_gramian = tf.assign(gramian, tf.add_n(partial_gramians)).op
  return prep_gramian
def scatter_update(cls, factor, indices, values, sharding_func):
  """Helper function for doing sharded scatter update."""
  assert isinstance(factor, list)
  if len(factor) == 1:
    with ops.colocate_with(factor[0]):
      # TODO(agarwal): assign instead of scatter update for full batch update.
      return tf.scatter_update(factor[0], indices, values).op
  else:
    num_shards = len(factor)
    assignments, new_ids = sharding_func(indices)
    assert assignments is not None
    assignments = tf.cast(assignments, tf.int32)
    sharded_ids = tf.dynamic_partition(new_ids, assignments, num_shards)
    sharded_values = tf.dynamic_partition(values, assignments, num_shards)
    updates = []
    for i in xrange(num_shards):
      updates.append(tf.scatter_update(factor[i], sharded_ids[i],
                                       sharded_values[i]))
    return tf.group(*updates)
def _compute_euclidean_distance(cls, inputs, clusters):
  """Computes Euclidean distance between each input and each cluster center.

  Args:
    inputs: list of input Tensors.
    clusters: cluster Tensor.

  Returns:
    list of Tensors, where each element corresponds to each element in inputs.
    The value is the distance of each row to all the cluster centers.
  """
  output = []
  for inp in inputs:
    with ops.colocate_with(inp):
      # Computes Euclidean distance. Note the first and third terms are
      # broadcast additions.
      squared_distance = (
          math_ops.reduce_sum(math_ops.square(inp), 1, keep_dims=True) -
          2 * math_ops.matmul(inp, clusters, transpose_b=True) +
          array_ops.transpose(
              math_ops.reduce_sum(
                  math_ops.square(clusters), 1, keep_dims=True)))
      output.append(squared_distance)
  return output
def _compute_cosine_distance(cls, inputs, clusters, inputs_normalized=True):
  """Computes cosine distance between each input and each cluster center.

  Args:
    inputs: list of input Tensor.
    clusters: cluster Tensor
    inputs_normalized: if True, it assumes that inp and clusters are
      normalized and computes the dot product which is equivalent to the
      cosine distance. Else it L2 normalizes the inputs first.

  Returns:
    list of Tensors, where each element corresponds to each element in inp.
    The value is the distance of each row to all the cluster centers.
  """
  output = []
  if not inputs_normalized:
    with ops.colocate_with(clusters):
      clusters = nn_impl.l2_normalize(clusters, dim=1)
  for inp in inputs:
    with ops.colocate_with(inp):
      if not inputs_normalized:
        inp = nn_impl.l2_normalize(inp, dim=1)
      output.append(1 - math_ops.matmul(inp, clusters, transpose_b=True))
  return output
def _prepare_gramian(self, factors, gramian):
  """Helper function to create ops to prepare/calculate gramian.

  Args:
    factors: Variable or list of Variable representing (sharded) factors.
      Used to compute the updated corresponding gramian value.
    gramian: Variable storing the gramian calculated from the factors.

  Returns:
    A op that updates the gramian with the calcuated value from the factors.
  """
  partial_gramians = []
  for f in factors:
    with ops.colocate_with(f):
      partial_gramians.append(math_ops.matmul(f, f, transpose_a=True))
  with ops.colocate_with(gramian):
    prep_gramian = state_ops.assign(
        gramian, math_ops.add_n(partial_gramians)).op
  return prep_gramian
def scatter_update(cls, factor, indices, values, sharding_func):
  """Helper function for doing sharded scatter update."""
  assert isinstance(factor, list)
  if len(factor) == 1:
    with ops.colocate_with(factor[0]):
      # TODO(agarwal): assign instead of scatter update for full batch update.
      return state_ops.scatter_update(factor[0], indices, values).op
  else:
    num_shards = len(factor)
    assignments, new_ids = sharding_func(indices)
    assert assignments is not None
    assignments = math_ops.cast(assignments, dtypes.int32)
    sharded_ids = data_flow_ops.dynamic_partition(new_ids, assignments,
                                                  num_shards)
    sharded_values = data_flow_ops.dynamic_partition(values, assignments,
                                                     num_shards)
    updates = []
    for i in xrange(num_shards):
      updates.append(
          state_ops.scatter_update(factor[i], sharded_ids[i],
                                   sharded_values[i]))
    return control_flow_ops.group(*updates)
def _ref(self):
  """Return the underlying variable ref, required by tf.colocate_with."""
  return self.real_var._ref()  # pylint: disable=protected-access
def _noise_dense(self, var):
  updated_var_value = var._ref()  # pylint: disable=protected-access
  noise = tf.random_normal(
      shape=tf.shape(var),
      stddev=self._temp * tf.sqrt(2 * self._learning_rate))
  with colocate_with(var):
    return var.assign_add(noise, use_locking=self._use_locking)
def _noise_sparse(self, grad, var):
  assert isinstance(grad, tf.IndexedSlices)
  noise = tf.random_normal(
      shape=tf.shape(grad.values),
      stddev=self._temp * tf.sqrt(2 * self._learning_rate))
  noise_sparse = tf.IndexedSlices(noise, grad.indices, grad.dense_shape)
  with colocate_with(var):
    return var.scatter_sub(noise_sparse, use_locking=self._use_locking)
def _noise_dense(self, var):
  updated_var_value = var._ref()  # pylint: disable=protected-access
  pcder = tf.sqrt(self._opt.get_slot(var, name="rms") + self._epsilon)
  noise = tf.random_normal(
      shape=tf.shape(var),
      stddev=self._temp * tf.sqrt(2 * self._learning_rate / pcder))
  with colocate_with(var):
    return var.assign_add(noise, use_locking=self._use_locking)
def _noise_sparse(self, grad, var):
  assert isinstance(grad, tf.IndexedSlices)
  rms = self._opt.get_slot(var, name="rms")
  rms_sparse = tf.gather(rms, grad.indices)
  pcder = tf.sqrt(rms_sparse + self._epsilon)
  noise = tf.random_normal(
      shape=tf.shape(grad.values),
      stddev=self._temp * tf.sqrt(2 * self._learning_rate / pcder))
  noise_sparse = tf.IndexedSlices(noise, grad.indices, grad.dense_shape)
  with colocate_with(var):
    return var.scatter_sub(noise_sparse, use_locking=self._use_locking)
def before_apply(self):
  self._moving_averager = tf.train.ExponentialMovingAverage(
      decay=self._beta, zero_debias=self._zero_debias)
  assert self._grads is not None and len(self._grads) > 0
  before_apply_ops = []

  # get per var g**2 and norm**2
  self._grad_squared = []
  self._grad_norm_squared = []
  for v, g in zip(self._tvars, self._grads):
    if g is None:
      continue
    with ops.colocate_with(v):
      self._grad_squared.append(tf.square(g))
  self._grad_norm_squared = [
      tf.reduce_sum(grad_squared) for grad_squared in self._grad_squared]

  if self._sparsity_debias:
    avg_op_sparsity = self.grad_sparsity()
    before_apply_ops.append(avg_op_sparsity)

  # the following running average on squared norm of gradient is shared
  # by `grad_variance` and `dist_to_opt`
  avg_op = self._moving_averager.apply(self._grad_norm_squared)
  with tf.control_dependencies([avg_op]):
    self._grad_norm_squared_avg = [self._moving_averager.average(val)
                                   for val in self._grad_norm_squared]
    self._grad_norm_squared = tf.add_n(self._grad_norm_squared)
    self._grad_norm_squared_avg = tf.add_n(self._grad_norm_squared_avg)
  before_apply_ops.append(avg_op)

  with tf.control_dependencies([avg_op]):
    curv_range_ops = self.curvature_range()
    before_apply_ops += curv_range_ops
    grad_var_ops = self.grad_variance()
    before_apply_ops += grad_var_ops
    dist_to_opt_ops = self.dist_to_opt()
    before_apply_ops += dist_to_opt_ops
  return tf.group(*before_apply_ops)
def _infer_graph(self, inputs, clusters):
  """Maps input to closest cluster and the score.

  Args:
    inputs: list of input Tensors.
    clusters: Tensor of cluster centers.

  Returns:
    List of tuple, where each value in tuple corresponds to a value in inp.
    The tuple has following three elements:
    all_scores: distance of each input to each cluster center.
    score: distance of each input to closest cluster center.
    cluster_idx: index of cluster center closest to the corresponding input.
  """
  assert isinstance(inputs, list)
  # Pairwise distances are used only by transform(). In all other cases, this
  # sub-graph is not evaluated.
  scores = self._distance_graph(inputs, clusters, self._distance_metric)
  output = []
  if (self._distance_metric == COSINE_DISTANCE and
      not self._clusters_l2_normalized()):
    # The cosine distance between normalized vectors x and y is the same as
    # 2 * squared_euclidian_distance. We are using this fact and reusing the
    # nearest_neighbors op.
    # TODO(ands): Support COSINE distance in nearest_neighbors and remove
    # this.
    with ops.colocate_with(clusters):
      clusters = tf.nn.l2_normalize(clusters, dim=1)
  for inp, score in zip(inputs, scores):
    with ops.colocate_with(inp):
      (indices, distances) = gen_clustering_ops.nearest_neighbors(inp,
                                                                  clusters, 1)
      if self._distance_metric == COSINE_DISTANCE:
        distances *= 0.5
      output.append((score, tf.squeeze(distances), tf.squeeze(indices)))
  return zip(*output)
def _full_batch_training_op(self, inputs, cluster_idx_list, cluster_centers):
  """Creates an op for training for full batch case.

  Args:
    inputs: list of input Tensors.
    cluster_idx_list: A vector (or list of vectors). Each element in the
      vector corresponds to an input row in 'inp' and specifies the cluster id
      corresponding to the input.
    cluster_centers: Tensor Ref of cluster centers.

  Returns:
    An op for doing an update of mini-batch k-means.
  """
  cluster_sums = []
  cluster_counts = []
  epsilon = tf.constant(1e-6, dtype=inputs[0].dtype)
  for inp, cluster_idx in zip(inputs, cluster_idx_list):
    with ops.colocate_with(inp):
      cluster_sums.append(tf.unsorted_segment_sum(inp,
                                                  cluster_idx,
                                                  self._num_clusters))
      cluster_counts.append(tf.unsorted_segment_sum(
          tf.reshape(tf.ones(tf.reshape(tf.shape(inp)[0], [-1])), [-1, 1]),
          cluster_idx,
          self._num_clusters))
  with ops.colocate_with(cluster_centers):
    new_clusters_centers = tf.add_n(cluster_sums) / (
        tf.cast(tf.add_n(cluster_counts), cluster_sums[0].dtype) + epsilon)
    if self._clusters_l2_normalized():
      new_clusters_centers = tf.nn.l2_normalize(new_clusters_centers, dim=1)
  return tf.assign(cluster_centers, new_clusters_centers)
def _clip_dense(self, var):
  with self._maybe_colocate_with(var):
    updated_var_value = array_ops.identity(var.ref())
    normalized_var = clip_ops.clip_by_norm(
        updated_var_value, self._max_norm, self._vars_to_clip_dims[var])
    delta = updated_var_value - normalized_var
  with ops.colocate_with(var):
    return var.assign_sub(delta, use_locking=self._use_locking)
def _maybe_colocate_with(self, var):
  """Context to colocate with `var` if `colocate_clip_ops_with_vars`."""
  if self._colocate_clip_ops_with_vars:
    with ops.colocate_with(var):
      yield
  else:
    yield
def _clip_dense(self, var):
  with self._maybe_colocate_with(var):
    updated_var_value = var._ref()  # pylint: disable=protected-access
    normalized_var = clip_ops.clip_by_norm(
        updated_var_value, self._max_norm, self._vars_to_clip_dims[var])
    delta = updated_var_value - normalized_var
  with ops.colocate_with(var):
    return var.assign_sub(delta, use_locking=self._use_locking)
def assign_moving_average(variable, value, decay, name=None):
  """Compute the moving average of a variable.

  The moving average of 'variable' updated with 'value' is:
    variable * decay + value * (1 - decay)

  The returned Operation sets 'variable' to the newly computed moving average.

  The new value of 'variable' can be set with the 'AssignSub' op as:
     variable -= (1 - decay) * (variable - value)

  Args:
    variable: A Variable.
    value: A tensor with the same shape as 'variable'
    decay: A float Tensor or float value. The moving average decay.
    name: Optional name of the returned operation.

  Returns:
    An Operation that updates 'variable' with the newly computed
    moving average.
  """
  with ops.op_scope([variable, value, decay], name, "AssignMovingAvg") as scope:
    with ops.colocate_with(variable):
      decay = ops.convert_to_tensor(1.0 - decay, name="decay")
      if decay.dtype != variable.dtype.base_dtype:
        decay = math_ops.cast(decay, variable.dtype.base_dtype)
      return state_ops.assign_sub(variable, (variable - value) * decay,
                                  name=scope)
def after_apply(self):
  self._moving_averager = tf.train.ExponentialMovingAverage(
      decay=self._beta, zero_debias=self._zero_debias)
  assert self._grads != None and len(self._grads) > 0
  after_apply_ops = []

  # get per var g**2 and norm**2
  self._grad_squared = []
  self._grad_norm_squared = []
  for v, g in zip(self._tvars, self._grads):
    with ops.colocate_with(v):
      self._grad_squared.append(tf.square(g))
  self._grad_norm_squared = [tf.reduce_sum(grad_squared)
                             for grad_squared in self._grad_squared]

  # the following running average on squared norm of gradient is shared
  # by grad_var and dist_to_opt
  avg_op = self._moving_averager.apply(self._grad_norm_squared)
  with tf.control_dependencies([avg_op]):
    self._grad_norm_squared_avg = [self._moving_averager.average(val)
                                   for val in self._grad_norm_squared]
    self._grad_norm_squared = tf.add_n(self._grad_norm_squared)
    self._grad_norm_squared_avg = tf.add_n(self._grad_norm_squared_avg)
  after_apply_ops.append(avg_op)

  with tf.control_dependencies([avg_op]):
    curv_range_ops = self.curvature_range()
    after_apply_ops += curv_range_ops
    grad_var_ops = self.grad_variance()
    after_apply_ops += grad_var_ops
    dist_to_opt_ops = self.dist_to_opt()
    after_apply_ops += dist_to_opt_ops

  return tf.group(*after_apply_ops)
def _FloatyGatherGrad(op, grad):
  if op.inputs[0].get_shape().is_fully_defined():
    dense_shape = constant_op.constant(op.inputs[0].get_shape().as_list())
    values_shape = [-1] + op.inputs[0].get_shape()[1:].as_list()
  else:
    # op.inputs[0] can be large, so colocate the shape calculation with it.
    with ops.colocate_with(op.inputs[0]):
      dense_shape = array_ops.shape(op.inputs[0])
      values_shape = array_ops.concat(0, [[-1], dense_shape[1:]])

  values = array_ops.reshape(grad, values_shape)
  indices = math_ops.to_int32(array_ops.reshape(op.inputs[1], [-1]))
  return [ops.IndexedSlices(values, indices, dense_shape), None]
def _parse_tensor_or_dict(self, features):
  if isinstance(features, dict):
    keys = sorted(features.keys())
    with ops.colocate_with(features[keys[0]]):
      features = array_ops.concat([features[k] for k in keys], 1)
  return features
def _infer_graph(self, inputs, clusters):
  """Maps input to closest cluster and the score.

  Args:
    inputs: list of input Tensors.
    clusters: Tensor of cluster centers.

  Returns:
    List of tuple, where each value in tuple corresponds to a value in inp.
    The tuple has following three elements:
    all_scores: distance of each input to each cluster center.
    score: distance of each input to closest cluster center.
    cluster_idx: index of cluster center closest to the corresponding input.
  """
  assert isinstance(inputs, list)
  # Pairwise distances are used only by transform(). In all other cases, this
  # sub-graph is not evaluated.
  scores = self._distance_graph(inputs, clusters, self._distance_metric)
  output = []
  if (self._distance_metric == COSINE_DISTANCE and
      not self._clusters_l2_normalized()):
    # The cosine distance between normalized vectors x and y is the same as
    # 2 * squared_euclidian_distance. We are using this fact and reusing the
    # nearest_neighbors op.
    # TODO(ands): Support COSINE distance in nearest_neighbors and remove
    # this.
    with ops.colocate_with(clusters):
      clusters = nn_impl.l2_normalize(clusters, dim=1)
  for inp, score in zip(inputs, scores):
    with ops.colocate_with(inp):
      (indices, distances) = gen_clustering_ops.nearest_neighbors(inp,
                                                                  clusters, 1)
      if self._distance_metric == COSINE_DISTANCE:
        distances *= 0.5
      output.append(
          (score, array_ops.squeeze(distances), array_ops.squeeze(indices)))
  return zip(*output)
def _l2_normalize_data(cls, inputs):
  """Normalized the input data."""
  output = []
  for inp in inputs:
    with ops.colocate_with(inp):
      output.append(nn_impl.l2_normalize(inp, dim=1))
  return output
def get_mean_baseline(ema_decay=0.99, name=None):
  """ExponentialMovingAverage baseline.

  EMA initializes to 0, which introduces a bias.  This baseline implements the
  bias correction term from Adam (section 3 of
  https://arxiv.org/pdf/1412.6980v8.pdf), dividing by `1 - ema_decay^t`, where
  `t` is the step count.

  Args:
    ema_decay: decay rate for the ExponentialMovingAverage.
    name: name for variable scope of the ExponentialMovingAverage.

  Returns:
    Callable baseline function that takes the `DistributionTensor` (unused)
    and the downstream `loss`, and returns an EMA of the loss.
  """

  def mean_baseline(_, loss):
    with vs.variable_scope(name, default_name="MeanBaseline"):
      reduced_loss = math_ops.reduce_mean(loss)

      ema = training.ExponentialMovingAverage(decay=ema_decay)
      update_op = ema.apply([reduced_loss])

      # The bias correction term requires keeping track of how many times the
      # EMA has been updated. Creating a variable here to do so. The global
      # step is not used because it may or may not track exactly the number of
      # times the EMA is updated.
      ema_var = ema.average(reduced_loss)
      assert ema_var is not None
      with ops.colocate_with(ema_var):
        num_updates = vs.get_variable(
            "local_ema_step", initializer=0, trainable=False)
      num_updates = num_updates.assign_add(1)
      bias_correction = 1. - math_ops.pow(ema_decay, math_ops.cast(
          num_updates, reduced_loss.dtype))

      with ops.control_dependencies([update_op]):
        baseline = ema.average(reduced_loss) / bias_correction

      return baseline

  return mean_baseline
def save_state(self, state_name, value, name=None):
  """Returns an op to save the current batch of state `state_name`.

  Args:
    state_name: string, matches a key provided in `initial_states`.
    value: A `Tensor`.
      Its type must match that of `initial_states[state_name].dtype`.
      If we had at input:

      ```python
      initial_states[state_name].get_shape() == [d1, d2, ...]
      ```

      then the shape of `value` must match:

      ```python
      tf.shape(value) == [batch_size, d1, d2, ...]
      ```

    name: string (optional).  The name scope for newly created ops.

  Returns:
    A control flow op that stores the new state of each entry into
    the state saver.  This op must be run for every iteration that
    accesses data from the state saver (otherwise the state saver
    will never progress through its states and run out of capacity).

  Raises:
    KeyError: if `state_name` does not match any of the initial states
      declared in `initial_states`.
  """
  if state_name not in self._state_saver._received_states.keys():
    raise KeyError("state was not declared: %s" % state_name)
  default_name = "InputQueueingStateSaver_SaveState"
  with ops.name_scope(name, default_name, values=[value]):
    # Place all operations on the CPU. Barriers and queues are only
    # implemented for CPU, but all the other book-keeping operations
    # (reshape, shape, range, ...) would be placed on GPUs if available,
    # unless we explicitly tie them to CPU.
    with ops.colocate_with(self._state_saver._capacity_queue.queue_ref):
      indices_where_not_done = array_ops.reshape(
          array_ops.where(
              math_ops.logical_not(self._state_saver._sequence_is_done)),
          [-1])
      keeping_next_key = array_ops.gather(
          self._state_saver._received_next_key, indices_where_not_done)
      value = _check_shape(
          array_ops.identity(value, name="convert_%s" % state_name),
          array_ops.shape(self._state_saver._received_states[state_name]))
      keeping_state = array_ops.gather(value, indices_where_not_done)
      return self._state_saver._barrier.insert_many(
          self._state_saver._get_barrier_index("state", state_name),
          keeping_next_key,
          keeping_state,
          name="BarrierInsertState_%s" % state_name)
def _cached_copy(self, var, name, pass_through=False):
  """Helper function to create a worker cached copy of a Variable.

  This assigns the var (either a single Variable or a list of Variables) to
  local transient cache Variable(s). Note that if var is a list of Variables,
  the assignment is done sequentially to minimize the memory overheads.
  Also note that if pass_through is set to True, this does not create new
  Variables but simply return the input back.

  Args:
    var: A Variable or a list of Variables to cache.
    name: name of cached Variable.
    pass_through: when set to True, this simply pass through the var back
      through identity operator and does not actually creates a cache.

  Returns:
    Tuple consisting of following three entries:
    cache: the new transient Variable or list of transient Variables
      corresponding one-to-one with var.
    cache_init: op to initialize the Variable or the list of Variables.
    cache_reset: op to reset the Variable or the list of Variables to some
      default value.
  """
  if var is None:
    return None, None, None
  elif pass_through:
    cache = var
    cache_init = tf.no_op()
    cache_reset = tf.no_op()
  elif isinstance(var, tf.Variable):
    cache = WALSModel._transient_var(name=name)
    with ops.colocate_with(cache):
      cache_init = tf.assign(cache, var, validate_shape=False)
      cache_reset = tf.assign(cache, 1.0, validate_shape=False)
  else:
    assert isinstance(var, list)
    assert var
    cache = [WALSModel._transient_var(name='%s_shard_%d' % (name, i))
             for i in xrange(len(var))]
    reset_ops = []
    for i, c in enumerate(cache):
      with ops.colocate_with(c):
        if i == 0:
          cache_init = tf.assign(c, var[i], validate_shape=False)
        else:
          with ops.control_dependencies([cache_init]):
            cache_init = tf.assign(c, var[i], validate_shape=False)
        reset_ops.append(tf.assign(c, 1.0, validate_shape=False))
    cache_reset = tf.group(*reset_ops)

  return cache, cache_init, cache_reset
def _AddRestoreOps(self,
                   filename_tensor,
                   vars_to_save,
                   restore_sequentially,
                   reshape,
                   preferred_shard=-1,
                   name="restore_all"):
  """Add operations to restore vars_to_save.

  Args:
    filename_tensor: Tensor for the path of the file to load.
    vars_to_save: A list of _VarToSave objects.
    restore_sequentially: True if we want to restore variables sequentially
      within a shard.
    reshape: True if we want to reshape loaded tensors to the shape of
      the corresponding variable.
    preferred_shard: Shard to open first when loading a sharded file.
    name: Name for the returned op.

  Returns:
    An Operation that restores the variables.
  """
  assign_ops = []
  for vs in vars_to_save:
    v = vs.var
    restore_control_inputs = assign_ops[-1:] if restore_sequentially else []
    # Load and optionally reshape on the CPU, as string tensors are not
    # available on the GPU.
    # TODO(touts): Re-enable restore on GPU when we can support annotating
    # string tensors as "HostMemory" inputs.
    with ops.device(graph_util.set_cpu0(v.device) if v.device else None):
      with ops.control_dependencies(restore_control_inputs):
        values = self.restore_op(filename_tensor, vs, preferred_shard)
      if reshape:
        shape = v.get_shape()
        if not shape.is_fully_defined():
          shape = array_ops.shape(v)
        values = array_ops.reshape(values, shape)

    # Assign on the same device as the variable.
    validate_shape = not reshape and v.get_shape().is_fully_defined()
    with ops.colocate_with(v):
      assign_ops.append(
          state_ops.assign(v, values, validate_shape=validate_shape))

  # Create a Noop that has control dependencies from all the updates.
  return control_flow_ops.group(*assign_ops, name=name)
def _cached_copy(self, var, name, pass_through=False):
  """Helper function to create a worker cached copy of a Variable.

  This assigns the var (either a single Variable or a list of Variables) to
  local transient cache Variable(s). Note that if var is a list of Variables,
  the assignment is done sequentially to minimize the memory overheads.
  Also note that if pass_through is set to True, this does not create new
  Variables but simply return the input back.

  Args:
    var: A Variable or a list of Variables to cache.
    name: name of cached Variable.
    pass_through: when set to True, this simply pass through the var back
      through identity operator and does not actually creates a cache.

  Returns:
    Tuple consisting of following three entries:
    cache: the new transient Variable or list of transient Variables
      corresponding one-to-one with var.
    cache_init: op to initialize the Variable or the list of Variables.
    cache_reset: op to reset the Variable or the list of Variables to some
      default value.
  """
  if var is None:
    return None, None, None
  elif pass_through:
    cache = var
    cache_init = control_flow_ops.no_op()
    cache_reset = control_flow_ops.no_op()
  elif isinstance(var, variables.Variable):
    cache = WALSModel._transient_var(name=name)
    with ops.colocate_with(cache):
      cache_init = state_ops.assign(cache, var, validate_shape=False)
      cache_reset = state_ops.assign(cache, 1.0, validate_shape=False)
  else:
    assert isinstance(var, list)
    assert var
    cache = [
        WALSModel._transient_var(name="%s_shard_%d" % (name, i))
        for i in xrange(len(var))
    ]
    reset_ops = []
    for i, c in enumerate(cache):
      with ops.colocate_with(c):
        if i == 0:
          cache_init = state_ops.assign(c, var[i], validate_shape=False)
        else:
          with ops.control_dependencies([cache_init]):
            cache_init = state_ops.assign(c, var[i], validate_shape=False)
        reset_ops.append(state_ops.assign(c, 1.0, validate_shape=False))
    cache_reset = control_flow_ops.group(*reset_ops)

  return cache, cache_init, cache_reset
def seek_next(string_list, shuffle=False, seed=None, num_epochs=None):
  """Returns an op that seeks the next element in a list of strings.

  Seeking happens in a round robin fashion. This op creates a variable called
  obtain_next_counter that is initialized to -1 and is used to keep track of
  which element in the list was returned, and a variable
  obtain_next_expanded_list to hold the list. If num_epochs is not None, then
  we limit the number of times we go around the string_list before
  OutOfRangeError is thrown. It creates a variable to keep track of this.

  Args:
    string_list: A list of strings.
    shuffle: If true, we shuffle the string_list differently for each epoch.
    seed: Seed used for shuffling.
    num_epochs: Returns OutOfRangeError once string_list has been repeated
                num_epoch times. If unspecified then keeps on looping.

  Returns:
    An op that produces the next element in the provided list.
  """
  expanded_list = _create_list(string_list, shuffle, seed, num_epochs)

  with variable_scope.variable_scope("obtain_next"):
    counter = variable_scope.get_variable(
        name="obtain_next_counter",
        initializer=constant_op.constant(-1, dtype=dtypes.int64),
        dtype=dtypes.int64)
    with ops.colocate_with(counter):
      string_tensor = variable_scope.get_variable(
          name="obtain_next_expanded_list",
          initializer=constant_op.constant(expanded_list),
          dtype=dtypes.string)
    if num_epochs:
      filename_counter = variable_scope.get_variable(
          name="obtain_next_filename_counter",
          initializer=constant_op.constant(0, dtype=dtypes.int64),
          dtype=dtypes.int64)
      c = filename_counter.count_up_to(len(expanded_list))
      with ops.control_dependencies([c]):
        return obtain_next(string_tensor, counter)
    else:
      return obtain_next(string_tensor, counter)