Gaussian process regression with varying output noise#

This notebook shows how to construct a Gaussian process model where different noise is assumed for different data points. The model is:

\begin{align} f(\cdot) &\sim \mathcal{GP}\big(0, k(\cdot, \cdot)\big) \\ y_i | f, x_i &\sim \mathcal N\big(y_i; f(x_i), \sigma^2_i\big) \end{align}

We’ll demonstrate three methods for specifying the data-point specific noise: * First we’ll show how to fit the noise variance to a simple function. * In the second demonstration we’ll assume that the data comes from two different groups and show how to learn separate noise variance for the groups. * Third we’ll assume you have multiple samples at each location \(x\) and show how to use the empirical variance.

[1]:
import os
import warnings

warnings.simplefilter("ignore")
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
[2]:
from typing import Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

import gpflow as gf
from gpflow.ci_utils import reduce_in_tests
from gpflow.experimental.check_shapes import inherit_check_shapes
from gpflow.utilities import print_summary

%matplotlib inline

gf.config.set_default_summary_fmt("notebook")

optimizer_config = dict(maxiter=reduce_in_tests(1000))
n_data = reduce_in_tests(300)
X_plot = np.linspace(0.0, 1.0, reduce_in_tests(101))[:, None]

To help us later we’ll first define a small function for plotting data with predictions:

[3]:
def plot_distribution(
    X: gf.base.AnyNDArray,
    Y: gf.base.AnyNDArray,
    X_plot: Optional[gf.base.AnyNDArray] = None,
    mean_plot: Optional[gf.base.AnyNDArray] = None,
    var_plot: Optional[gf.base.AnyNDArray] = None,
    X_err: Optional[gf.base.AnyNDArray] = None,
    mean_err: Optional[gf.base.AnyNDArray] = None,
    var_err: Optional[gf.base.AnyNDArray] = None,
) -> None:
    plt.figure(figsize=(15, 5))

    X = X.squeeze(axis=-1)
    Y = Y.squeeze(axis=-1)
    plt.scatter(X, Y, color="gray", alpha=0.8)

    def get_confidence_bounds(
        mean: gf.base.AnyNDArray, var: gf.base.AnyNDArray
    ) -> Tuple[gf.base.AnyNDArray, gf.base.AnyNDArray]:
        std = np.sqrt(var)
        return mean - 1.96 * std, mean + 1.96 * std

    if X_plot is not None:
        assert mean_plot is not None
        assert var_plot is not None
        X_plot = X_plot.squeeze(axis=-1)
        mean_plot = mean_plot.squeeze(axis=-1)
        var_plot = var_plot.squeeze(axis=-1)
        lower_plot, upper_plot = get_confidence_bounds(mean_plot, var_plot)
        plt.fill_between(
            X_plot, lower_plot, upper_plot, color="silver", alpha=0.25
        )
        plt.plot(X_plot, lower_plot, color="silver")
        plt.plot(X_plot, upper_plot, color="silver")
        plt.plot(X_plot, mean_plot, color="black")

    if X_err is not None:
        assert mean_err is not None
        assert var_err is not None
        lower_err, upper_err = get_confidence_bounds(mean_err, var_err)
        plt.vlines(X_err, lower_err, upper_err, color="black")

    plt.xlim([0, 1])
    plt.show()

Demo 1: known noise variances#

Generate data#

We create a utility function to generate synthetic data, including noise that varies amongst the data:

[4]:
def generate_data() -> Tuple[gf.base.AnyNDArray, gf.base.AnyNDArray]:
    rng = np.random.default_rng(42)  # for reproducibility
    X = rng.uniform(0.0, 1.0, (n_data, 1))
    signal = (X - 0.5) ** 2 + 0.05
    noise_scale = 0.5 * signal
    noise = noise_scale * rng.standard_normal((n_data, 1))
    Y = signal + noise
    return X, Y


X, Y = generate_data()

The data alone looks like:

[5]:
plot_distribution(X, Y)
../../_images/notebooks_advanced_varying_noise_9_0.png

Try a naive fit#

If we try to fit a naive GPR model to this data, which assumes a single shared noise variance value for all data points - as specified by the likelihood variance parameter - we get:

[6]:
model = gf.models.GPR(
    data=(X, Y),
    kernel=gf.kernels.SquaredExponential(),
)
gf.optimizers.Scipy().minimize(
    model.training_loss, model.trainable_variables, options=optimizer_config
)
print_summary(model)
name class transform prior trainable shape dtype value
GPR.kernel.variance ParameterSoftplus True () float640.12365
GPR.kernel.lengthscalesParameterSoftplus True () float640.61646
GPR.likelihood.varianceParameterSoftplus + Shift True () float640.0057
[7]:
mean, var = model.predict_y(X_plot)
plot_distribution(X, Y, X_plot, mean.numpy(), var.numpy())
../../_images/notebooks_advanced_varying_noise_12_0.png

Notice how this naive model underestimates the noise near the center of the figure, but overestimates at for small and large values of x?

