Source code for stheno.model.observations

from lab import B
from matrix import AbstractMatrix, Diagonal
from mlkernels import (
    num_elements,
    PosteriorKernel,
    SubspaceKernel,
    PosteriorMean,
)
from plum import Union

from .fdd import FDD
from .gp import cross
from .. import _dispatch

__all__ = [
    "combine",
    "AbstractObservations",
    "Observations",
    "Obs",
    "PseudoObservations",
    "PseudoObs",
    "SparseObservations",
    "SparseObs",
]


@_dispatch
def combine(*fdds: FDD):
    """Combine multiple FDDs or tuples of observations into one.

    Args:
        *fdds (:class:`.fdd.FDD` or tuple): FDDs or tuples of observations.

    Returns:
        :class:`.fdd.FDD` or tuple: `fdds` combined into one.
    """
    combined_noise = B.block_diag(*[fdd.noise for fdd in fdds])
    return cross(*[fdd.p for fdd in fdds])(fdds, combined_noise)


@_dispatch
def combine(*pairs: tuple):
    fdds, ys = zip(*pairs)
    combined_fdd = combine(*fdds)
    combined_y = B.concat(*[B.uprank(y) for y in ys], axis=0)
    return combined_fdd, combined_y
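
# Example (illustrative sketch, not part of the library source): combining two
# sets of observations of one process into a single pair. `GP` and `EQ` are
# assumed to be importable from `stheno`.
#
#   >>> import numpy as np
#   >>> from stheno import GP, EQ
#   >>> f = GP(EQ())
#   >>> x1, y1 = np.linspace(0, 3, 4), np.random.randn(4)
#   >>> x2, y2 = np.linspace(4, 7, 4), np.random.randn(4)
#   >>> fdd, y = combine((f(x1, 0.1), y1), (f(x2, 0.1), y2))
#   >>> B.shape(y)  # Values are upranked to column vectors and concatenated.
#   (8, 1)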


class AbstractObservations:
    """Abstract base class for observations.

    Also takes in one or multiple tuples of the described arguments.

    Args:
        fdd (:class:`.fdd.FDD`): FDD of observations.
        y (column vector): Values of observations.

    Attributes:
        fdd (:class:`.fdd.FDD`): FDD of observations.
        y (column vector): Values of observations.
    """

    @_dispatch
    def __init__(self, fdd: FDD, y: Union[B.Numeric, AbstractMatrix]):
        self.fdd = fdd
        self.y = y

    @_dispatch
    def __init__(self, *pairs: tuple):
        AbstractObservations.__init__(self, *combine(*pairs))

    def posterior_kernel(self, measure, p_i, p_j):  # pragma: no cover
        """Get the posterior kernel between two processes.

        Args:
            measure (:class:`.measure.Measure`): Prior.
            p_i (:class:`.gp.GP`): First process.
            p_j (:class:`.gp.GP`): Second process.

        Returns:
            :class:`mlkernels.Kernel`: Posterior kernel between the first and
                second process.
        """
        raise NotImplementedError("Posterior kernel construction not implemented.")

    def posterior_mean(self, measure, p):  # pragma: no cover
        """Get the posterior mean of a process.

        Args:
            measure (:class:`.measure.Measure`): Prior.
            p (:class:`.gp.GP`): Process.

        Returns:
            :class:`mlkernels.Mean`: Posterior mean of `p`.
        """
        raise NotImplementedError("Posterior mean construction not implemented.")


class Observations(AbstractObservations):
    """Observations.

    Takes arguments according to the constructor of
    :class:`.observations.AbstractObservations`.

    Args:
        fdd (:class:`.fdd.FDD`): FDD corresponding to the observations.
        y (tensor): Values of observations.
    """

    def __init__(self, *args):
        AbstractObservations.__init__(self, *args)
        self._K_x_store = {}

    def K_x(self, measure):
        """Kernel matrix of the data.

        Args:
            measure (:class:`.measure.Measure`): Measure.

        Returns:
            matrix: Kernel matrix.
        """
        try:
            return self._K_x_store[id(measure)]
        except KeyError:
            K_x = measure.kernels[self.fdd.p](self.fdd.x) + self.fdd.noise
            self._K_x_store[id(measure)] = K_x
            return K_x

    def posterior_kernel(self, measure, p_i, p_j):
        if num_elements(self.fdd.x) == 0:
            # There are no observations! Just return prior.
            return measure.kernels[p_i, p_j]
        return PosteriorKernel(
            measure.kernels[p_i, p_j],
            measure.kernels[self.fdd.p, p_i],
            measure.kernels[self.fdd.p, p_j],
            self.fdd.x,
            self.K_x(measure),
        )

    def posterior_mean(self, measure, p):
        if num_elements(self.fdd.x) == 0:
            # There are no observations! Just return prior.
            return measure.means[p]
        return PosteriorMean(
            measure.means[p],
            measure.means[self.fdd.p],
            measure.kernels[self.fdd.p, p],
            self.fdd.x,
            self.K_x(measure),
            self.y,
        )
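

# Example (illustrative sketch, not part of the library source): conditioning a
# prior measure on noisy observations. `Measure`, `GP`, and `EQ` are assumed to
# be importable from `stheno`.
#
#   >>> import numpy as np
#   >>> from stheno import Measure, GP, EQ
#   >>> prior = Measure()
#   >>> f = GP(EQ(), measure=prior)
#   >>> x = np.linspace(0, 5, 10)
#   >>> obs = Observations(f(x, 0.01), np.sin(x))
#   >>> post = prior | obs  # Invokes `posterior_mean` and `posterior_kernel`.
#   >>> f_post = post(f)  # Process under the posterior measure.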


class PseudoObservations(AbstractObservations):
    """Observations through inducing points.

    Further takes arguments according to the constructor of
    :class:`.observations.AbstractObservations`. Can also take in tuples of
    inducing points.

    Args:
        u (:class:`.fdd.FDD`): Inducing points.
    """

    @_dispatch
    def __init__(self, u: FDD, *args):
        AbstractObservations.__init__(self, *args)
        self.u = u
        self._K_z_store = {}
        self._elbo_store = {}
        self._mu_store = {}
        self._A_store = {}

    @_dispatch
    def __init__(self, us: tuple, *args):
        PseudoObservations.__init__(self, combine(*us), *args)

    def K_z(self, measure):
        """Kernel matrix of the inducing points.

        Args:
            measure (:class:`.measure.Measure`): Measure.

        Returns:
            matrix: Kernel matrix.
        """
        try:
            return self._K_z_store[id(measure)]
        except KeyError:
            self._compute(measure)
            return self._K_z_store[id(measure)]

    def elbo(self, measure):
        """ELBO.

        Args:
            measure (:class:`.measure.Measure`): Measure.

        Returns:
            scalar: ELBO.
        """
        try:
            return self._elbo_store[id(measure)]
        except KeyError:
            self._compute(measure)
            return self._elbo_store[id(measure)]

    def mu(self, measure):
        """Mean of optimal approximating distribution.

        Args:
            measure (:class:`.measure.Measure`): Measure.

        Returns:
            matrix: Mean.
        """
        try:
            return self._mu_store[id(measure)]
        except KeyError:
            self._compute(measure)
            return self._mu_store[id(measure)]

    def A(self, measure):
        """Parameter of the corrective variance of the kernel of the optimal
        approximating distribution.

        Args:
            measure (:class:`.measure.Measure`): Measure.

        Returns:
            matrix: Corrective variance.
        """
        try:
            return self._A_store[id(measure)]
        except KeyError:
            self._compute(measure)
            return self._A_store[id(measure)]

    def _compute(self, measure):
        # Extract processes and inputs.
        p_x, x, noise_x = self.fdd.p, self.fdd.x, self.fdd.noise
        p_z, z, noise_z = self.u.p, self.u.x, self.u.noise

        # Construct the necessary kernel matrices.
        K_zx = measure.kernels[p_z, p_x](z, x)
        K_z = B.add(measure.kernels[p_z](z), noise_z)
        self._K_z_store[id(measure)] = K_z

        # Noise kernel matrix:
        K_n = noise_x

        # The approximation can only handle diagonal noise matrices.
        if not isinstance(K_n, Diagonal):
            raise RuntimeError(
                f"Kernel matrix of observation noise must be diagonal, "
                f'not "{type(K_n).__name__}".'
            )

        # And construct the components for the inducing point approximation.
        L_z = B.cholesky(K_z)
        A = B.add(B.eye(K_z), B.iqf(K_n, B.transpose(B.solve(L_z, K_zx))))
        self._A_store[id(measure)] = A
        y_bar = B.subtract(B.uprank(self.y), measure.means[p_x](x))
        prod_y_bar = B.solve(L_z, B.iqf(K_n, B.transpose(K_zx), y_bar))

        # Compute the optimal mean.
        mu = B.add(
            measure.means[p_z](z),
            B.iqf(A, B.solve(L_z, K_z), prod_y_bar),
        )
        self._mu_store[id(measure)] = mu

        # Compute the ELBO.
        # NOTE: The calculation of `trace_part` asserts that `K_n` is diagonal.
        #   The rest, however, is completely generic.
        trace_part = B.ratio(
            Diagonal(measure.kernels[p_x].elwise(x)[:, 0])
            - Diagonal(B.iqf_diag(K_z, K_zx)),
            K_n,
        )
        det_part = B.logdet(2 * B.pi * K_n) + B.logdet(A)
        iqf_part = B.iqf(K_n, y_bar)[0, 0] - B.iqf(A, prod_y_bar)[0, 0]
        self._elbo_store[id(measure)] = -0.5 * (trace_part + det_part + iqf_part)
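
    # Commentary (added, not in the original source): writing
    # Q = K_zx' K_z^{-1} K_zx, the three parts computed above assemble the
    # collapsed variational lower bound of Titsias (2009),
    #
    #     ELBO = log N(y | m(x), Q + K_n) - 1/2 * tr(K_n^{-1} (K_x - Q)),
    #
    # where the Gaussian log-density is evaluated through `A` via the matrix
    # inversion and determinant lemmas, so only the m x m matrix `K_z` needs to
    # be factorised rather than the n x n matrix `Q + K_n`.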

    def posterior_kernel(self, measure, p_i, p_j):
        L = B.chol(self.K_z(measure))
        return PosteriorKernel(
            measure.kernels[p_i, p_j],
            measure.kernels[self.u.p, p_i],
            measure.kernels[self.u.p, p_j],
            self.u.x,
            self.K_z(measure),
        ) + SubspaceKernel(
            measure.kernels[self.u.p, p_i],
            measure.kernels[self.u.p, p_j],
            self.u.x,
            B.mm(L, self.A(measure), L, tr_c=True),
        )

    def posterior_mean(self, measure, p):
        return PosteriorMean(
            measure.means[p],
            measure.means[self.u.p],
            measure.kernels[self.u.p, p],
            self.u.x,
            self.K_z(measure),
            self.mu(measure),
        )
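

# Example (illustrative sketch, not part of the library source): sparse
# conditioning through inducing points. `Measure`, `GP`, and `EQ` are assumed
# to be importable from `stheno`.
#
#   >>> import numpy as np
#   >>> from stheno import Measure, GP, EQ
#   >>> prior = Measure()
#   >>> f = GP(EQ(), measure=prior)
#   >>> x, y = np.linspace(0, 10, 100), np.random.randn(100)
#   >>> z = np.linspace(0, 10, 10)  # Locations of the inducing points.
#   >>> obs = PseudoObservations(f(z), f(x, 0.1), y)  # Noise must be diagonal.
#   >>> elbo = obs.elbo(prior)  # Lower bound on the log-marginal likelihood.
#   >>> post = prior | obs  # Approximately conditioned measure.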


Obs = Observations  #: Shorthand for `Observations`.
PseudoObs = PseudoObservations  #: Shorthand for `PseudoObservations`.

# Backward compatibility:
SparseObs = PseudoObservations
SparseObservations = PseudoObservations