#!/usr/bin/python
#
# Copyright 2015 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Ops for graph construction."""

import tensorflow as tf

from google.protobuf import text_format
from tensorflow.python.platform import gfile
from tensorflow.python.platform import logging

from biology import utils as model_utils

def AddBias(tensor, init=None, name=None):
  """Add a bias term to a tensor.

  Args:
    tensor: Variable tensor.
    init: Bias initializer. Defaults to zero.
    name: Name for this op. Defaults to tensor.op.name.

  Returns:
    A biased tensor with the same shape as the input tensor.
  """
  if init is None:
    init = tf.zeros([tensor.get_shape()[-1].value])
  with tf.op_scope([tensor], name, tensor.op.name):
    b = tf.Variable(init, name='b')
    return tf.nn.bias_add(tensor, b)
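
# Example: a minimal AddBias sketch (illustrative, not part of the library;
# assumes the TF 0.x-era API used throughout this file):
#
#   x = tf.constant([[1.0, 2.0], [3.0, 4.0]])
#   y = AddBias(x)  # zero-initialized bias of shape [2], broadcast over rows
#   # y has the same shape as x; the bias lives in a Variable named '<scope>/b'.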

def BatchNormalize(tensor, convolution, mask=None, epsilon=0.001,
                   scale_after_normalization=True, decay=0.999,
                   global_step=None, name=None):
  """Batch normalization.

  Normalize, scale, and shift the input tensor to reduce covariate shift.

  NOTE(user): For inference, the mean and variance must be set to fixed
  values derived from the entire training set. This is accomplished by using
  moving_mean and moving_variance during evaluation. Be sure that models run
  the ops in the 'updates' collection during training, or the moving averages
  will not be very useful!

  Args:
    tensor: Input tensor (must be 4D).
    convolution: If True, perform normalization across rows and columns as
      well as over batch.
    mask: Mask to apply to tensor.
    epsilon: Small float to avoid dividing by zero.
    scale_after_normalization: If True, multiply by gamma. If False, gamma is
      not used. When the next layer is linear (e.g. also ReLU), this can be
      disabled, since the scaling can be done by the next layer.
    decay: Float value for moving average decay.
    global_step: Tensor containing the global step, used to accelerate moving
      averages at the beginning of training.
    name: Name for this op. Defaults to 'BatchNormalize'.

  Returns:
    A new tensor corresponding to the batch normalized input.

  Raises:
    ValueError: If the input tensor is not 4D.
  """
  if len(tensor.get_shape()) != 4:
    raise ValueError('Input tensor must be 4D, not %dD'
                     % len(tensor.get_shape()))
  if convolution:
    axes = [0, 1, 2]
    shape = tensor.get_shape()[3:]
  else:
    axes = [0]
    shape = tensor.get_shape()[1:]
  with tf.op_scope([tensor], name, 'BatchNormalize'):
    if mask is not None:
      mean, variance = model_utils.Moment(
          2, tensor, reduction_indices=axes, mask=mask)
    else:
      mean, variance = tf.nn.moments(tensor, axes)
    # Keep track of moving averages for mean and variance. During eval, use
    # the moving averages from training.
    mean_moving_average = MovingAverage(mean, global_step, decay)
    variance_moving_average = MovingAverage(variance, global_step, decay)
    if not IsTraining():
      mean = mean_moving_average
      variance = variance_moving_average
    beta = tf.Variable(tf.zeros(shape), name='beta')
    gamma = tf.Variable(tf.constant(1.0, shape=shape), name='gamma')
    if convolution:
      batch_norm = tf.nn.batch_norm_with_global_normalization(
          tensor, mean, variance, beta, gamma, epsilon,
          scale_after_normalization)
    else:
      batch_norm = (tensor - mean) * tf.rsqrt(variance + epsilon)
      if scale_after_normalization:
        batch_norm *= gamma
      batch_norm += beta
    if mask is not None:
      batch_norm = model_utils.Mask(batch_norm, mask)
    return batch_norm
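
# Example: a BatchNormalize sketch (illustrative; assumes SetTraining has been
# called on the graph, and 'images' is a hypothetical 4D activation tensor):
#
#   SetTraining(True)
#   images = tf.placeholder(tf.float32, shape=[8, 28, 28, 3])
#   normed = BatchNormalize(images, convolution=True)
#   # During training, also run the ops in the 'updates' collection so the
#   # moving averages used at inference time stay current:
#   updates = tf.get_default_graph().get_collection('updates')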

def MovingAverage(tensor, global_step, decay=0.999):
  """Create a variable that contains the moving average of a tensor.

  Adds a tf.identity and special namescope to ensure the tensor
  is colocated with its Variable on the parameter server.
  See http://g/tensorflow-users/PAAXYLlybNs/xA0z-x1qEwAJ
  and replicated_model.py#NameScopeDevicePicker for context.

  Args:
    tensor: Tensor to calculate moving average of.
    global_step: Variable containing the number of global steps.
    decay: Float for exponential decay of moving average.

  Returns:
    A tf.Variable containing the moving average of the input tensor.
  """
  exponential_moving_average = tf.train.ExponentialMovingAverage(
      decay=decay, num_updates=global_step)
  update_op = exponential_moving_average.apply([tensor])
  tf.get_default_graph().add_to_collection('updates', update_op)
  return exponential_moving_average.average(tensor)
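
# Example: the intended update pattern (illustrative; 'loss', 'train_op', and
# 'sess' are hypothetical):
#
#   avg_loss = MovingAverage(loss, global_step, decay=0.999)
#   updates = tf.get_default_graph().get_collection('updates')
#   sess.run([train_op] + updates)  # each step advances the moving average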

def Dropout(tensor, dropout_prob, training_only=True):
  """Random dropout.

  This implementation supports "always-on" dropout (training_only=False),
  which can be used to calculate model uncertainty. See Gal and Ghahramani,
  http://arxiv.org/abs/1506.02142.

  NOTE(user): To simplify the implementation, I have chosen not to reverse
  the scaling that occurs in tf.nn.dropout when using dropout during
  inference. This shouldn't be an issue since the activations will be scaled
  by the same constant in both training and inference. This means that there
  are no training-time differences between networks that use dropout during
  inference and those that do not.

  Args:
    tensor: Input tensor.
    dropout_prob: Float giving dropout probability for weights (NOT keep
      probability).
    training_only: Boolean. If True (standard dropout), apply dropout only
      during training. If False, apply dropout during inference as well.

  Returns:
    A tensor with the same shape as the input tensor.
  """
  if not dropout_prob:
    return tensor  # do nothing
  keep_prob = 1.0 - dropout_prob
  if IsTraining() or not training_only:
    tensor = tf.nn.dropout(tensor, keep_prob)
  return tensor
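
# Example: "always-on" dropout for uncertainty estimation (illustrative;
# 'hidden' is a hypothetical activation tensor):
#
#   h = Dropout(hidden, dropout_prob=0.5, training_only=False)
#   # Repeated forward passes at inference time now give stochastic outputs
#   # whose spread approximates model uncertainty (Gal & Ghahramani).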

def FullyConnectedLayer(tensor, size, weight_init=None, bias_init=None,
                        name=None):
  """Fully connected layer.

  Args:
    tensor: Input tensor.
    size: Number of nodes in this layer.
    weight_init: Weight initializer.
    bias_init: Bias initializer.
    name: Name for this op. Defaults to 'fully_connected'.

  Returns:
    A new tensor representing the output of the fully connected layer.

  Raises:
    ValueError: If input tensor is not 2D.
  """
  if len(tensor.get_shape()) != 2:
    raise ValueError('Dense layer input must be 2D, not %dD'
                     % len(tensor.get_shape()))
  if weight_init is None:
    num_features = tensor.get_shape()[-1].value
    weight_init = tf.truncated_normal([num_features, size], stddev=0.01)
  if bias_init is None:
    bias_init = tf.zeros([size])
  with tf.op_scope([tensor], name, 'fully_connected'):
    w = tf.Variable(weight_init, name='w')
    b = tf.Variable(bias_init, name='b')
    return tf.nn.xw_plus_b(tensor, w, b)
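
# Example: a dense hidden layer (illustrative):
#
#   x = tf.placeholder(tf.float32, shape=[None, 100])
#   hidden = tf.nn.relu(FullyConnectedLayer(x, 256))  # shape [None, 256]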

def IsTraining():
  """Determine whether the default graph is in training mode.

  Returns:
    A boolean value indicating whether the default graph is in training mode.

  Raises:
    ValueError: If the 'train' collection in the default graph does not
      contain exactly one element.
  """
  train = tf.get_collection('train')
  if not train:
    raise ValueError('Training mode is not set. Please call SetTraining.')
  elif len(train) > 1:
    raise ValueError('Training mode has more than one setting.')
  return train[0]

def SetTraining(train):
  """Set the training mode of the default graph.

  This operation may only be called once for a given graph.

  Args:
    train: If True, graph is in training mode.

  Raises:
    AssertionError: If the default graph already has this value set.
  """
  if tf.get_collection('train'):
    raise AssertionError('Training mode already set: %s' %
                         tf.get_collection('train'))
  tf.add_to_collection('train', train)
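
# Example: the training flag is per-graph and write-once (illustrative):
#
#   with tf.Graph().as_default():
#     SetTraining(True)
#     assert IsTraining()
#     # A second SetTraining call on this graph raises AssertionError.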

def MultitaskLogits(features, num_tasks, num_classes=2, weight_init=None,
                    bias_init=None, dropout=None, name=None):
  """Create a logit tensor for each classification task.

  Args:
    features: A 2D tensor with dimensions batch_size x num_features.
    num_tasks: Number of classification tasks.
    num_classes: Number of classes for each task.
    weight_init: Weight initializer.
    bias_init: Bias initializer.
    dropout: Float giving dropout probability for weights (NOT keep
      probability).
    name: Name for this op. Defaults to 'multitask_logits'.

  Returns:
    A list of logit tensors; one for each classification task.
  """
  logits = []
  with tf.name_scope('multitask_logits'):
    for task_idx in range(num_tasks):
      with tf.op_scope([features], name,
                       ('task' + str(task_idx).zfill(len(str(num_tasks))))):
        logits.append(
            Logits(features, num_classes, weight_init=weight_init,
                   bias_init=bias_init, dropout=dropout))
  return logits
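
# Example: a 10-task binary classification head (illustrative):
#
#   features = tf.placeholder(tf.float32, shape=[None, 1024])
#   task_logits = MultitaskLogits(features, num_tasks=10)
#   # A list of 10 tensors, each of shape [None, 2], under scopes
#   # multitask_logits/task00 ... multitask_logits/task09.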

def Logits(features, num_classes=2, weight_init=None, bias_init=None,
           dropout=None, name=None):
  """Create a logits tensor for a single classification task.

  You almost certainly don't want dropout here: it's like randomly setting
  the (unscaled) probability of a target class to 0.5.

  Args:
    features: A 2D tensor with dimensions batch_size x num_features.
    num_classes: Number of classes for each task.
    weight_init: Weight initializer.
    bias_init: Bias initializer.
    dropout: Float giving dropout probability for weights (NOT keep
      probability).
    name: Name for this op.

  Returns:
    A logits tensor with shape batch_size x num_classes.
  """
  with tf.op_scope([features], name, 'logits') as name:
    return Dropout(
        FullyConnectedLayer(features, num_classes, weight_init=weight_init,
                            bias_init=bias_init, name=name),
        dropout)
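
# Example: a single-task head (illustrative); per the note above, 'dropout'
# is usually left as None for logits:
#
#   logits = Logits(features, num_classes=2)  # shape [batch_size, 2]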

def SoftmaxN(tensor, name=None):
  """Apply softmax across last dimension of a tensor.

  Args:
    tensor: Input tensor.
    name: Name for this op. If None, defaults to 'SoftmaxN'.

  Returns:
    A tensor with softmax-normalized values on the last dimension.
  """
  with tf.op_scope([tensor], name, 'SoftmaxN'):
    reduction_indices = [tensor.get_shape().ndims - 1]
    # Subtract the per-slice max before exponentiating for numerical
    # stability; softmax is invariant to this shift.
    exp_tensor = tf.exp(tensor - tf.reduce_max(
        tensor, reduction_indices=reduction_indices, keep_dims=True))
    return tf.div(exp_tensor,
                  tf.reduce_sum(exp_tensor,
                                reduction_indices=reduction_indices,
                                keep_dims=True))
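
# Example: softmax over the last axis of a 3D tensor (illustrative; in this
# TF era, tf.nn.softmax expects 2D input, which is why SoftmaxN exists):
#
#   scores = tf.placeholder(tf.float32, shape=[8, 5, 3])
#   probs = SoftmaxN(scores)  # each probs[i, j, :] slice sums to 1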

def Transform(tensor, transform, convolution=True, mask=None):
  """Apply a transform to a tensor.

  Args:
    tensor: Input tensor.
    transform: String description of transform. Supported values are 'bias'
      and 'batch_norm'.
    convolution: If True, assume tensor is the output of a convolution.
    mask: Mask to apply to tensor.

  Returns:
    A tensor with the same shape as the input tensor.

  Raises:
    ValueError: If the input tensor is not 2D, 3D, or 4D.
  """
  if len(tensor.get_shape()) not in [2, 3, 4]:
    raise ValueError('Input tensor must be 2D, 3D or 4D, not %dD.'
                     % len(tensor.get_shape()))
  with tensor.graph.as_default():
    if transform == 'batch_norm':
      # Batch normalization requires 4D input.
      if len(tensor.get_shape()) != 4:
        # 3D case: add one extra dimension.
        if len(tensor.get_shape()) == 3:
          squeeze = [2]
          tensor = tf.expand_dims(tensor, 2)
          if mask is not None:
            mask = tf.expand_dims(mask, -1)
        # 2D case: add two extra dimensions.
        else:
          squeeze = [1, 2]
          tensor = tf.expand_dims(tf.expand_dims(tensor, -2), -2)
          if mask is not None:
            mask = tf.expand_dims(tf.expand_dims(mask, -1), -1)
        tensor = BatchNormalize(tensor, convolution=convolution, mask=mask)
        tensor = tf.squeeze(tensor, squeeze)
      else:
        tensor = BatchNormalize(tensor, convolution=convolution, mask=mask)
    elif transform == 'bias':
      tensor = AddBias(tensor, init=tf.constant(
          1.0, shape=[tensor.get_shape()[-1].value]))
      if mask is not None:
        tensor = model_utils.Mask(tensor, mask)
  return tensor
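
# Example: a Transform sketch (illustrative; 'batch_norm' requires the
# training flag to be set, since BatchNormalize calls IsTraining):
#
#   SetTraining(True)
#   h = tf.placeholder(tf.float32, shape=[32, 128])  # 2D activations
#   h = Transform(h, 'batch_norm')
#   # The 2D input is expanded to 4D, normalized, and squeezed back.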