Mixing TensorFlow models with GPflow

This notebook explores the combination of Keras TensorFlow neural networks with GPflow models.

from typing import Dict, Optional, Tuple

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
from scipy.cluster.vq import kmeans2

import gpflow
from gpflow.ci_utils import reduce_in_tests
from gpflow.utilities import to_default_float

# Number of optimisation steps performed by the training loop further down.
# NOTE(review): reduce_in_tests presumably shrinks this value when the notebook
# runs under CI — confirm against gpflow.ci_utils.
iterations = reduce_in_tests(100)
Convolutional network inside a GPflow model

# Load the MNIST training split together with its metadata object.
original_dataset, info = tfds.load(
    name="mnist", split=tfds.Split.TRAIN, with_info=True
)  # closing parenthesis restored: the call was left unterminated
# Dataset-level constants read from the metadata.
total_num_data = info.splits["train"].num_examples
image_shape = info.features["image"].shape
# Number of values in one flattened image (product of the image dimensions).
image_size = tf.reduce_prod(image_shape)
batch_size = 32

def map_fn(input_slice: Dict[str, tf.Tensor]):
    """Turn one dataset element into a ``(flattened_image, label)`` pair.

    The ``"image"`` entry is cast with ``to_default_float`` and divided by
    255.0; the ``"label"`` entry is cast the same way. The image is then
    reshaped to ``[-1, image_size]``.
    """
    label = to_default_float(input_slice["label"])
    scaled_image = to_default_float(input_slice["image"]) / 255.0
    flat_image = tf.reshape(scaled_image, [-1, image_size])
    return flat_image, label