Fit a polynomial to the noise scale#

To fit a model with varying noise, instead of using the default single shared noise variance, we can create a Gaussian Likelihood with an input dependent (polynomial) Function for the scale of the noise, then pass that likelihood to our model.

[8]:
model = gf.models.GPR(
    data=(X, Y),
    kernel=gf.kernels.SquaredExponential(),
    likelihood=gf.likelihoods.Gaussian(scale=gf.functions.Polynomial(degree=2)),
)
gf.optimizers.Scipy().minimize(
    model.training_loss, model.trainable_variables, options=optimizer_config
)
print_summary(model)
name class transform prior trainable shape dtype value
GPR.kernel.variance ParameterSoftplus True () float640.24138
GPR.kernel.lengthscalesParameterSoftplus True () float640.77092
GPR.likelihood.scale.w ParameterIdentity True (1, 3) float64[[ 0.14643 -0.47433 0.47147]]
[9]:
mean, var = model.predict_y(X_plot)
plot_distribution(X, Y, X_plot, mean.numpy(), var.numpy())
../../_images/notebooks_advanced_varying_noise_16_0.png

Demo 2: grouped noise variances#

In this demo, we won’t assume that the noise variances is a function of \(x\), but we will assume that they’re known to be in two groups. This example represents a case where we might know that data has been collected by two instruments with different fidelity, but we do not know what those fidelities are.

Of course it would be straightforward to add more groups. We’ll stick with two for simplicity.

Generate data#

[10]:
def get_group(X: gf.base.AnyNDArray) -> gf.base.AnyNDArray:
    return np.where((((0.2 < X) & (X < 0.4)) | ((0.7 < X) & (X < 0.8))), 0, 1)


def generate_grouped_data() -> Tuple[gf.base.AnyNDArray, gf.base.AnyNDArray]:
    rng = np.random.default_rng(42)  # for reproducibility
    X = rng.uniform(0.0, 1.0, (n_data, 1))
    signal = np.sin(6 * X)
    noise_scale = 0.1 + 0.4 * get_group(X)
    noise = noise_scale * rng.standard_normal((n_data, 1))
    Y = signal + noise
    return X, Y


X, Y = generate_grouped_data()

And here’s a plot of the raw data:

[11]:
plot_distribution(X, Y)
../../_images/notebooks_advanced_varying_noise_21_0.png

Fit a naive model#

Again we’ll start by fitting a naive GPR, to demonstrate the problem.

[12]:
model = gf.models.GPR(
    data=(X, Y),
    kernel=gf.kernels.SquaredExponential(),
)
gf.optimizers.Scipy().minimize(
    model.training_loss, model.trainable_variables, options=optimizer_config
)
print_summary(model)
name class transform prior trainable shape dtype value
GPR.kernel.variance ParameterSoftplus True () float640.66418
GPR.kernel.lengthscalesParameterSoftplus True () float640.2537
GPR.likelihood.varianceParameterSoftplus + Shift True () float640.18977
[13]:
mean, var = model.predict_y(X_plot)
plot_distribution(X, Y, X_plot, mean.numpy(), var.numpy())
../../_images/notebooks_advanced_varying_noise_24_0.png

Data structure#

To model the different noise groups, we need to let the model know which group each data point belongs to. We’ll do that by appending a column with the group to the \(x\) data:

[14]:
X_and_group = np.hstack([X, get_group(X)])
X_plot_and_group = np.hstack([X_plot, get_group(X_plot)])

Use multiple functions for the noise variance#

To model this we will create two noise variance functions, each of which are just a constant, and then switch between them depending on the group labels.

Notice that we initialize the constant functions to a positive value. They would have defaulted to 0.0, but that wouldn’t work for a variance, which must be strictly positive.

[15]:
model = gf.models.GPR(
    data=(X_and_group, Y),
    kernel=gf.kernels.SquaredExponential(active_dims=[0]),
    likelihood=gf.likelihoods.Gaussian(
        variance=gf.functions.SwitchedFunction(
            [
                gf.functions.Constant(1.0),
                gf.functions.Constant(1.0),
            ]
        )
    ),
)
gf.optimizers.Scipy().minimize(
    model.training_loss, model.trainable_variables, options=optimizer_config
)
print_summary(model)
name class transform prior trainable shape dtype value
GPR.kernel.variance ParameterSoftplus True () float640.81565
GPR.kernel.lengthscales ParameterSoftplus True () float640.30409
GPR.likelihood.variance.functions[0].cParameterIdentity True () float640.01012
GPR.likelihood.variance.functions[1].cParameterIdentity True () float640.25862
[16]:
mean, var = model.predict_y(X_plot_and_group)
plot_distribution(X, Y, X_plot, mean.numpy(), var.numpy())
../../_images/notebooks_advanced_varying_noise_29_0.png

Demo 3: Empirical noise variance#

In this demo we will assume that you have multiple measurements at each \(x\) location, and want to use the empirical variance at each location.

