util.py

#
import numpy as np
import tensorflow as tf
#

Orthogonal Initializer

Coding this wasn’t part of the plan. I previously used the TensorFlow orthogonal initializer, but it used a lot of GPU memory and sometimes crashed with a memory allocation error during initialization. I didn’t investigate what was happening; instead, I just copied this code from OpenAI Baselines.

class Orthogonal(object):
    """Orthogonal weight initializer computed with NumPy.

    Calling an instance with a shape returns a ``float32`` NumPy array.
    Flattened to 2-D (all leading dimensions merged, last dimension kept),
    the array has orthonormal rows or columns -- whichever side is shorter --
    multiplied by ``scale``.
    """

    def __init__(self, scale=1.):
        """Store ``scale``, the factor the orthogonal matrix is multiplied by."""
        self.scale = scale

    def __call__(self, shape, dtype=None, partition_info=None):
        """Lasagne orthogonal initializer.

        Args:
            shape: Iterable of dimension sizes with at least two entries.
                All dimensions except the last are flattened into the row
                dimension of the matrix that is orthogonalized.
            dtype: Accepted but ignored; the result is always ``float32``.
            partition_info: Accepted but ignored.

        Returns:
            A ``numpy.float32`` array of the requested ``shape``.

        Raises:
            NotImplementedError: If ``shape`` has fewer than two dimensions.
        """
        shape = tuple(shape)
        if len(shape) < 2:
            raise NotImplementedError(
                'Orthogonal initializer needs at least 2 dimensions, '
                'got shape %r' % (shape,))

        # Merge every leading dimension into the rows; for a 2-D shape this
        # is the shape itself.
        flat_shape = (int(np.prod(shape[:-1])), int(shape[-1]))

        gaussian = np.random.normal(0.0, 1.0, flat_shape)
        u, _, v = np.linalg.svd(gaussian, full_matrices=False)
        # With reduced SVD exactly one factor has `flat_shape` when the matrix
        # is rectangular; when it is square both do and `u` is used.
        q = u if u.shape == flat_shape else v
        return (self.scale * q.reshape(shape)).astype(np.float32)

    def get_config(self):
        """Return the constructor arguments as a dict."""
        return {
            'scale': self.scale
        }
def huber_loss(x, delta=1.0):
    """Element-wise Huber loss of ``x``.

    Where ``|x| < delta`` the result is ``0.5 * x**2``; elsewhere it is
    ``delta * (|x| - 0.5 * delta)``.
    """
    magnitude = tf.abs(x)
    quadratic = tf.square(x) * 0.5
    linear = delta * (magnitude - 0.5 * delta)
    return tf.where(magnitude < delta, quadratic, linear)
#

Piecewise schedule

class PiecewiseSchedule(object):
    """Piecewise-linear schedule defined by ``(x, y)`` endpoints.

    Calling an instance with ``x`` returns the linearly interpolated ``y``
    for the segment containing ``x``, or ``outside_value`` when no segment
    contains it.
    """

    def __init__(self, endpoints, outside_value):
        """Initialize.

        Args:
            endpoints: Iterable of pairs ``(x, y)``, sorted by ``x``. The
                values between endpoints are linearly interpolated.
            outside_value: The ``y`` returned for any ``x`` outside the range
                covered by the endpoints.

        Raises:
            AssertionError: If the ``x`` values are not in ascending order.
        """
        # Materialize once so generators and other one-shot iterables can be
        # both validated here and sliced later in `__call__`.
        endpoints = list(endpoints)

        # (x, y) pairs should be sorted
        indexes = [e[0] for e in endpoints]
        assert indexes == sorted(indexes), 'endpoints must be sorted by x'

        self._outside_value = outside_value
        self._endpoints = endpoints

    def __call__(self, x):
        """Find y for given x.

        Segments are half-open, ``x1 <= x < x2``, so an ``x`` equal to the
        last endpoint's ``x`` yields ``outside_value``.
        """
        # iterate through each segment
        for (x1, y1), (x2, y2) in zip(self._endpoints[:-1], self._endpoints[1:]):
            # interpolate if x is within the segment
            if x1 <= x < x2:
                dx = float(x - x1) / (x2 - x1)
                return y1 + dx * (y2 - y1)

        # return outside value otherwise
        return self._outside_value