Source code for gpflow.utilities.ops

# Copyright 2017-2020 The GPflow Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
from typing import Any, Callable, List, Optional, Union

import tensorflow as tf
import tensorflow_probability as tfp

from ..base import AnyNDArray
from ..experimental.check_shapes import check_shapes


@check_shapes(
    "value: [any...]",
    "return: [any...]",
)
def cast(
    value: Union[tf.Tensor, AnyNDArray], dtype: tf.DType, name: Optional[str] = None
) -> tf.Tensor:
    """
    Convert `value` to a TensorFlow tensor of type `dtype`.

    Values that already are tensors go through `tf.cast`; anything else is converted with
    `tf.convert_to_tensor`.

    :param value: Tensor, or non-tensor value (e.g. an array), to convert.
    :param dtype: Target TensorFlow dtype.
    :param name: Optional name forwarded to the underlying TensorFlow op.
    :returns: `value` as a tensor of type `dtype`.
    """
    if tf.is_tensor(value):
        return tf.cast(value, dtype, name=name)

    # TODO(awav): Release TF2.2 resolves this issue
    # workaround for https://github.com/tensorflow/tensorflow/issues/35938
    return tf.convert_to_tensor(value, dtype, name=name)
@check_shapes(
    "value: []",
    "return: [N, N]",
)
def eye(num: int, value: tf.Tensor, dtype: Optional[tf.DType] = None) -> tf.Tensor:
    """
    Build a `num` x `num` diagonal matrix whose diagonal entries all equal `value`.

    :param num: Number of rows (and columns) of the result.
    :param value: Scalar placed on the diagonal.
    :param dtype: If given, `value` is cast to this dtype first; otherwise it is used as is.
    :returns: Diagonal matrix created by `tf.linalg.diag`.
    """
    diagonal_value = value if dtype is None else cast(value, dtype)
    return tf.linalg.diag(tf.fill([num], diagonal_value))
@check_shapes(
    "tensor: [any...]",
    "return: [transposed_any...]",
)
def leading_transpose(tensor: tf.Tensor, perm: List[Any], leading_dim: int = 0) -> tf.Tensor:
    """
    Transposes tensors with leading dimensions.

    Leading dimensions in the permutation list are represented via ellipsis `...`, so `perm`
    is of type List[Union[int, type(...)]] (please note, due to mypy issues, List[Any] is used
    instead). The ellipsis stands for the first `rank - (len(perm) - 1)` axes of `tensor`,
    kept in their original order and inserted as one group at the position of `...`.
    So, passing `perm=[-2, ..., -1]`, you assume that your input tensor has [..., A, B]
    shape, and you want to move leading dims between A and B dimensions.

    Every integer entry of `perm` is reduced modulo the rank of `tensor`, i.e. it addresses
    an axis of the full tensor exactly like an ordinary (possibly negative) axis index.
    Negative indices are the natural choice because they address the trailing axes
    independently of how many leading dimensions there are; a non-negative entry is an
    absolute axis position and therefore requires knowing the rank of `tensor`.

    Example::

        a = tf.random.normal((1, 2, 3, 4, 5, 6))
            # [..., A, B, C],
            # where A is axis -3 (size 4),
            #       B is axis -2 (size 5) and
            #       C is axis -1 (size 6),
            # leading dimensions are [1, 2, 3],
            # which are represented by `...` in the permutation list
        b = leading_transpose(a, [-1, -3, ..., -2])  # [C, A, ..., B]
        b.shape
        output> (6, 4, 1, 2, 3, 5)

    :param tensor: TensorFlow tensor.
    :param perm: List of permutation indices containing exactly one `...` entry.
    :param leading_dim: Placeholder integer written in place of `...` so that `perm` can be
        turned into a tensor; that entry is dropped again before the final permutation is
        assembled.
    :returns: TensorFlow tensor.
    :raises ValueError: when `...` cannot be found.
    """
    # Work on a copy so the caller's list is not modified by the placeholder assignment.
    perm = copy.copy(perm)
    idx = perm.index(...)  # list.index raises ValueError when `...` is absent
    perm[idx] = leading_dim

    rank = tf.rank(tensor)
    # `list % Tensor` dispatches to the tensor's reflected modulo: this converts the list to a
    # tensor and maps negative axes to their non-negative equivalents.
    perm_tf = perm % rank

    # The ellipsis covers every axis that is not listed explicitly: the first
    # rank - (len(perm) - 1) axes.
    leading_dims = tf.range(rank - len(perm) + 1)
    # Splice the leading axes in where the placeholder was (the placeholder itself is skipped).
    perm = tf.concat([perm_tf[:idx], leading_dims, perm_tf[idx + 1 :]], 0)
    return tf.transpose(tensor, perm)