autotune = tf.data.experimental.AUTOTUNE
# Training input pipeline: fixed-size batches (an incomplete final batch is
# dropped), each batch transformed by map_fn.
# NOTE(review): this expression is truncated in this copy — the dataset that the
# method chain starts from and the closing parenthesis are missing. Restore them
# from the original notebook before running.
dataset = (
    .batch(batch_size, drop_remainder=True)
    .map(map_fn, num_parallel_calls=autotune)
Here we’ll use the GPflow functionality, but put a non-GPflow model inside the kernel.
The network is a vanilla ConvNet. Used on its own (with a final linear layer added), it reaches 97.3% accuracy on MNIST after 20K iterations.
class KernelWithConvNN(gpflow.kernels.Kernel):
    """Kernel that feeds its inputs through a Keras network before a base kernel.

    ``K`` and ``K_diag`` both call ``self.cnn`` on their inputs and then
    delegate to the corresponding method of ``self.base_kernel``.
    """

    # NOTE(review): the constructor below is incomplete in this copy: the
    # ``self`` parameter, the end of the signature, and several lines of the
    # ``tf.keras.Sequential`` layer list (including its closing brackets) are
    # missing. Restore them from the original notebook; the comments here only
    # describe what is still visible.
    def __init__(
        image_shape: Tuple,
        output_dim: int,
        base_kernel: gpflow.kernels.Kernel,
        batch_size: Optional[int] = None,
        with self.name_scope:
            self.base_kernel = base_kernel
            # Length of one flattened image: product of the image dimensions.
            input_size = int(tf.reduce_prod(image_shape))
            input_shape = (input_size,)

            # Feature extractor applied to inputs before the base kernel.
            self.cnn = tf.keras.Sequential(
                        input_shape=input_shape, batch_size=batch_size
                    tf.keras.layers.MaxPool2D(pool_size=(2, 2), strides=2),
                        kernel_size=(5, 5),
                    tf.keras.layers.MaxPool2D(pool_size=(2, 2), strides=2),
                    tf.keras.layers.Dense(output_dim, activation="relu"),


    def K(
        self, a_input: tf.Tensor, b_input: Optional[tf.Tensor] = None
    ) -> tf.Tensor:
        """Evaluate the base kernel on the network outputs of the inputs.

        ``a_input`` is always passed through ``self.cnn``. ``b_input`` is passed
        through it only when it is not ``None``; otherwise ``None`` is forwarded
        unchanged to ``self.base_kernel.K``.
        """
        transformed_a = self.cnn(a_input)
        transformed_b = self.cnn(b_input) if b_input is not None else b_input
        return self.base_kernel.K(transformed_a, transformed_b)

    def K_diag(self, a_input: tf.Tensor) -> tf.Tensor:
        """Evaluate ``self.base_kernel.K_diag`` on the network output of ``a_input``."""
        transformed_a = self.cnn(a_input)
        return self.base_kernel.K_diag(transformed_a)

\(K_{uf}\) is evaluated in the ConvNN output space, so we need to register new `Kuu` and `Kuf` implementations with GPflow's multiple-dispatch mechanism.

class KernelSpaceInducingPoints(gpflow.inducing_variables.InducingPoints):
    """Inducing points expressed in the ConvNN output space.

    The subclass adds no behaviour of its own. It serves as a distinct type
    that the ``Kuu`` / ``Kuf`` covariance dispatchers can be registered on.
    """

@gpflow.covariances.Kuu.register(KernelSpaceInducingPoints, KernelWithConvNN)
def Kuu(inducing_variable, kernel, jitter=None):
    """Compute Kuu for inducing points paired with a ``KernelWithConvNN``.

    The work is delegated to the implementation registered for plain
    ``InducingPoints`` and a generic ``Kernel``. It is called with
    ``kernel.base_kernel``, so ``kernel.cnn`` is not applied to the inducing
    points.

    Args:
        inducing_variable: The ``KernelSpaceInducingPoints`` instance.
        kernel: The ``KernelWithConvNN`` whose ``base_kernel`` is used.
        jitter: Forwarded unchanged to the delegated implementation.
    """
    func = gpflow.covariances.Kuu.dispatch(
        gpflow.inducing_variables.InducingPoints, gpflow.kernels.Kernel
    )  # closing parenthesis restored: the call was left unterminated
    return func(inducing_variable, kernel.base_kernel, jitter=jitter)

    KernelSpaceInducingPoints, KernelWithConvNN, object
def Kuf(inducing_variable, kernel, a_input):
    return kernel.base_kernel(inducing_variable.Z, kernel.cnn(a_input))

Now we are ready to create and initialize the model:

num_mnist_classes = 10
output_dim = 5
num_inducing_points = 100
# Take the first element of `dataset` re-batched in groups of 32, then flatten
# it into a 2-D image matrix and a single column of labels.
images_subset, labels_subset = next(iter(dataset.batch(32)))
images_subset = tf.reshape(images_subset, [-1, image_size])
labels_subset = tf.reshape(labels_subset, [-1, 1])

# NOTE(review): truncated in this copy — the constructor arguments and the
# closing parenthesis are missing; restore them from the original notebook.
kernel = KernelWithConvNN(

likelihood = gpflow.likelihoods.MultiClass(num_mnist_classes)

# NOTE(review): truncated in this copy — the closing parenthesis is missing.
# scipy's kmeans2 returns a (centroid, label) pair, so the centroids presumably
# have to be selected before the result is passed to kernel.cnn — confirm.
inducing_variable_kmeans = kmeans2(
    images_subset.numpy(), num_inducing_points, minit="points"
# Pass the k-means result through the network and wrap the output as the
# inducing variable.
inducing_variable_cnn = kernel.cnn(inducing_variable_kmeans)
inducing_variable = KernelSpaceInducingPoints(inducing_variable_cnn)

# NOTE(review): truncated in this copy — the SVGP arguments and the closing
# parenthesis are missing; restore them from the original notebook.
model = gpflow.models.SVGP(
And start optimization:

# Iterator over `dataset`, consumed by the training-loss closure below.
data_iterator = iter(dataset)
# Adam optimiser constructed with 0.001 as its first argument (the learning rate).
adam_opt = tf.optimizers.Adam(0.001)

# Loss callable obtained from the model for the data iterator; it is what the
# optimiser minimises in optimization_step.
training_loss = model.training_loss_closure(data_iterator)

def optimization_step():
    """Apply one Adam minimisation step of ``training_loss`` to the model."""
    trainable = model.trainable_variables
    adam_opt.minimize(training_loss, var_list=trainable)

# Run the configured number of optimisation steps. The loop body was missing,
# which left the `for` statement syntactically incomplete.
for _ in range(iterations):
    optimization_step()

Let’s make predictions after training. Don’t expect good accuracy, because we haven’t trained for long enough.

# Predictive mean and variance for the image subset.
m, v = model.predict_y(images_subset)
labels_np = labels_subset.numpy()
# Predicted class = index of the largest predictive mean, shaped like the labels.
preds = np.argmax(m, axis=1).reshape(labels_np.shape)
correct = preds == labels_np.astype(int)
# Percentage of subset images whose predicted class matches the label.
acc = 100.0 * np.mean(correct.astype(float))

print("Accuracy is {:.4f}%".format(acc))
Accuracy is 64.9414%