Source code for gpflow.kernels.misc

# Copyright 2018-2020 The GPflow Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional

import numpy as np
import tensorflow as tf

from ..base import AnyNDArray, Parameter, TensorType
from ..experimental.check_shapes import check_shape as cs
from ..experimental.check_shapes import check_shapes, inherit_check_shapes
from ..utilities import positive, to_default_float
from .base import ActiveDims, Kernel


[docs]class ArcCosine(Kernel): """ The Arc-cosine family of kernels which mimics the computation in neural networks. The order parameter specifies the assumed activation function. The Multi Layer Perceptron (MLP) kernel is closely related to the ArcCosine kernel of order 0. The key reference is :cite:t:`NIPS2009_3628`. """ implemented_orders = {0, 1, 2} @check_shapes( "variance: []", "weight_variances: [broadcast n_active_dims]", "bias_variance: []", ) def __init__( self, order: int = 0, variance: TensorType = 1.0, weight_variances: TensorType = 1.0, bias_variance: TensorType = 1.0, *, active_dims: Optional[ActiveDims] = None, name: Optional[str] = None, ) -> None: """ :param order: specifies the activation function of the neural network the function is a rectified monomial of the chosen order :param variance: the (initial) value for the variance parameter :param weight_variances: the (initial) value for the weight_variances parameter, to induce ARD behaviour this must be initialised as an array the same length as the the number of active dimensions e.g. [1., 1., 1.] :param bias_variance: the (initial) value for the bias_variance parameter defaults to 1.0 :param active_dims: a slice or list specifying which columns of X are used """ super().__init__(active_dims=active_dims, name=name) if order not in self.implemented_orders: raise ValueError("Requested kernel order is not implemented.") self.order = order self.variance = Parameter(variance, transform=positive()) self.bias_variance = Parameter(bias_variance, transform=positive()) self.weight_variances = Parameter(weight_variances, transform=positive()) self._validate_ard_active_dims(self.weight_variances) @property def ard(self) -> bool: """ Whether ARD behaviour is active. """ ndims: int = self.weight_variances.shape.ndims return ndims > 0 @check_shapes( "X: [batch..., N, D]", "return: [batch..., N]", ) def _diag_weighted_product(self, X: TensorType) -> tf.Tensor: return tf.reduce_sum(self.weight_variances * tf.square(X), axis=-1) + self.bias_variance @check_shapes( "X: [batch..., N, D]", "X2: [batch2..., N2, D]", "return: [batch..., N, batch2..., N2] if X2 is not None", "return: [batch..., N, N] if X2 is None", ) def _full_weighted_product(self, X: TensorType, X2: Optional[TensorType]) -> tf.Tensor: if X2 is None: return ( tf.linalg.matmul((self.weight_variances * X), X, transpose_b=True) + self.bias_variance ) else: D = tf.shape(X)[-1] N = tf.shape(X)[-2] N2 = tf.shape(X2)[-2] batch = tf.shape(X)[:-2] batch2 = tf.shape(X2)[:-2] rank = tf.rank(X) - 2 rank2 = tf.rank(X2) - 2 ones = tf.ones((rank,), tf.int32) ones2 = tf.ones((rank2,), tf.int32) X = cs( tf.reshape(X, tf.concat([batch, ones2, [N, D]], 0)), "[batch..., broadcast batch2..., N, D]", ) X2 = cs( tf.reshape(X2, tf.concat([ones, batch2, [N2, D]], 0)), "[broadcast batch..., batch2..., N2, D]", ) result = cs( tf.linalg.matmul((self.weight_variances * X), X2, transpose_b=True) + self.bias_variance, "[batch..., batch2..., N, N2]", ) indices = tf.concat( [ tf.range(rank), [rank + rank2], tf.range(rank2) + rank, [rank + rank2 + 1], ], axis=0, ) return tf.transpose(result, indices) @check_shapes( "theta: [any...]", "return: [any...]", ) def _J(self, theta: TensorType) -> TensorType: """ Implements the order dependent family of functions defined in equations 4 to 7 in the reference paper. """ if self.order == 0: return np.pi - theta elif self.order == 1: return tf.sin(theta) + (np.pi - theta) * tf.cos(theta) else: assert self.order == 2, f"Don't know how to handle order {self.order}." return 3.0 * tf.sin(theta) * tf.cos(theta) + (np.pi - theta) * ( 1.0 + 2.0 * tf.cos(theta) ** 2 ) @inherit_check_shapes def K(self, X: TensorType, X2: Optional[TensorType] = None) -> tf.Tensor: X_denominator = cs(tf.sqrt(self._diag_weighted_product(X)), "[batch..., N]") if X2 is None: X2_denominator = cs(X_denominator[..., None, :], "[batch..., 1, N]") X_denominator = cs(X_denominator[..., :, None], "[batch..., N, 1]") numerator = cs(self._full_weighted_product(X, None), "[batch..., N, N]") else: X2_denominator = cs(tf.sqrt(self._diag_weighted_product(X2)), "[batch2..., N2]") batch = tf.shape(X)[:-1] batch2 = tf.shape(X2)[:-1] ones = tf.ones((tf.rank(X) - 1,), tf.int32) ones2 = tf.ones((tf.rank(X2) - 1,), tf.int32) X_denominator = cs( tf.reshape(X_denominator, tf.concat([batch, ones2], 0)), "[batch..., N, broadcast batch2..., 1]", ) X2_denominator = cs( tf.reshape(X2_denominator, tf.concat([ones, batch2], 0)), "[broadcast batch..., 1, batch2..., N2]", ) numerator = cs(self._full_weighted_product(X, X2), "[batch..., N, batch2..., N2]") cos_theta = numerator / X_denominator / X2_denominator jitter = 1e-15 theta = tf.acos(jitter + (1 - 2 * jitter) * cos_theta) return ( self.variance * (1.0 / np.pi) * self._J(theta) * X_denominator ** self.order * X2_denominator ** self.order ) @inherit_check_shapes def K_diag(self, X: TensorType) -> tf.Tensor: X_product = self._diag_weighted_product(X) const = (1.0 / np.pi) * self._J(to_default_float(0.0)) return self.variance * const * X_product ** self.order
[docs]class Coregion(Kernel): """ A Coregionalization kernel. The inputs to this kernel are _integers_ (we cast them from floats as needed) which usually specify the *outputs* of a Coregionalization model. The kernel function is an indexing of a positive-definite matrix: K(x, y) = B[x, y] . To ensure that B is positive-definite, it is specified by the two parameters of this kernel, W and kappa: B = W Wᵀ + diag(kappa) . We refer to the size of B as "output_dim x output_dim", since this is the number of outputs in a coregionalization model. We refer to the number of columns on W as 'rank': it is the number of degrees of correlation between the outputs. NB. There is a symmetry between the elements of W, which creates a local minimum at W=0. To avoid this, it is recommended to initialize the optimization (or MCMC chain) using a random W. """ def __init__( self, output_dim: int, rank: int, *, active_dims: Optional[ActiveDims] = None, name: Optional[str] = None, ) -> None: """ :param output_dim: number of outputs expected (0 <= X < output_dim) :param rank: number of degrees of correlation between outputs """ # assert input_dim == 1, "Coregion kernel in 1D only" super().__init__(active_dims=active_dims, name=name) self.output_dim = output_dim self.rank = rank W: AnyNDArray = 0.1 * np.ones((self.output_dim, self.rank)) kappa = np.ones(self.output_dim) self.W = Parameter(W) self.kappa = Parameter(kappa, transform=positive()) @check_shapes( "return: [P, P]", ) def output_covariance(self) -> tf.Tensor: B = tf.linalg.matmul(self.W, self.W, transpose_b=True) + tf.linalg.diag(self.kappa) return B @check_shapes( "return: [P]", ) def output_variance(self) -> tf.Tensor: B_diag = tf.reduce_sum(tf.square(self.W), 1) + self.kappa return B_diag @inherit_check_shapes def K(self, X: TensorType, X2: Optional[TensorType] = None) -> tf.Tensor: cs(X, "[batch..., N, 1] # The `Coregion` kernel requires a 1D input space.") B = cs(self.output_covariance(), "[O, O]") X = cs(tf.cast(X[..., 0], tf.int32), "[batch..., N]") if X2 is None: batch = tf.shape(X)[:-1] N = tf.shape(X)[-1] O = tf.shape(B)[-1] result = cs(tf.gather(B, X), "[batch..., N, O]") result = cs(tf.reshape(result, [-1, N, O]), "[flat_batch, N, O]") flat_X = cs(tf.reshape(X, [-1, N]), "[flat_batch, N]") result = cs(tf.gather(result, flat_X, axis=2, batch_dims=1), "[flat_batch, N, N]") result = cs(tf.reshape(result, tf.concat([batch, [N, N]], 0)), "[batch..., N, N]") else: X2 = cs(tf.cast(X2[..., 0], tf.int32), "[batch2..., N2]") rank2 = tf.rank(X2) result = cs(tf.gather(B, X2), "[batch2..., N2, O]") result = cs( tf.transpose(result, tf.concat([[rank2], tf.range(rank2)], 0)), "[O, batch2..., N2]" ) result = cs(tf.gather(result, X), "[batch..., N, batch2..., N2]") return result @inherit_check_shapes def K_diag(self, X: TensorType) -> tf.Tensor: cs(X, "[batch..., N, 1] # The `Coregion` kernel requires a 1D input space.") X = tf.cast(X[..., 0], tf.int32) B_diag = self.output_variance() return tf.gather(B_diag, X)