@check_shapes(
    "a: [a_shape...]",
    "b: [b_shape...]",
    "return: [a_shape..., b_shape...]",
)
def broadcasting_elementwise(
    op: Callable[[tf.Tensor, tf.Tensor], tf.Tensor], a: tf.Tensor, b: tf.Tensor
) -> tf.Tensor:
    """
    Apply binary operation `op` to every pair in tensors `a` and `b`.

    Both inputs are flattened, `op` is applied to a column of `a` against a row of `b`, and
    the result is reshaped to the shape of `a` followed by the shape of `b`.

    :param op: binary operator on tensors, e.g. tf.add, tf.subtract
    :param a: First operand.
    :param b: Second operand.
    :returns: Tensor with the shape of `a` concatenated with the shape of `b`.
    """
    a_column = tf.reshape(a, [-1, 1])
    b_row = tf.reshape(b, [1, -1])
    pairwise = op(a_column, b_row)

    result_shape = tf.concat([tf.shape(a), tf.shape(b)], 0)
    return tf.reshape(pairwise, result_shape)
@check_shapes(
    "X: [batch..., N, D]",
    "X2: [batch2..., N2, D]",
    "return: [batch..., N, batch2..., N2] if X2 is not None",
    "return: [batch..., N, N] if X2 is None",
)
def square_distance(X: tf.Tensor, X2: Optional[tf.Tensor]) -> tf.Tensor:
    """
    Returns ||X - X2ᵀ||²

    The result is assembled as (squared norms) - 2 * (inner products) rather than from
    explicit differences. Due to the implementation and floating-point imprecision, the
    result may actually be very slightly negative for entries very close to each other.

    :param X: First set of points.
    :param X2: Second set of points; if None, distances within `X` are computed.
    :returns: Squared distances.
    """
    if X2 is None:
        # keepdims leaves a trailing axis of size one, so adding the adjoint broadcasts
        # to all pairs of rows.
        sq_norms = tf.reduce_sum(tf.square(X), axis=-1, keepdims=True)
        cross_term = -2 * tf.matmul(X, X, transpose_b=True)
        return cross_term + (sq_norms + tf.linalg.adjoint(sq_norms))

    x_sq_norms = tf.reduce_sum(tf.square(X), axis=-1)
    x2_sq_norms = tf.reduce_sum(tf.square(X2), axis=-1)
    # Contract the last axis of both inputs.
    cross_term = -2 * tf.tensordot(X, X2, [[-1], [-1]])
    return cross_term + broadcasting_elementwise(tf.add, x_sq_norms, x2_sq_norms)
@check_shapes(
    "X: [batch..., N, D]",
    "X2: [batch2..., N2, D]",
    "return: [batch..., N, batch2..., N2, D] if X2 is not None",
    "return: [batch..., N, N, D] if X2 is None",
)
def difference_matrix(X: tf.Tensor, X2: Optional[tf.Tensor]) -> tf.Tensor:
    """
    Returns (X - X2ᵀ)

    :param X: First set of points.
    :param X2: Second set of points; if None, differences within `X` are computed.
    :returns: Differences between the points of `X` and the points of `X2` (or `X`).
    """
    if X2 is None:
        # Insert singleton axes so the subtraction broadcasts over pairs of points of X.
        return X[..., :, tf.newaxis, :] - X[..., tf.newaxis, :, :]

    x_shape = tf.shape(X)
    x2_shape = tf.shape(X2)

    # Collapse all but the last axis so a single broadcasted subtraction covers every pair.
    flat_x = tf.reshape(X, (-1, x_shape[-1]))
    flat_x2 = tf.reshape(X2, (-1, x2_shape[-1]))
    pairwise = flat_x[:, tf.newaxis, :] - flat_x2[tf.newaxis, :, :]

    # Restore the leading axes of X, then those of X2, followed by the last axis of X.
    result_shape = tf.concat((x_shape[:-1], x2_shape[:-1], [x_shape[-1]]), 0)
    return tf.reshape(pairwise, result_shape)
@check_shapes(
    "X: [N, D]",
    "latent_dim: []",
    "return: [N, Q]",
)
def pca_reduce(X: tf.Tensor, latent_dim: tf.Tensor) -> tf.Tensor:
    """
    Linearly reduce the dimensionality of the input points `X` to `latent_dim` dimensions.

    :param X: Data to reduce.
    :param latent_dim: Number of latent dimensions, Q <= D.
    :return: PCA projection array.
    :raises ValueError: when `latent_dim` exceeds the number of columns of `X`.
    """
    num_observed = X.shape[1]
    if latent_dim > num_observed:  # pragma: no cover
        raise ValueError("Cannot have more latent dimensions than observed")

    X_cov = tfp.stats.covariance(X)
    # eigh returns eigenvalues in non-decreasing order, so the trailing eigenvectors belong
    # to the largest eigenvalues; the eigenvalues themselves are not needed.
    _, evecs = tf.linalg.eigh(X_cov)
    # Slice from an explicit start index: `-latent_dim:` would select *all* columns when
    # latent_dim == 0, because -0 == 0.
    W = evecs[:, num_observed - latent_dim :]
    return (X - tf.reduce_mean(X, axis=0, keepdims=True)) @ W