Generate data#

[17]:
n_data = reduce_in_tests(20)
n_repeats = reduce_in_tests(10)


def generate_empiricial_noise_data() -> Tuple[
    gf.base.AnyNDArray, gf.base.AnyNDArray
]:
    rng = np.random.default_rng(42)  # for reproducibility
    X = rng.uniform(0.0, 1.0, (n_data, 1))
    signal = np.sin(6 * X)
    noise_scale = rng.uniform(0.1, 1.0, (n_data, 1))
    noise = noise_scale * rng.standard_normal((n_data, n_repeats))
    Y = signal + noise
    return X, Y


X, Y = generate_empiricial_noise_data()
Y_mean = np.mean(Y, axis=-1, keepdims=True)
Y_var = np.var(Y, axis=-1, keepdims=True)

For the sake of plotting we’ll create flat lists of (x, y) pairs:

[18]:
X_flat = np.broadcast_to(X, Y.shape).reshape((-1, 1))
Y_flat = Y.reshape((-1, 1))
plot_distribution(X_flat, Y_flat)
../../_images/notebooks_advanced_varying_noise_34_0.png

Data structure#

Our GPs don’t like it when multiple data points occupy the same \(x\) location. So we’ll reduce this dataset to the means and the variances at each \(x\), and use a custom function to model inject the empirical variances into the model.

[19]:
Y_mean = np.mean(Y, axis=-1, keepdims=True)

Fit a naive model#

Again we’ll start by fitting a naive GPR, to demonstrate the problem.

Notice that this case is somewhat different from what we did above. Here we are not modelling \(Y\), but the mean of \(Y\), so the noise variance is not our uncertainty of \(Y\), but our uncertainty of the mean of \(Y\). Furthermore, while the naive model assumes a constant noise variance, the custom noise variance function we will develop below will only model the noise variance at the points where we have data, thus we cannot plot a continuous uncertainty. Instead, the shaded gray area is the confidence interval of \(f\), and we use vertical black lines to indicate the confidence interval of the \(Y\) mean.

[20]:
model = gf.models.GPR(
    data=(X, Y_mean),
    kernel=gf.kernels.SquaredExponential(),
)
gf.optimizers.Scipy().minimize(
    model.training_loss, model.trainable_variables, options=optimizer_config
)
print_summary(model)
name class transform prior trainable shape dtype value
GPR.kernel.variance ParameterSoftplus True () float640.5174
GPR.kernel.lengthscalesParameterSoftplus True () float640.24234
GPR.likelihood.varianceParameterSoftplus + Shift True () float640.02477
[21]:
f_mean, f_var = model.predict_f(X_plot)
y_mean_mean, y_mean_var = model.predict_y(X)

plot_distribution(
    X_flat,
    Y_flat,
    X_plot,
    f_mean.numpy(),
    f_var.numpy(),
    X,
    y_mean_mean.numpy(),
    y_mean_var.numpy(),
)
../../_images/notebooks_advanced_varying_noise_39_0.png

Create custom function for the noise variance#

We’re modelling the Y_mean and its standard error is Y_var / n_repeats. We will create a simple function that ignores its input and returns these values as a constant. This will obviously only work for the X that corresponds to the Y we computed the variance of - that is good enough for model training, but notice that it will not allow us to use predict_y.

[22]:
class FixedVarianceOfMean(gf.functions.Function):
    def __init__(self, Y: gf.base.AnyNDArray):
        n_repeats = Y.shape[-1]
        self.var_mean = np.var(Y, axis=-1, keepdims=True) / n_repeats

    @inherit_check_shapes
    def __call__(self, X: gf.base.TensorType) -> tf.Tensor:
        return self.var_mean

Now we can plug that into our model:

[23]:
model = gf.models.GPR(
    data=(X, Y_mean),
    kernel=gf.kernels.SquaredExponential(active_dims=[0]),
    likelihood=gf.likelihoods.Gaussian(variance=FixedVarianceOfMean(Y)),
)
gf.optimizers.Scipy().minimize(
    model.training_loss, model.trainable_variables, options=optimizer_config
)
print_summary(model)
name class transform prior trainable shape dtype value
GPR.kernel.variance ParameterSoftplus True () float640.60614
GPR.kernel.lengthscalesParameterSoftplus True () float640.26903
[24]:
f_mean, f_var = model.predict_f(X_plot)
y_mean_mean, y_mean_var = model.predict_y(X)

plot_distribution(
    X_flat,
    Y_flat,
    X_plot,
    f_mean.numpy(),
    f_var.numpy(),
    X,
    y_mean_mean.numpy(),
    y_mean_var.numpy(),
)
../../_images/notebooks_advanced_varying_noise_44_0.png

You may notice that the data points fall outside our confidence interval. Again, this is because the confidence interval is for the mean of \(Y\), and not the \(Y\) points themselves.

Further reading#

To model the variance using a second GP, see the Heteroskedastic regression notebook.