#!/usr/bin/env python
"""
In bayesloop, each new data study is handled by an instance of a ``Study``-class. In this way, all data, the inference
results and the appropriate post-processing routines are stored in one object that can be accessed conveniently or
stored in a file. Apart from the basic ``Study`` class, there exist a number of specialized classes that extend the
basic fit method, for example to infer the full distribution of hyper-parameters or to apply model selection to on-line
data streams.
"""
from __future__ import division, print_function
import string
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from scipy.special import factorial, logsumexp
from scipy.special import beta as beta_func
import sympy.abc as abc
from sympy import symbols
from sympy import lambdify
from sympy.stats import density
import sympy.stats
from copy import copy, deepcopy
from collections import OrderedDict
from collections.abc import Iterable
from joblib import Parallel, delayed
from tqdm.auto import tqdm
from .helper import assign_nested_item, recursive_index, flatten, create_colormap, oint, cint, free_symbols
from .preprocessing import moving_window
from .observation_models import ObservationModel
from .transition_models import TransitionModel, CombinedTransitionModel, SerialTransitionModel
from .exceptions import ConfigurationError, PostProcessingError
[docs]
class Study(object):
"""
Fits with fixed hyper-parameters and hyper-parameter optimization. This class implements a
forward-backward-algorithm for analyzing time series data using hierarchical models. For efficient computation,
all parameter distributions are discretized on a parameter grid.
"""
def __init__(self, silent=False):
self.observation_model = None
self.transition_model = None
self.grid_size = []
self.boundaries = []
self.marginal_grid = []
self.grid = []
self.lattice_constant = []
self.raw_data = np.array([])
self.formatted_data = np.array([])
self.raw_timestamps = None
self.formatted_timestamps = None
self.posterior_sequence = []
self.posterior_mean_values = []
self.log_evidence = 0
self.local_evidence = []
self.selected_hyper_parameters = []
self.fit_warning_counter = 0
if not silent:
print('+ Created new study.')
@property
def log10_evidence(self):
return self.log_evidence/np.log(10)
[docs]
def load_example_data(self, silent=False):
"""
Loads UK coal mining disaster data.
Args:
silent(bool): If set to True, no output is generated by this method.
"""
self.raw_data = np.array([5, 4, 1, 0, 4, 3, 4, 0, 6, 3, 3, 4, 0, 2, 6, 3, 3, 5, 4, 5, 3, 1, 4,
4, 1, 5, 5, 3, 4, 2, 5, 2, 2, 3, 4, 2, 1, 3, 2, 2, 1, 1, 1, 1, 3, 0,
0, 1, 0, 1, 1, 0, 0, 3, 1, 0, 3, 2, 2, 0, 1, 1, 1, 0, 1, 0, 1, 0, 0,
0, 2, 1, 0, 0, 0, 1, 1, 0, 2, 3, 3, 1, 1, 2, 1, 1, 1, 1, 2, 3, 3, 0,
0, 0, 1, 4, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0])
self.raw_timestamps = np.arange(1852, 1962)
if not silent:
print('+ Successfully imported example data.')
[docs]
def load_data(self, array, timestamps=None, silent=False):
"""
Loads Numpy array as data.
Args:
array(ndarray): Numpy array containing time series data
timestamps(ndarray): Array of timestamps (same length as data array)
silent(bool): If set to True, no output is generated by this method.
"""
if isinstance(array, np.ndarray):
self.raw_data = array
elif isinstance(array, list):
if not silent:
print('! WARNING: Data supplied as list, not as Numpy array. Converting list to Numpy array '
'(dtype=float).')
self.raw_data = np.array(array, dtype=float)
else:
raise ConfigurationError('Data type not supported. Please provide data as Numpy array.')
if timestamps is not None: # load custom timestamps
if len(timestamps) == len(array):
self.raw_timestamps = np.array(timestamps)
else:
if not silent:
print('! WARNING: Number of timestamps does not match number of data points. Omitting timestamps.')
self.raw_timestamps = np.arange(len(self.raw_data))
else: # set default timestamps (integer range)
self.raw_timestamps = np.arange(len(self.raw_data))
if not silent:
print('+ Successfully imported array.')
[docs]
def set_observation_model(self, L, silent=False):
"""
Sets observation model (likelihood function) for analysis and creates parameter grid for inference routine.
Args:
L: Observation model class (see observation_models.py)
silent(bool): If set to True, no output is generated by this method.
"""
self.observation_model = L
# prepare parameter grid
self.marginal_grid = []
self.grid_size = []
self.boundaries = []
self.lattice_constant = []
for v, n in zip(self.observation_model.parameter_values, self.observation_model.parameter_names):
if v is None: # if user has not specified parameter values, we try to estimate them
try:
v = self.observation_model.estimate_parameter_values(n, self.raw_data)
print('+ Estimated parameter interval for "{}": [{}, {}] ({} values).'
.format(n, v[0], v[-1], len(v)))
except:
raise ConfigurationError('Could not estimate parameter values for "{}".'.format(n))
v = np.array(v, dtype=float) # inference algorithm needs floats
if len(v) == 0:
raise ConfigurationError('Parameter values for "{}" must not be empty.'.format(n))
self.marginal_grid.append(v)
self.grid_size.append(len(v))
self.boundaries.append([v[0], v[-1]])
# check if parameter values are equally spaced
if len(v) == 1:
self.lattice_constant.append(1.)
elif np.any(np.abs(np.diff(np.diff(v))) > 10 ** -10):
print('! WARNING: Supplied parameter values for "{}" are not equally spaced. Assuming categorical '
'parameter.'.format(n))
self.lattice_constant.append(1.)
else: # equally spaced (regular grid)
self.lattice_constant.append(np.abs(v[0] - v[1]))
# create grid
self.grid = [m for m in np.meshgrid(*self.marginal_grid, indexing='ij')]
self.observation_model.prepare_grid(self.grid)
# if observation model is updated, transition model must know the new lattice constant
if self.transition_model is not None:
self.transition_model.lattice_constant = self.lattice_constant
if not silent:
print('+ Observation model: {}. Parameter(s): {}'.format(L, L.parameter_names))
def _compute_prior(self, silent=False):
"""
Computes discrete prior probabilities (densities) for the parameters of the observation model. The custom prior
distribution may be passed as a Numpy array that has the same shape as the parameter grid, as a (lambda)
function or as a (list of) SymPy random variable(s).
Args:
silent(bool): If set to True, no output is generated by this method.
Returns:
ndarray: Prior probability (density) values with the same size as the parameter grid
"""
prior = self.observation_model.prior
# check if no prior is specified
if prior is None:
prior = np.ones(self.grid_size) # flat (improper) prior
prior /= np.sum(prior) # re-normalize (now equal to uniform prior)
prior /= np.prod(self.lattice_constant) # convert to prob. density
if not silent:
print(' + Set uniform prior with parameter boundaries.')
return prior
# check whether correctly shaped numpy array is provided
if isinstance(prior, np.ndarray):
if np.all(prior.shape == self.grid[0].shape):
norm = np.sum(prior)
if norm != 1.:
prior /= norm
prior /= np.prod(self.lattice_constant)
if not silent:
print(' + Set prior (numpy array). Values have been re-normalized.')
else:
if not silent:
print(' + Set prior (numpy array).')
return prior
else:
raise ConfigurationError('Prior array does not match parameter grid size.')
# check whether function is provided
if hasattr(prior, '__call__'):
values = prior(*self.grid)*np.ones(self.grid_size) # last factor for cases like lambda x: 1.
norm = np.sum(values)
if norm != 1.:
values /= norm
values /= np.prod(self.lattice_constant)
if not silent:
print(' + Set prior (function): {}. Values have been re-normalized.'.format(prior.__name__))
else:
if not silent:
print(' + Set prior (function): {}'.format(prior.__name__))
return values
# check whether single random variable is provided
if type(prior) is sympy.stats.rv.RandomSymbol:
prior = [prior]
# check if list/tuple is provided
if isinstance(prior, (list, tuple)) and not isinstance(prior, str):
if len(prior) != len(self.observation_model.parameter_names):
raise ConfigurationError('Observation model contains {} parameters, but {} priors were provided.'
.format(len(self.observation_model.parameter_names), len(prior)))
pdf = 1
if len(prior) > 1:
x = symbols(' '.join(list(string.ascii_lowercase)[:len(prior)]))
else:
x = [abc.x]
for i, rv in enumerate(prior):
if type(rv) is not sympy.stats.rv.RandomSymbol:
raise ConfigurationError('Only lambda functions or SymPy random variables can be used as a prior.')
if len(free_symbols(rv)) > 0:
raise ConfigurationError('Prior distribution must not contain free parameters.')
# multiply total pdf with density for current parameter
pdf = pdf*density(rv)(x[i])
# set density as lambda function
if not silent:
print(' + Set prior (sympy): {}'.format(pdf))
return lambdify(x, pdf, modules=['numpy', {'factorial': factorial, 'beta': beta_func}])(*self.grid)
[docs]
def set_transition_model(self, T, silent=False):
"""
Set transition model which describes the parameter dynamics.
Args:
T: Transition model class (see transition_models.py)
silent(bool): If true, no output is printed by this method
"""
# check if model is a break-point and raise error if so
if str(T) == 'Break-point':
raise ConfigurationError('The "BreakPoint" transition model can only be used with the '
'"SerialTransitionModel" class.')
self.transition_model = T
self.transition_model.study = self
self.transition_model.lattice_constant = self.lattice_constant
if not silent:
print('+ Transition model: {}. Hyper-Parameter(s): {}'
.format(T, self._unpack_all_hyper_parameters(values=False)))
[docs]
def set(self, *args, **kwargs):
"""
Set observation model or transition model, or both. See :meth:`.Study.set_transition_model` and
:meth:`.Study.set_observation_model`.
Args:
args: Sequence of Observation model instance and Transition model instance, or just one of those two types
silent(bool): If true, no output is printed by this method
"""
# Check for unknown keyword-arguments
for key in kwargs.keys():
if key not in ['silent']:
raise TypeError("set() got an unexpected keyword argument '{}'".format(key))
# Keep the legacy calling convention: S.set(model, silent=True).
silent = kwargs.pop('silent', False)
om = False
tm = False
for model in args:
if ObservationModel in model.__class__.__bases__:
if om:
raise ConfigurationError('More than one observation model supplied.')
om = True
self.set_observation_model(model, silent=silent)
elif TransitionModel in model.__class__.__bases__:
if tm:
raise ConfigurationError('More than one transition model supplied.')
tm = True
self.set_transition_model(model, silent=silent)
else:
raise ConfigurationError('Expected observation model or transition model instance as first argument.')
def _estimate_likelihood_cache_size(self):
"""
Estimate the memory needed for storing one likelihood array per formatted data segment.
Returns:
float: estimated cache size in MiB.
"""
return len(self.formatted_data) * int(np.prod(self.grid_size)) * np.dtype(float).itemsize / 2. ** 20
def _should_cache_likelihoods(self, cache_likelihoods, max_cache_size):
"""
Decide whether a likelihood cache should be allocated.
"""
cache_size = self._estimate_likelihood_cache_size()
if isinstance(cache_likelihoods, (bool, np.bool_)):
return bool(cache_likelihoods), cache_size
if isinstance(cache_likelihoods, str) and cache_likelihoods == 'auto':
if not self.observation_model.should_cache_likelihood_sequence():
return False, cache_size
if max_cache_size is None:
return True, cache_size
if max_cache_size < 0:
raise ConfigurationError('max_cache_size must be non-negative or None.')
return cache_size <= max_cache_size, cache_size
raise ConfigurationError('cache_likelihoods must be True, False, or "auto".')
def _compute_likelihood_sequence(self):
"""
Pre-compute observation likelihoods for all formatted data segments.
Returns:
ndarray: Likelihood values with shape [nTime] + grid_size.
"""
likelihood_sequence = np.empty([len(self.formatted_data)] + self.grid_size, dtype=float)
for i, data_segment in enumerate(self.formatted_data):
likelihood = self.observation_model.processed_pdf(self.grid, data_segment)
# force dtype float on likelihood (in case it is of dtype object)
if likelihood.dtype == object:
likelihood = likelihood.astype(float)
likelihood_sequence[i] = likelihood
return likelihood_sequence
def _fit_formatted_data(self, forward_only=False, evidence_only=False, silent=False,
cache_likelihoods='auto', max_cache_size=512, likelihood_sequence=None):
"""
Run the forward-backward algorithm on already formatted data.
"""
lattice_constant = np.prod(self.lattice_constant)
force_cache = isinstance(cache_likelihoods, (bool, np.bool_)) and bool(cache_likelihoods)
if likelihood_sequence is None and (force_cache or not (forward_only or evidence_only)):
use_cache, cache_size = self._should_cache_likelihoods(cache_likelihoods, max_cache_size)
if use_cache:
likelihood_sequence = self._compute_likelihood_sequence()
if not silent:
print(' + Cached observation likelihoods ({:.1f} MiB).'.format(cache_size))
# initialize array for posterior distributions
if not evidence_only:
self.posterior_sequence = np.empty([len(self.formatted_data)] + self.grid_size)
else:
self.posterior_sequence = []
# initialize array for computed evidence (marginal likelihood)
self.log_evidence = 0
self.local_evidence = np.empty(len(self.formatted_data))
# set prior distribution for forward-pass
alpha = self._compute_prior(silent=silent)
# show progressbar if silent=False
if not silent:
enum = tqdm(np.arange(0, len(self.formatted_data)), total=len(self.formatted_data))
else:
enum = np.arange(0, len(self.formatted_data))
# forward pass
for i in enum:
# compute likelihood
if likelihood_sequence is None:
likelihood = self.observation_model.processed_pdf(self.grid, self.formatted_data[i])
# force dtype float on likelihood (in case it is of dtype object)
if likelihood.dtype == object:
likelihood = likelihood.astype(float)
else:
likelihood = likelihood_sequence[i]
# update alpha based on likelihood
alpha *= likelihood
# normalization constant of alpha is used to compute evidence
norm = np.sum(alpha)
# normalize alpha (for numerical stability)
if norm > 0.:
alpha /= norm
else:
# if all probability values are zero, normalization is not possible
if self.fit_warning_counter < 5:
print(' ! WARNING: Forward pass distribution contains only zeros, check parameter boundaries!')
print(' Stopping inference process. Setting model evidence to zero.')
elif self.fit_warning_counter == 5:
print(' ! WARNING: Will omit further warnings about parameter boundaries.')
self.fit_warning_counter += 1
self.log_evidence = -np.inf
return
# update log-evidence and compute local evidence
self.log_evidence += np.log(norm)
self.local_evidence[i] = norm * lattice_constant # integration yields evidence, not only sum
# alphas are stored as preliminary posterior distributions
if not evidence_only:
self.posterior_sequence[i] = alpha
# compute alpha for next iteration
alpha = self.transition_model.compute_forward_prior(alpha, self.formatted_timestamps[i])
# remove progressbar correctly
if not silent:
enum.close()
self.log_evidence += np.log(lattice_constant) # integration yields evidence, not only sum
if not silent:
print(' + Finished forward pass.')
print(' + Log10-evidence: {:.5f}'.format(self.log_evidence / np.log(10)))
if not (forward_only or evidence_only):
# initialize beta and normalize by grid size (for numerical stability only)
beta = np.ones(self.grid_size)
beta /= np.sum(beta)
# show progressbar if silent=False
if not silent:
enum = tqdm(np.arange(0, len(self.formatted_data))[::-1], total=len(self.formatted_data))
else:
enum = np.arange(0, len(self.formatted_data))[::-1]
# backward pass
for i in enum:
# posterior ~ alpha*beta
self.posterior_sequence[i] *= beta # alpha*beta
# normalize posterior wrt the parameters
norm = np.sum(self.posterior_sequence[i])
if norm > 0.:
self.posterior_sequence[i] /= norm
else:
# if all posterior probabilities are zero, normalization is not possible
if self.fit_warning_counter < 5:
print(' ! WARNING: Posterior distribution contains only zeros, check parameter boundaries!')
print(' Stopping inference process. Setting model evidence to zero.')
elif self.fit_warning_counter == 5:
print(' ! WARNING: Will omit further warnings about parameter boundaries.')
self.fit_warning_counter += 1
self.log_evidence = -np.inf
return
# re-compute likelihood if no cache is available
if likelihood_sequence is None:
likelihood = self.observation_model.processed_pdf(self.grid, self.formatted_data[i])
# force dtype float on likelihood (in case it is of dtype object)
if likelihood.dtype == object:
likelihood = likelihood.astype(float)
else:
likelihood = likelihood_sequence[i]
# compute local evidence
with np.errstate(invalid='ignore'):
self.local_evidence[i] = 1. / (np.sum(self.posterior_sequence[i] / likelihood) *
lattice_constant) # integration, not only sum
# compute beta for next iteration
beta = self.transition_model.compute_backward_prior(beta * likelihood, self.formatted_timestamps[i])
# normalize beta (for numerical stability)
beta /= np.sum(beta)
if not silent:
enum.close() # remove progressbar correctly
print(' + Finished backward pass.')
# posterior mean values do not need to be computed for evidence
if evidence_only:
self.posterior_mean_values = []
else:
self.posterior_mean_values = np.empty([len(self.grid), len(self.posterior_sequence)])
for i in range(len(self.grid)):
self.posterior_mean_values[i] = np.array([np.sum(p * self.grid[i]) for p in self.posterior_sequence])
if not silent:
print(' + Computed mean parameter values.')
[docs]
def fit(self, forward_only=False, evidence_only=False, silent=False,
cache_likelihoods='auto', max_cache_size=512):
"""
Computes the sequence of posterior distributions and evidence for each time step. Evidence is also computed for
the complete data set.
Args:
forward_only(bool): If set to True, the fitting process is terminated after the forward pass. The resulting
posterior distributions are so-called "filtering distributions" which - at each time step -
only incorporate the information of past data points. This option thus emulates an online
analysis.
evidence_only(bool): If set to True, only forward pass is run and evidence is calculated. In contrast to the
forward_only option, no posterior mean values are computed and no posterior distributions are stored.
silent(bool): If set to True, no output is generated by the fitting method.
cache_likelihoods(bool or str): If set to True, observation likelihoods are cached for all time steps. If set
to "auto", caching is enabled only when the estimated cache size fits into max_cache_size. If False,
likelihoods are computed on demand.
max_cache_size(float): Maximum likelihood cache size in MiB used by cache_likelihoods="auto". Set to None for
no limit.
"""
self._check_consistency()
if not silent:
print('+ Started new fit:')
self.formatted_data = moving_window(self.raw_data, self.observation_model.segment_length)
self.formatted_timestamps = self.raw_timestamps[self.observation_model.segment_length-1:]
if not silent:
print(' + Formatted data.')
self._fit_formatted_data(forward_only=forward_only,
evidence_only=evidence_only,
silent=silent,
cache_likelihoods=cache_likelihoods,
max_cache_size=max_cache_size)
[docs]
def optimize(self, parameter_list=[], forward_only=False, **kwargs):
"""
Uses the COBYLA minimization algorithm from SciPy to perform a maximization of the log-evidence with respect
to all hyper-parameters (the parameters of the transition model) of a time seris model. The starting values
are the values set by the user when defining the transition model.
For the optimization, only the log-evidence is computed and no parameter distributions are stored. When a local
maximum is found, the parameter distribution is computed based on the optimal values for the hyper-parameters.
Args:
parameter_list(list): List of hyper-parameter names to optimize. For nested transition models with multiple,
identical hyper-parameter names, the sub-model index can be provided. By default, all hyper-parameters
are optimized.
forward_only(bool): If set to True, the fitting process is terminated after the forward pass. The resulting
posterior distributions are so-called "filtering distributions" which - at each time step -
only incorporate the information of past data points.
**kwargs - All other keyword parameters are passed to the 'minimize' routine of scipy.optimize.
"""
# set list of parameters to optimize
if isinstance(parameter_list, str): # in case only a single parameter name is provided as a string
self.selected_hyper_parameters = [parameter_list]
else:
self.selected_hyper_parameters = parameter_list
print('+ Starting optimization...')
self._check_consistency()
if self.selected_hyper_parameters:
print(' --> Parameter(s) to optimize:', self.selected_hyper_parameters)
else:
print(' --> All model parameters are optimized (except change/break-points).')
# load all hyper-parameter names (but remove break- and change-points)
all_hyper_parameters = list(flatten(self._unpack_hyper_parameters(self.transition_model)))
points = list(flatten(self._unpack_changepoint_names(self.transition_model))) + \
list(flatten(self._unpack_breakpoint_names(self.transition_model)))
self.selected_hyper_parameters = [x for x in all_hyper_parameters if x not in points]
# create parameter list to set start values for optimization
x0 = self._unpack_selected_hyper_parameters()
# check if valid parameter names were entered
if len(x0) == 0:
# reset list of parameters to optimize, so that unpacking and setting hyper-parameters works as expected
self.selected_hyper_parameters = []
raise ConfigurationError('No parameters to optimize. Check parameter names.')
# perform optimization (maximization of log-evidence)
result = minimize(self._optimization_step, x0, method='COBYLA', **kwargs)
print('+ Finished optimization.')
# set optimal hyperparameters in transition model
self._set_selected_hyper_parameters(result.x)
# run analysis with optimal parameter values
self.fit(forward_only=forward_only)
# reset list of parameters to optimize, so that unpacking and setting hyper-parameters works as expected
self.selected_hyper_parameters = []
def _optimization_step(self, x):
"""
Wrapper for the fit method to use it in conjunction with scipy.optimize.minimize.
Args:
x(list): unpacked list of current hyper-parameter values
"""
# set new hyperparameters in transition model
self._set_selected_hyper_parameters(x)
# compute log-evidence
self.fit(evidence_only=True, silent=True)
print(' + Log10-evidence: {:.5f}'.format(self.log_evidence / np.log(10)), '- Parameter values:', x)
# return negative log-evidence (is minimized to maximize evidence)
return -self.log_evidence
[docs]
def simulate(self, x, t=None, density=False):
"""
Computes the probability (density) for a set of observations, based on the inferred parameter distributions of a
given time step, or based on the time-averaged parameter distributions. It can be used to compute the expected
distribution of the observed data, taking into account the uncertainty in the parameters (as well as
hyper-parameters for Hyper-Studies).
Args:
x: array of observation values
t: Time step/stamp for which the parameter distribution is evaluated
density: If true, probability density is computed; if false, probability value is computed
Returns:
ndarray: probability (density) values corresponding to observation values
"""
if self.observation_model.segment_length > 1:
raise NotImplementedError('Method "simulate" is only available for observation models with '
'segment length 1.')
# if no time is provided, use time-averaged posterior distribution
if t is None:
post = np.sum(self.posterior_sequence, axis=0) / len(self.posterior_sequence)
else:
# check if supplied time stamp exists
if t not in self.formatted_timestamps:
raise PostProcessingError('Supplied time ({}) does not exist in data or is out of range.'.format(t))
time_index = list(self.formatted_timestamps).index(t) # to select corresponding posterior distribution
post = self.posterior_sequence[time_index]
# compute distribution of observations/data at the given points x
prob = np.array([np.sum(self.observation_model.pdf(self.grid, [xi]) * post) for xi in x])
if not density:
prob /= np.sum(prob)
return prob
def _unpack_hyper_parameters(self, transition_model, values=False):
"""
Returns list of all hyper-parameters (names or values), nested as the transition model.
Args:
transition_model: An instance of a transition model
values: By default, parameter names are returned; if set to True, parameter values are returned
Returns:
list: hyper-parameters (names or values)
"""
param_list = []
# recursion step for sub-models
if hasattr(transition_model, 'models'):
for m in transition_model.models:
param_list.append(self._unpack_hyper_parameters(m, values=values))
# extend hyper-parameter based on current (sub-)model
if hasattr(transition_model, 'hyper_parameter_names'):
if values:
param_list.extend(transition_model.hyper_parameter_values)
else:
param_list.extend(transition_model.hyper_parameter_names)
return param_list
def _unpack_all_hyper_parameters(self, values=True):
"""
Returns a flattened list of all hyper-parameter values of the current transition model.
Returns:
list: all hyper-parameter values of the current transition model
"""
return list(flatten(self._unpack_hyper_parameters(self.transition_model, values=values)))
def _unpack_selected_hyper_parameters(self):
"""
The parameters of a transition model can be split between several sub-models (using CombinedTransitionModel or
SerialTransitionModel) and can be lists of values (multiple standard deviations in GaussianRandomWalk). This
function unpacks the hyper-parameters, resulting in a single list of values that can be fed to the optimization
step routine. Note that only the hyper-parameters that are noted (by name) in the attribute
selected_hyper_parameters are regarded.
Returns:
list: currently selected hyper-parameter values if successful, 0 otherwise
"""
# if no hyper-parameters are selected, choose all
if not self.selected_hyper_parameters:
return self._unpack_all_hyper_parameters()
# if self.selected_hyper_parameters is not empty
name_tree = self._unpack_hyper_parameters(self.transition_model)
value_tree = self._unpack_hyper_parameters(self.transition_model, values=True)
output = []
# loop over selected hyper-parameters
for name in self.selected_hyper_parameters:
i_found = recursive_index(name_tree, name) # choose first hit
if len(i_found) == 0:
raise ConfigurationError('Could not find any hyper-parameter named {}.'.format(name))
value = value_tree[:]
for i in i_found:
value = value[i]
output.append(value)
# remove occurrence from name_tree (if name is listed twice, use second occurrence...)
assign_nested_item(name_tree, i_found, ' ')
# return selected values of hyper-parameters
return output
def _set_all_hyper_parameters(self, x):
"""
Sets all current hyper-parameters, based on a flattened list of parameter values.
Args:
x(list): list of values (e.g. from _unpack_selected_hyper_parameters)
"""
param_list = list(x[:]) # make copy of parameter list
name_tree = self._unpack_hyper_parameters(self.transition_model)
names_flat = list(flatten(self._unpack_hyper_parameters(self.transition_model)))
for name in names_flat:
index = recursive_index(name_tree, name)
# get correct sub-model
model = self.transition_model
for i in index[:-1]:
model = model.models[i]
model.hyper_parameter_values[model.hyper_parameter_names.index(name)] = param_list[0]
param_list.pop(0)
# remove occurrence from name_tree (if name is listed twice, use second occurrence...)
assign_nested_item(name_tree, index, ' ')
def _set_selected_hyper_parameters(self, x):
"""
The parameters of a transition model can be split between several sub-models (using CombinedTransitionModel or
SerialTransitionModel) and can be lists of values (multiple standard deviations in GaussianRandomWalk). This
function takes a list of values and sets the corresponding variables in the transition model instance. Note that
only the hyper-parameters that are noted (by name) in the attribute selected_hyper_parameters are regarded.
Args:
x(list): list of values (e.g. from _unpack_selected_hyper_parameters)
Returns:
int: 1, if successful, 0 otherwise
"""
# if no hyper-parameters are selected, choose all
if not self.selected_hyper_parameters:
self._set_all_hyper_parameters(x)
return 1
param_list = list(x[:]) # make copy of parameter list
name_tree = self._unpack_hyper_parameters(self.transition_model)
# loop over selected hyper-parameters
for name in self.selected_hyper_parameters:
i_found = recursive_index(name_tree, name) # choose first hit
if len(i_found) == 0:
raise ConfigurationError('Could not find any hyper-parameter named {}.'.format(name))
# get correct sub-model
model = self.transition_model
for i in i_found[:-1]:
model = model.models[i]
model.hyper_parameter_values[model.hyper_parameter_names.index(name)] = param_list[0]
param_list.pop(0)
# remove occurrence from name_tree (if name is listed twice, use second occurrence...)
assign_nested_item(name_tree, i_found, ' ')
return 1
def _unpack_changepoint_names(self, transition_model):
"""
Returns list of all hyper-parameter names that are associated with change-points, nested like the transition
model.
Returns:
list: all hyper-parameter names that are associated with change-points
"""
param_list = []
# recursion step for sub-models
if hasattr(transition_model, 'models'):
for m in transition_model.models:
param_list.append(self._unpack_changepoint_names(m))
# extend hyper-parameter based on current (sub-)model
if hasattr(transition_model, 'hyper_parameter_names'):
if str(transition_model) == 'Change-point':
param_list.extend(transition_model.hyper_parameter_names)
return param_list
def _unpack_breakpoint_names(self, transition_model):
"""
Returns list of all hyper-parameter names that are associated with break-points, nested like the transition
model.
Returns:
list: all hyper-parameter names that are associated with break-points
"""
param_list = []
# recursion step for sub-models
if hasattr(transition_model, 'models'):
for m in transition_model.models:
param_list.append(self._unpack_breakpoint_names(m))
# extend hyper-parameter based on current (sub-)model
if hasattr(transition_model, 'hyper_parameter_names'):
if str(transition_model) == 'Serial transition model':
param_list.extend(transition_model.hyper_parameter_names)
return param_list
def _get_hyper_parameter_index(self, transition_model, name):
"""
Helper function that returns the index at which a hyper-parameter is found in the flattened list of
hyper-parameter names.
Args:
transition_model: transition model instance in which to search
name(str): Name of a hyper-parameter. If the name occurs multiple times, the index of the submodel can be
supplied (starting at 1 for the first submodel).
Returns:
int: index of the hyper-parameter
"""
# no index provided: choose first occurrence and determine axis of hyper-parameter on grid of
# hyper-parameter values
hpn = list(flatten(self._unpack_hyper_parameters(transition_model, values=False)))
if name in hpn:
param_index = hpn.index(name)
else:
raise PostProcessingError('Could not find any hyper-parameter with name: {}.'.format(name))
return param_index
[docs]
def get_hyper_parameter_value(self, name):
"""
Returns the currently set value of a hyper-parameter. Note: The returned value is NOT an inferred value, but
simply the last value used by the fitting method.
Args:
name(str): Hyper-parameter name.
Returns:
float: current value of the specified hyper-parameter.
"""
flat_hyper_parameter_values = self._unpack_all_hyper_parameters(values=True)
value = flat_hyper_parameter_values[self._get_hyper_parameter_index(self.transition_model, name)]
return value
[docs]
def get_parameter_mean_values(self, name):
"""
Returns posterior mean values for a parameter of the observation model.
Args:
name(str): Name of the parameter to display
Returns:
ndarray: array of posterior mean values for the selected parameter
"""
# get parameter index
param_index = -1
for i, n in enumerate(self.observation_model.parameter_names):
if n == name:
param_index = i
# check if match was found
if param_index == -1:
raise PostProcessingError('Wrong parameter name. Available options: {0}'
.format(self.observation_model.parameter_names))
return self.posterior_mean_values[param_index]
[docs]
def get_parameter_distribution(self, t, name, plot=False, density=True, **kwargs):
"""
Compute the marginal parameter distribution at a given time step.
Args:
t: Time step/stamp for which the parameter distribution is evaluated (or 'avg' for time-averaged parameter
distribution)
name(str): Name of the parameter to display
plot(bool): If True, a plot of the distribution is created
density(bool): If true, probability density is returned; if false, probability values
**kwargs: All further keyword-arguments are passed to the plot (see matplotlib documentation)
Returns:
ndarray, ndarray: The first array contains the parameter values, the second one the corresponding
probability (density) values
"""
if len(self.posterior_sequence) == 0:
raise PostProcessingError('Cannot plot posterior sequence as it has not yet been computed. '
'Run complete fit.')
if t == 'avg':
# compute time-averaged posterior distribution
dist = np.sum(self.posterior_sequence, axis=0)/len(self.posterior_sequence)
else:
# check if supplied time stamp exists
if t not in self.formatted_timestamps:
raise PostProcessingError('Supplied time ({}) does not exist in data or is out of range.'.format(t))
time_index = list(self.formatted_timestamps).index(t) # to select corresponding posterior distribution
# select posterior distribution of speciifed time step
dist = self.posterior_sequence[time_index]
# get parameter index
param_index = -1
for i, n in enumerate(self.observation_model.parameter_names):
if n == name:
param_index = i
# check if match was found
if param_index == -1:
raise PostProcessingError('Wrong parameter name. Available options: {0}'
.format(self.observation_model.parameter_names))
axes_to_marginalize = list(range(len(self.observation_model.parameter_names)))
try:
axes_to_marginalize.remove(param_index)
except ValueError:
raise PostProcessingError('Wrong parameter index. Available indices: {}'.format(axes_to_marginalize))
x = self.marginal_grid[param_index]
dx = self.lattice_constant[param_index]
marginal_distribution = np.squeeze(np.apply_over_axes(np.sum, dist, axes_to_marginalize)).copy()
if density:
marginal_distribution /= dx
if plot:
plt.fill_between(x, 0, marginal_distribution, **kwargs)
plt.xlabel(self.observation_model.parameter_names[param_index])
if density:
plt.ylabel('probability density')
else:
plt.ylabel('probability')
return x, marginal_distribution
[docs]
def get_parameter_distributions(self, name, plot=False, density=True, **kwargs):
"""
Computes the time series of marginal posterior distributions with respect to a given model parameter.
Args:
name(str): Name of the parameter to display
plot(bool): If True, a plot of the series of distributions is created (density map)
density(bool): If true, probability density is returned; if false, probability values
**kwargs: All further keyword-arguments are passed to the plot (see matplotlib documentation)
Returns:
ndarray, ndarray: The first array contains the parameter values, the second one the sequence of
corresponding posterior distributions.
"""
if len(self.posterior_sequence) == 0:
raise PostProcessingError('Cannot plot posterior sequence as it has not yet been computed. '
'Run complete fit.')
dt = self.formatted_timestamps[1:] - self.formatted_timestamps[:-1]
if not np.all(dt == dt[0]):
print('! WARNING: Time stamps are not equally spaced. This may result in false plotting of parameter '
'distributions.')
# get parameter index
param_index = -1
for i, n in enumerate(self.observation_model.parameter_names):
if n == name:
param_index = i
# check if match was found
if param_index == -1:
raise PostProcessingError('Wrong parameter name. Available options: {0}'
.format(self.observation_model.parameter_names))
axes_to_marginalize = list(range(1, len(self.observation_model.parameter_names) + 1)) # axis 0 is time
try:
axes_to_marginalize.remove(param_index + 1)
except ValueError:
raise PostProcessingError('Wrong parameter index. Available indices: {}'
.format(np.array(axes_to_marginalize) - 1))
x = self.marginal_grid[param_index]
dx = self.lattice_constant[param_index]
marginal_posterior_sequence = np.squeeze(
np.apply_over_axes(np.sum, self.posterior_sequence, axes_to_marginalize)).copy()
if density:
marginal_posterior_sequence /= dx
if plot:
if 'c' in kwargs:
cmap = create_colormap(kwargs['c'])
elif 'color' in kwargs:
cmap = create_colormap(kwargs['color'])
else:
cmap = create_colormap('b')
plt.imshow(marginal_posterior_sequence.T,
origin='lower',
cmap=cmap,
extent=[self.formatted_timestamps[0], self.formatted_timestamps[-1]] + self.boundaries[param_index],
aspect='auto')
return x, marginal_posterior_sequence
[docs]
def plot_parameter_evolution(self, name, color='b', gamma=0.5, **kwargs):
"""
Extended plot method to display a series of marginal posterior distributions corresponding to a single model
parameter. In contrast to getMarginalParameterDistributions(), this method includes the removal of plotting
artefacts, gamma correction as well as an overlay of the posterior mean values.
Args:
name(str): name of the parameter to display
color: color from which a light colormap is created
gamma(float): exponent for gamma correction of the displayed marginal distribution; default: 0.5
kwargs: all further keyword-arguments are passed to the plot of the posterior mean values
"""
if len(self.posterior_sequence) == 0:
raise PostProcessingError('Cannot plot posterior sequence as it has not yet been computed. '
'Run complete fit.')
dt = self.formatted_timestamps[1:] - self.formatted_timestamps[:-1]
if not np.all(dt == dt[0]):
print('! WARNING: Time stamps are not equally spaced. This may result in false plotting of parameter '
'distributions.')
# get parameter index
param_index = -1
for i, n in enumerate(self.observation_model.parameter_names):
if n == name:
param_index = i
# check if match was found
if param_index == -1:
raise PostProcessingError('Wrong parameter name. Available options: {0}'
.format(self.observation_model.parameter_names))
axes_to_marginalize = list(range(1, len(self.observation_model.parameter_names) + 1)) # axis 0 is time
try:
axes_to_marginalize.remove(param_index + 1)
except ValueError:
raise PostProcessingError('Wrong parameter index to plot. Available indices: {}'
.format(np.array(axes_to_marginalize)-1))
marginal_posterior_sequence = np.squeeze(np.apply_over_axes(np.sum, self.posterior_sequence, axes_to_marginalize))
# clean up very small probability values, as they may create image artefacts
pmax = np.amax(marginal_posterior_sequence)
marginal_posterior_sequence[marginal_posterior_sequence < pmax*(10**-20)] = 0
plt.imshow(marginal_posterior_sequence.T**gamma,
origin='lower',
cmap=create_colormap(color),
extent=[self.formatted_timestamps[0], self.formatted_timestamps[-1]] + self.boundaries[param_index],
aspect='auto')
# set default color of plot to black
if ('c' not in kwargs) and ('color' not in kwargs):
kwargs['c'] = 'k'
# set default linewidth to 1.5
if ('lw' not in kwargs) and ('linewidth' not in kwargs):
kwargs['lw'] = 1.5
plt.plot(self.formatted_timestamps, self.posterior_mean_values[param_index], **kwargs)
plt.ylim(self.boundaries[param_index])
plt.ylabel(self.observation_model.parameter_names[param_index])
plt.xlabel('time step')
[docs]
def plot(self, name, **kwargs):
"""
Convenience method to plot the temporal evolution of observation model parameters, or the parameter distribution
at a specific time step. Extended functionality for other study classes.
Args:
name(str): name of the parameter to display
color: color from which a light colormap is created (for parameter evolution only)
gamma(float): exponent for gamma correction of the displayed marginal distribution; default: 0.5 (for
parameter evolution only)
t: Time step/stamp for which the parameter distribution is evaluated
density(bool): If true, probability density is plotted; if false, probability values
kwargs: all further keyword-arguments are passed to the axes object of the plot
"""
density = kwargs.pop('density', True)
# plot parameter distribution at specific time step
if 't' in kwargs.keys():
t = kwargs.pop('t')
self.get_parameter_distribution(t, name, plot=True, density=density, **kwargs)
# plot parameter evolution
else:
# read additional kwargs (or set default values)
color = kwargs.pop('color', 'b')
gamma = kwargs.pop('gamma', 0.5)
self.plot_parameter_evolution(name, color=color, gamma=gamma, **kwargs)
def _check_consistency(self):
"""
This method is called at the very beginning of analysis methods to ensure that all necessary elements of the
model are set correctly. If problem with user input is detected, an exception will be raised.
"""
if len(self.raw_data) == 0:
raise ConfigurationError('No data loaded.')
if not self.observation_model:
raise ConfigurationError('No observation model chosen.')
if not self.transition_model:
raise ConfigurationError('No transition model chosen.')
# check for duplicate hyper-parameter names
flat_names = self._unpack_all_hyper_parameters(values=False)
u, i = np.unique(flat_names, return_inverse=True)
duplicates = u[np.bincount(i) > 1]
if len(duplicates) > 0:
raise ConfigurationError('Detected duplicate hyper-parameter names: {}.'.format(duplicates))
[docs]
class HyperStudy(Study):
"""
Infers hyper-parameter distributions. This class serves as an extension to the basic Study class and allows to
compute the distribution of hyper-parameters of a given transition model. For further information, see the
documentation of the fit-method of this class.
"""
def __init__(self, silent=False):
super(HyperStudy, self).__init__(silent=silent)
self.hyper_grid = []
self.hyper_grid_values = []
self.hyper_grid_constant = []
self.flat_hyper_parameters = []
self.flat_hyper_parameter_names = []
self.flat_hyper_priors = []
self.flat_hyper_prior_values = []
self.hyper_parameter_distribution = None
self.average_posterior_sequence = None
self.log_evidence_list = []
self.local_evidence_list = []
if not silent:
print(' --> Hyper-study')
def _create_hyper_grid(self, silent=False):
"""
Creates an array of hyper-parameter values that are fitted. Also determines grid constants for proper
normalisation and computes prior probability (density) values
Args:
silent(bool): If true, no output is produced by this method
"""
# extract flat list of hyper-parameter names and values
self.flat_hyper_parameters = self._unpack_all_hyper_parameters()
self.flat_hyper_parameter_names = self._unpack_all_hyper_parameters(values=False)
self.flat_hyper_priors = self._unpack_all_hyper_priors()
# look if change/break-points have value 'all' and assign array of all time-stamps
for i, v in enumerate(self.flat_hyper_parameters):
if isinstance(v, str) and v == 'all':
self.flat_hyper_parameters[i] = self.formatted_timestamps[:-1]
# create hyper-parameter grid
temp = np.meshgrid(*self.flat_hyper_parameters, indexing='ij')
if len(self.flat_hyper_parameter_names) > 0:
self.hyper_grid_values = np.array([t.ravel() for t in temp]).T
else:
self.hyper_grid_values = np.array([])
# find lattice constants for equally spaced hyper-parameter values
self.hyper_grid_constant = []
for values in self.flat_hyper_parameters:
if isinstance(values, Iterable) and len(values) > 1:
a = np.array(values)
d = a[1:] - a[:-1]
dd = d[1:] - d[:-1]
if np.all(np.abs(dd) < 10 ** -10): # for equally spaced values, set difference as grid-constant
self.hyper_grid_constant.append(np.abs(d[0]))
else: # for irregularly spaced values (e.g. categorical), set grid-constant to 1
self.hyper_grid_constant.append(1)
else: # for single value, set grid-constant to 1
self.hyper_grid_constant.append(1)
self.hyper_grid_constant = np.array(self.hyper_grid_constant)
# evaluate hyper-prior values
prior_values_list = []
prior_names_list = []
for prior, values, grid_const, name in zip(self.flat_hyper_priors, self.flat_hyper_parameters,
self.hyper_grid_constant, self.flat_hyper_parameter_names):
if prior is None: # no prior specified
prior_values = np.ones_like(values, dtype=float)
prior_values /= np.sum(prior_values)
prior_values /= grid_const
prior_names_list.append('uniform')
elif hasattr(prior, '__call__'): # prior specified by function
try:
prior_values = np.array([prior(value) for value in values])
norm = np.sum(prior_values)
prior_values /= norm
prior_values /= grid_const
except:
raise ConfigurationError('Failed to set hyper-prior for "{}" from function "{}".'
.format(name, prior.__name__))
if norm != 1.:
prior_names_list.append(prior.__name__ + ' (re-normalized)')
else:
prior_names_list.append(prior.__name__)
elif isinstance(prior, Iterable): # prior specified by list/array
if len(prior) != len(values):
raise ConfigurationError('Failed to set hyper-prior for "{}" from list/array.'.format(name))
prior_values = np.array(prior, dtype=float)
norm = np.sum(prior_values)
prior_values /= norm
prior_values /= grid_const
if norm != 1.:
prior_names_list.append('list/array (re-normalized)')
else:
prior_names_list.append('list/array')
else: # SymPy RV
if len(free_symbols(prior)) > 0:
raise ConfigurationError('Hyper-prior for "{}" must not contain free parameters.'.format(name))
# get symbolic representation of probability density
x = abc.x
sym_density = density(prior)(x)
# get density as lambda function
pdf = lambdify([x], sym_density, modules=['numpy', {'factorial': factorial}])
# evaluate density
prior_values = pdf(values)
prior_names_list.append(str(sym_density))
prior_values_list.append(prior_values)
# create hyper-prior grid
if len(self.flat_hyper_parameter_names) > 0:
temp = np.meshgrid(*prior_values_list, indexing='ij')
self.flat_hyper_prior_values = np.array([t.ravel() for t in temp]).T
self.flat_hyper_prior_values = np.prod(self.flat_hyper_prior_values, axis=1) # joint prob. density
if not silent and len(self.hyper_grid_values) > 1:
print('+ Set hyper-prior(s): {}'.format(prior_names_list))
else:
# we need a dummy value for transition models without hyper-parameters
self.flat_hyper_prior_values = np.array([1])
[docs]
def fit(self, forward_only=False, evidence_only=False, silent=False, n_jobs=1, custom_hyper_grid=False,
cache_likelihoods='auto', max_cache_size=512):
"""
This method over-rides the according method of the Study-class. It runs the algorithm for equally spaced hyper-
parameter values as defined by the variable 'hyper_grid'. The posterior sequence represents the average
model of all analyses. Posterior mean values are computed from this average model.
Args:
forward_only(bool): If set to True, the fitting process is terminated after the forward pass. The resulting
posterior distributions are so-called "filtering distributions" which - at each time step -
only incorporate the information of past data points. This option thus emulates an online
analysis.
evidence_only(bool): If set to True, only forward pass is run and evidence is calculated. In contrast to the
forward_only option, no posterior mean values are computed and no posterior distributions are stored.
silent(bool): If set to true, reduced output is created by this method.
n_jobs(int): Number of processes to employ. Multiprocessing is based on the 'joblib' module.
custom_hyper_grid(bool): If set to true, the method "_create_hyper_grid" is not called before starting the fit.
This is used by the class "ChangepointStudy", which employs a custom version of "_create_hyper_grid".
cache_likelihoods(bool or str): If set to True, observation likelihoods are cached for all time steps and
reused across hyper-parameter values. If set to "auto", caching is enabled only when the estimated cache
size fits into max_cache_size. If False, likelihoods are computed on demand.
max_cache_size(float): Maximum likelihood cache size in MiB used by cache_likelihoods="auto". Set to None for
no limit.
"""
self.fit_warning_counter = 0
# format data/timestamps once, so number of data segments is known and _createGrid() works properly
self.formatted_data = moving_window(self.raw_data, self.observation_model.segment_length)
self.formatted_timestamps = self.raw_timestamps[self.observation_model.segment_length - 1:]
# create hyper-parameter grid
if not custom_hyper_grid:
self._create_hyper_grid(silent=silent)
# additional consistency check if multiple change/break-points share identical values.
# in this case, a ChangepointStudy should be used!
names = list(flatten(self._unpack_changepoint_names(self.transition_model))) + \
list(flatten(self._unpack_breakpoint_names(self.transition_model)))
if len(names) > 1:
indices = [self.flat_hyper_parameter_names.index(name) for name in names]
values = self.hyper_grid_values[:, indices]
for v in values:
if np.unique(v).size < v.size:
raise ConfigurationError('Detected multiple change-/break-points with identical values and/or '
'overlapping value intervals. Use "ChangepointStudy" instead of '
'"HyperStudy" for such cases.')
self._check_consistency()
if evidence_only:
self.posterior_sequence = []
self.posterior_mean_values = []
self.average_posterior_sequence = None
if not evidence_only:
# The average posterior distribution is stored in log-space for numerical stability. This way, it can be
# updated iteratively without storing all individual posterior distributions (after the iteration over all
# hyper-parameter values is done, it is transformed back to linear space)
self.average_posterior_sequence = np.zeros([len(self.formatted_data)]+self.grid_size) - np.inf
self.log_evidence_list = []
self.local_evidence_list = []
# hyper-study fit is only necessary for more than one combination of hyper-parameter values
if len(self.hyper_grid_values) > 1:
if not silent:
print('+ Started new fit.')
print(' + {} analyses to run.'.format(len(self.hyper_grid_values)))
if n_jobs > 1:
# prepare parallel execution
if not silent:
print(' + Creating {} processes.'.format(n_jobs))
# use _parallel_fit method to create copies of this HyperStudy instance with only partial
# hyper-grid values
sub_studies = Parallel(n_jobs=n_jobs, prefer='processes')(
delayed(self._parallel_fit)(
idx,
n_jobs,
forward_only,
evidence_only,
silent,
cache_likelihoods,
max_cache_size,
)
for idx in range(n_jobs)
)
# merge all sub-studies
for S in sub_studies:
self.log_evidence_list += S.log_evidence_list
self.local_evidence_list += S.local_evidence_list
if not evidence_only:
self.average_posterior_sequence = np.logaddexp(self.average_posterior_sequence,
S.average_posterior_sequence)
# single process fit
else:
likelihood_sequence = None
use_cache, cache_size = self._should_cache_likelihoods(cache_likelihoods, max_cache_size)
if use_cache:
likelihood_sequence = self._compute_likelihood_sequence()
if not silent:
print(' + Cached observation likelihoods ({:.1f} MiB).'.format(cache_size))
# show progressbar if silent=False
if not silent:
enum = tqdm(enumerate(self.hyper_grid_values), total=len(self.hyper_grid_values))
else:
enum = enumerate(self.hyper_grid_values)
for i, hyper_param_values in enum:
self._set_selected_hyper_parameters(hyper_param_values)
# call fit method from parent class
self._fit_formatted_data(forward_only=forward_only,
evidence_only=evidence_only,
silent=True,
cache_likelihoods=cache_likelihoods,
max_cache_size=max_cache_size,
likelihood_sequence=likelihood_sequence)
self.log_evidence_list.append(self.log_evidence)
self.local_evidence_list.append(self.local_evidence)
if (not evidence_only) and np.isfinite(self.log_evidence):
# For numerical stability, zeros in posterior distribution are replaced with small constant.
# Note that this is typically only the case for hyper-parameter values with low likelihood. The
# procedure therefore has no (measurable) effect on the average posterior sequence.
self.posterior_sequence[self.posterior_sequence < 10.**-300] = 10.**-300
self.average_posterior_sequence = np.logaddexp(self.average_posterior_sequence,
np.log(self.posterior_sequence) +
self.log_evidence +
np.log(self.flat_hyper_prior_values[i]))
# remove progressbar correctly
if not silent:
enum.close()
if not evidence_only:
# transform average posterior distribution into linear space
# (we prefer underflows rather than overflows)
self.average_posterior_sequence -= np.amax(self.average_posterior_sequence)
self.average_posterior_sequence = np.exp(self.average_posterior_sequence)
# compute average posterior distribution
normalization = np.array([np.sum(posterior) for posterior in self.average_posterior_sequence])
for i in range(len(self.grid)):
normalization = normalization[:, None] # add axis; needs to match average_posterior_sequence
self.average_posterior_sequence /= normalization
# set self.posterior_sequence to average posterior sequence for plotting reasons
self.posterior_sequence = self.average_posterior_sequence
if not silent:
print(' + Computed average posterior sequence')
# compute hyper-parameter distribution
log_hyper_parameter_distribution = self.log_evidence_list + np.log(self.flat_hyper_prior_values) + \
np.sum(np.log(self.hyper_grid_constant))
# transform hyper-parameter distribution into linear space
# (we prefer underflows rather than overflows)
scaled_log_hyper_parameter_distribution = log_hyper_parameter_distribution - np.amax(log_hyper_parameter_distribution)
self.hyper_parameter_distribution = np.exp(scaled_log_hyper_parameter_distribution)
self.hyper_parameter_distribution /= np.sum(self.hyper_parameter_distribution)
self.hyper_parameter_distribution /= np.prod(self.hyper_grid_constant) # probability density
if not silent:
print(' + Computed hyper-parameter distribution')
# compute log-evidence of average model
self.log_evidence = logsumexp(log_hyper_parameter_distribution)
if not silent:
print(' + Log10-evidence of average model: {:.5f}'.format(self.log_evidence / np.log(10)))
# compute local evidence of average model
self.local_evidence = np.sum((np.array(self.local_evidence_list).T*self.flat_hyper_prior_values).T, axis=0)
if not silent:
print(' + Computed local evidence of average model')
# compute posterior mean values
if not evidence_only:
self.posterior_mean_values = np.empty([len(self.grid), len(self.posterior_sequence)])
for i in range(len(self.grid)):
self.posterior_mean_values[i] = np.array([np.sum(p*self.grid[i]) for p in self.posterior_sequence])
if not silent:
print(' + Computed mean parameter values.')
# clear local_evidence_list (to keep file size small for stored studies)
self.local_evidence_list = []
# restore hyper-parameter values (individual values have been set during fitting)
self._set_all_hyper_parameters(self.flat_hyper_parameters)
if not silent:
print('+ Finished fit.')
# only one combination of hyper-parameter values
else:
if not silent:
if len(self.hyper_grid_values) == 1:
print('+ Only one combination of hyper-parameter values, switching to standard fit method.')
if len(self.hyper_grid_values) == 0:
print('+ Transition model contains no hyper-parameters, switching to standard fit method.')
Study.fit(self,
forward_only=forward_only,
evidence_only=evidence_only,
silent=silent,
cache_likelihoods=cache_likelihoods,
max_cache_size=max_cache_size)
def _parallel_fit(self, idx, n_jobs, forward_only, evidence_only, silent, cache_likelihoods, max_cache_size):
"""
This method is called by the fit method of the HyperStudy class. It creates a copy of the current class
instance and performs a fit based on a subset of the specified hyper-parameter grid. The method thus allows
to distribute a HyperStudy fit among multiple processes for multiprocessing.
Args:
idx(int): Index from 0 to (n_jobs-1), indicating which part of the hyper-grid values are to be analyzed.
n_jobs(int): Number of processes to employ. Multiprocessing is based on the 'joblib' module.
forward_only(bool): If set to True, the fitting process is terminated after the forward pass. The resulting
posterior distributions are so-called "filtering distributions" which - at each time step -
only incorporate the information of past data points. This option thus emulates an online
analysis.
evidence_only(bool): If set to True, only forward pass is run and evidence is calculated. In contrast to the
forward_only option, no posterior mean values are computed and no posterior distributions are stored.
silent(bool): If set to True, no output is generated by the fitting method.
cache_likelihoods(bool or str): Whether to cache observation likelihoods.
max_cache_size(float): Maximum likelihood cache size in MiB used by cache_likelihoods="auto".
Returns:
HyperStudy instance
"""
S = copy(self)
S.hyper_grid_values = np.array_split(S.hyper_grid_values, n_jobs)[idx]
S.flat_hyper_prior_values = np.array_split(S.flat_hyper_prior_values, n_jobs)[idx]
likelihood_sequence = None
if len(S.hyper_grid_values) > 1:
use_cache, _ = S._should_cache_likelihoods(cache_likelihoods, max_cache_size)
if use_cache:
likelihood_sequence = S._compute_likelihood_sequence()
# show progressbar for last process if silent=False
if not silent and idx == n_jobs-1:
enum = tqdm(enumerate(S.hyper_grid_values), total=len(S.hyper_grid_values))
else:
enum = enumerate(S.hyper_grid_values)
for i, hyper_param_values in enum:
S._set_selected_hyper_parameters(hyper_param_values)
# call fit method from parent class
S._fit_formatted_data(forward_only=forward_only,
evidence_only=evidence_only,
silent=True,
cache_likelihoods=cache_likelihoods,
max_cache_size=max_cache_size,
likelihood_sequence=likelihood_sequence)
S.log_evidence_list.append(S.log_evidence)
S.local_evidence_list.append(S.local_evidence)
if (not evidence_only) and np.isfinite(S.log_evidence):
# For numerical stability, zeros in posterior distribution are replaced with small constant.
# Note that this is typically only the case for hyper-parameter values with low likelihood. The
# procedure therefore has no (measurable) effect on the average posterior sequence.
S.posterior_sequence[S.posterior_sequence < 10. ** -300] = 10. ** -300
S.average_posterior_sequence = np.logaddexp(S.average_posterior_sequence,
np.log(S.posterior_sequence) +
S.log_evidence +
np.log(S.flat_hyper_prior_values[i]))
# remove progressbar correctly
if not silent and idx == n_jobs-1:
enum.close()
return S
# optimization methods are inherited from Study class, but cannot be used in this case
[docs]
def optimize(self, *args, **kwargs):
raise NotImplementedError('HyperStudy object has no optimizing method.')
def _unpack_hyper_priors(self, transition_model):
"""
Returns flat list of all hyper-priors in transition model order.
Args:
transition_model: An instance of a transition model
Returns:
list: hyper-priors
"""
prior_list = []
# recursion step for sub-models
if hasattr(transition_model, 'models'):
for m in transition_model.models:
prior_list.extend(self._unpack_hyper_priors(m))
# append prior
if hasattr(transition_model, 'prior'):
hyper_parameter_names = getattr(transition_model, 'hyper_parameter_names', [])
n_parameters = len(hyper_parameter_names)
# only take priors from transition models that define hyper-parameters. This includes the structural
# break parameters stored directly in SerialTransitionModel.
if n_parameters == 1:
prior = transition_model.prior
if isinstance(prior, (list, tuple)) and len(prior) == 1:
first_prior = prior[0]
if (first_prior is None or hasattr(first_prior, '__call__') or
type(first_prior) is sympy.stats.rv.RandomSymbol or
isinstance(first_prior, (list, tuple, np.ndarray))):
prior = first_prior
prior_list.append(prior)
elif n_parameters > 1:
prior = transition_model.prior
if prior is None:
prior_list.extend([None] * n_parameters)
elif isinstance(prior, Iterable) and not isinstance(prior, (str, np.ndarray)):
if len(prior) != n_parameters:
raise ConfigurationError(
'{} priors are defined for transition model "{}", but model contains {} '
'hyper-parameters.'.format(len(prior), transition_model, n_parameters))
prior_list.extend(list(prior))
else:
raise ConfigurationError(
'Transition model "{}" contains {} hyper-parameters, but its hyper-prior is not a '
'matching list or tuple.'.format(transition_model, n_parameters))
elif str(transition_model) == 'Break-point':
prior_list.append(transition_model.prior)
return prior_list
def _unpack_all_hyper_priors(self):
"""
Returns a flattened list of all hyper-priors of the current transition model.
Returns:
list: all hyper-priors of the current transition model
"""
return self._unpack_hyper_priors(self.transition_model)
[docs]
def get_hyper_parameter_distribution(self, name, plot=False, **kwargs):
"""
Computes marginal hyper-parameter distribution of a single hyper-parameter in a HyperStudy fit.
Args:
name(str): Name of the hyper-parameter to display
(first model hyper-parameter)
plot(bool): If True, a bar chart of the distribution is created
**kwargs: All further keyword-arguments are passed to the bar-plot (see matplotlib documentation)
Returns:
ndarray, ndarray: The first array contains the hyper-parameter values, the second one the
corresponding probability values
"""
# check if only a standard fit has been carried out
if len(self.hyper_grid_values) < 2:
raise PostProcessingError('At least two combinations of hyper-parameter values need to be fitted to '
'evaluate a hyper-parameter distribution. Check transition model.')
param_index = self._get_hyper_parameter_index(self.transition_model, name)
axes_to_marginalize = list(range(len(self.flat_hyper_parameter_names)))
axes_to_marginalize.remove(param_index)
# reshape hyper-parameter distribution for easy marginalizing
hyper_grid_steps = []
for x in self.flat_hyper_parameters:
if isinstance(x, Iterable):
hyper_grid_steps.append(len(x))
else:
hyper_grid_steps.append(1)
distribution = self.hyper_parameter_distribution.reshape(hyper_grid_steps, order='C')
marginal_distribution = np.squeeze(np.apply_over_axes(np.sum, distribution, axes_to_marginalize))
marginal_distribution *= np.prod(self.hyper_grid_constant) # convert to probability (from density)
x = self.flat_hyper_parameters[param_index]
if plot:
# check if categorical
if np.any(np.abs(np.diff(np.diff(x))) > 10 ** -10):
plt.bar(np.arange(len(x)), marginal_distribution, align='center', width=1., **kwargs)
plt.xticks(np.arange(len(x)), x)
plt.ylabel('probability')
# regular spacing
else:
plt.bar(x, marginal_distribution, align='center', width=self.hyper_grid_constant[param_index], **kwargs)
plt.ylabel('probability')
plt.xlabel(self.flat_hyper_parameter_names[param_index])
return x, marginal_distribution
[docs]
def get_joint_hyper_parameter_distribution(self, names, plot=False, figure=None, subplot=111, **kwargs):
"""
Computes the joint distribution of two hyper-parameters of a HyperStudy and optionally creates a 3D bar chart.
Note that the 3D plot can only be included in an existing plot by passing a figure object and subplot
specification.
Args:
names(list): List of two hyper-parameter names to display
plot(bool): If True, a 3D-bar chart of the distribution is created
figure: In case the plot is supposed to be part of an existing figure, it can be passed to the method. By
default, a new figure is created.
subplot: Characterization of subplot alignment, as in matplotlib. Default: 111
**kwargs: all further keyword-arguments are passed to the bar3d-plot (see matplotlib documentation)
Returns:
ndarray, ndarray, ndarray: The first and second array contains the hyper-parameter values, the
third one the corresponding probability values
"""
# check if only a standard fit has been carried out
if len(self.hyper_grid_values) < 2:
raise PostProcessingError('At least two combinations of hyper-parameter values need to be fitted to '
'evaluate a hyper-parameter distribution. Check transition model.')
# check if list with two elements is provided
if not isinstance(names, Iterable):
raise PostProcessingError('A list of exactly two hyper-parameters has to be provided.')
elif not len(names) == 2:
raise PostProcessingError('A list of exactly two hyper-parameters has to be provided.')
param_indices = [self._get_hyper_parameter_index(self.transition_model, n) for n in names]
# put parameter indices in ascending order for marginalization and plotting
# original order is restored for returned values
switch = False
if not param_indices[0] < param_indices[1]:
switch = True
param_indices = param_indices[::-1]
axes_to_marginalize = list(range(len(self.flat_hyper_parameter_names)))
for p in param_indices:
axes_to_marginalize.remove(p)
# reshape hyper-parameter distribution for easy marginalizing
hyper_grid_steps = []
for x in self.flat_hyper_parameters:
if isinstance(x, Iterable):
hyper_grid_steps.append(len(x))
else:
hyper_grid_steps.append(1)
distribution = self.hyper_parameter_distribution.reshape(hyper_grid_steps, order='C')
marginal_distribution = np.squeeze(np.apply_over_axes(np.sum, distribution, axes_to_marginalize))
marginal_distribution *= np.prod(self.hyper_grid_constant) # convert to probability (from density)
x, y = [self.flat_hyper_parameters[i] for i in param_indices]
if np.any(np.abs(np.diff(np.diff(x))) > 10 ** -10):
x2 = np.tile(np.arange(len(x)), (len(y), 1)).T
else:
x2 = np.tile(x, (len(y), 1)).T
if np.any(np.abs(np.diff(np.diff(y))) > 10 ** -10):
y2 = np.tile(np.arange(len(y)), (len(x), 1))
else:
y2 = np.tile(y, (len(x), 1))
z = marginal_distribution
if plot:
# allow to add plot to predefined figure
if figure is None:
fig = plt.figure()
else:
fig = figure
ax = fig.add_subplot(subplot, projection='3d')
ax.bar3d(x2.flatten() - self.hyper_grid_constant[param_indices[0]]/2.,
y2.flatten() - self.hyper_grid_constant[param_indices[1]]/2.,
z.flatten()*0.,
self.hyper_grid_constant[param_indices[0]],
self.hyper_grid_constant[param_indices[1]],
z.flatten(),
zsort='max',
**kwargs
)
# check for categorical hyper-parameter values
if np.any(np.abs(np.diff(np.diff(x))) > 10 ** -10):
plt.xticks(np.arange(len(x)), x)
if np.any(np.abs(np.diff(np.diff(y))) > 10 ** -10):
plt.yticks(np.arange(len(y)), y)
ax.set_xlabel(self.flat_hyper_parameter_names[param_indices[0]])
ax.set_ylabel(self.flat_hyper_parameter_names[param_indices[1]])
ax.set_zlabel('probability')
# restore original order of parameters before returning values
if switch:
x, y = y, x
marginal_distribution = marginal_distribution.T
return x, y, marginal_distribution
[docs]
def plot(self, name, **kwargs):
"""
Convenience method to plot the temporal evolution of observation model parameters, the distribution of a
parameter at a specific time step, or the distribution of a hyper-parameter.
Args:
name(str): name of the (hyper-)parameter to display
color: color from which a light colormap is created (for parameter evolution only)
gamma(float): exponent for gamma correction of the displayed marginal distribution; default: 0.5 (for
parameter evolution only)
t: Time step/stamp for which the parameter distribution is evaluated
density(bool): If true, probability density is plotted; if false, probability values. Note: Only availble
for parameters, not hyper-parameters.
kwargs: all further keyword-arguments are passed to the axes object of the plot
"""
density = kwargs.pop('density', True)
# plot parameter distribution at specific time step
if 't' in kwargs.keys():
t = kwargs.pop('t')
self.get_parameter_distribution(t, name, plot=True, density=density, **kwargs)
else:
# check is name belongs to hyper-parameter
try:
self._get_hyper_parameter_index(self.transition_model, name)
hyper = True
except PostProcessingError:
hyper = False
# for hyper-parameters, plot distribution
if hyper:
self.get_hyper_parameter_distribution(name, plot=True, **kwargs)
# for parameters, plot temporal evolution
else:
# read additional kwargs (or set default values)
color = kwargs.pop('color', 'b')
gamma = kwargs.pop('gamma', 0.5)
self.plot_parameter_evolution(name, color=color, gamma=gamma, **kwargs)
[docs]
class ChangepointStudy(HyperStudy):
"""
Infers change-points and structural breaks. This class builds on the HyperStudy-class and the change-point
transition model to perform a series of analyses with varying change point times. It subsequently computes the
average model from all possible change points and creates a probability distribution of change point times. It
supports any number of change-points and arbitarily combined models.
"""
def __init__(self, silent=False):
super(ChangepointStudy, self).__init__(silent=silent)
# store all possible combinations of change-points (even the ones that are assigned a probability of zero),
# to reconstruct change-point distribution after analysis
self.all_hyper_grid_values = []
self.all_hyper_prior_values = []
self.mask = [] # mask to select valid change-point combinations
self.user_defined_grid = False # needed to ensure that user-defined hyper-grid is not overwritten by fit-method
self.hyper_grid_backup = [] # needed to reconstruct hyper_grid attribute in the case of break-point model
if not silent:
print(' --> Change-point analysis')
[docs]
def fit(self, forward_only=False, evidence_only=False, silent=False, n_jobs=1,
cache_likelihoods='auto', max_cache_size=512):
"""
This method over-rides the corresponding method of the HyperStudy-class. It runs the algorithm for all possible
combinations of change-points (and possible scans a range of values for other hyper-parameters). The posterior
sequence represents the average model of all analyses. Posterior mean values are computed from this average
model.
Args:
forward_only(bool): If set to True, the fitting process is terminated after the forward pass. The resulting
posterior distributions are so-called "filtering distributions" which - at each time step -
only incorporate the information of past data points. This option thus emulates an online
analysis.
evidence_only(bool): If set to True, only forward pass is run and evidence is calculated. In contrast to the
forward_only option, no posterior mean values are computed and no posterior distributions are stored.
silent(bool): If set to True, reduced output is generated by the fitting method.
n_jobs(int): Number of processes to employ. Multiprocessing is based on the 'joblib' module.
cache_likelihoods(bool or str): If set to True, observation likelihoods are cached for all time steps and
reused across hyper-parameter values. If set to "auto", caching is enabled only when the estimated cache
size fits into max_cache_size. If False, likelihoods are computed on demand.
max_cache_size(float): Maximum likelihood cache size in MiB used by cache_likelihoods="auto". Set to None for
no limit.
"""
# format data/timestamps once, so number of data segments is known
self.formatted_data = moving_window(self.raw_data, self.observation_model.segment_length)
self.formatted_timestamps = self.raw_timestamps[self.observation_model.segment_length - 1:]
# nested serial transition models are not supported, as the correct order is not determined correctly
if len(list(flatten(self._unpack_serial_transition_models(self.transition_model)))) > 1:
raise NotImplementedError('Multiple instances of SerialTransition models are currently not supported by '
'ChangepointStudy.')
# determine names of change/break-points
changepoints = list(flatten(self._unpack_changepoint_names(self.transition_model)))
breakpoints = list(flatten(self._unpack_breakpoint_names(self.transition_model)))
# both types are not allowed at the moment, as the correct order is not determined correctly
if len(changepoints) > 0 and len(breakpoints) > 0:
raise NotImplementedError('Detected both change-points (Changepoint transition model) and break-points '
'(SerialTransitionModel). Currently, only one type is supported in a single '
'transition model.')
# at least one change/break-point should be present
if len(changepoints) == 0 and len(breakpoints) == 0:
raise ConfigurationError('No change-points or break-points detected in transition model. Check transition '
'model.')
self.flat_hyper_parameters = self._unpack_all_hyper_parameters()
self.flat_hyper_parameter_names = self._unpack_all_hyper_parameters(values=False)
# create hyper_grid in the case of change-points
if len(changepoints) > 0:
print('+ Detected {} change-point(s) in transition model: {}'.format(len(changepoints), changepoints))
points = changepoints
else:
print('+ Detected {} break-point(s) in transition model: {}'.format(len(breakpoints), breakpoints))
points = breakpoints
# first create standard hyper-grid
self._create_hyper_grid()
self.all_hyper_grid_values = self.hyper_grid_values[:]
self.all_hyper_prior_values = self.flat_hyper_prior_values[:]
# extract hyper-grid values that belong to changepoints
point_mask = np.sum([np.array(self.flat_hyper_parameter_names) == p for p in points], axis=0).astype(bool)
masked_hyper_grid_values = self.all_hyper_grid_values[:, point_mask]
# only accept if change-point values are ordered (and not equal)
self.mask = np.array(
[all(x[i] < x[i + 1] for i in range(len(points) - 1)) for x in masked_hyper_grid_values],
dtype=bool)
self.hyper_grid_values = self.all_hyper_grid_values[self.mask]
self.flat_hyper_prior_values = self.all_hyper_prior_values[self.mask]
# correct prior values to ensure correct normalization after sorting out combinations
self.flat_hyper_prior_values *= np.sum(self.all_hyper_prior_values)/np.sum(self.all_hyper_prior_values[self.mask])
# call fit method of hyper-study
HyperStudy.fit(self,
forward_only=forward_only,
evidence_only=evidence_only,
silent=silent,
n_jobs=n_jobs,
custom_hyper_grid=True,
cache_likelihoods=cache_likelihoods,
max_cache_size=max_cache_size)
# for proper plotting, hyper_grid_values must include all possible combinations of hyper-parameter values. We
# therefore have to include invalid combinations and assign the probability zero to them.
temp = np.zeros(len(self.all_hyper_grid_values))
temp[self.mask] = self.hyper_parameter_distribution
self.hyper_parameter_distribution = temp
temp = np.zeros(len(self.all_hyper_prior_values))
temp[self.mask] = self.flat_hyper_prior_values
self.flat_hyper_prior_values = temp
def _unpack_serial_transition_models(self, transition_model):
"""
Returns list of all occurrences of serial transition models in the transition model, nested like the transition
model.
Returns:
list: all serial transition models
"""
model_list = []
# recursion step for sub-models
if hasattr(transition_model, 'models'):
for m in transition_model.models:
model_list.append(self._unpack_serial_transition_models(m))
# extend hyper-parameter based on current (sub-)model
if hasattr(transition_model, 'hyper_parameter_names'):
if str(transition_model) == 'Serial transition model':
model_list.append(transition_model)
return model_list
[docs]
def get_duration_distribution(self, names, plot=False, **kwargs):
"""
Computes the distribution of the number of time steps between two change/break-points. This distribution of
duration is created from the joint distribution of the two specified change/break-points.
Args:
names(list): List of two parameter names of change/break-points to display
(first and second model parameter)
plot(bool): If True, a bar chart of the distribution is created
**kwargs: All further keyword-arguments are passed to the bar-plot (see matplotlib documentation)
Returns:
ndarray, ndarray: The first array contains the number of time steps, the second one the corresponding
probability values.
"""
# check if list with two elements is provided
if not isinstance(names, Iterable):
raise PostProcessingError('A list of exactly two hyper-parameters has to be provided.')
elif not len(names) == 2:
raise PostProcessingError('A list of exactly two hyper-parameters has to be provided.')
param_indices = [self._get_hyper_parameter_index(self.transition_model, n) for n in names]
# check if parameter indices are in ascending order (so axes are labeled correctly)
if not param_indices[0] < param_indices[1]:
print('! WARNING: Switching order of change-/breakpoints to obtain positive values for duration.')
param_indices = param_indices[::-1]
values = self.hyper_grid_values[:, param_indices].T
duration = np.unique(values[1] - values[0]) # get all possible differences between time points
duration_distribution = np.zeros(len(duration)) # initialize array for distribution
# loop over all hyper-grid points and collect probabilities for different durations
for i, values in enumerate(self.all_hyper_grid_values[:, param_indices]):
if values[1] > values[0]:
# get matching index in duration (rounding needed because of finite precision)
idx = np.where(duration.round(10) == (values[1]-values[0]).round(10))[0][0]
duration_distribution[idx] += self.hyper_parameter_distribution[i]
# properly normalize duration distribution
duration_distribution /= np.sum(duration_distribution)
if plot:
plt.bar(duration, duration_distribution, align='center', width=duration[0], **kwargs)
plt.xlabel('duration between {} and {} (in time steps)'
.format(self.flat_hyper_parameter_names[param_indices[0]],
self.flat_hyper_parameter_names[param_indices[1]]))
plt.ylabel('probability')
return duration, duration_distribution
[docs]
class OnlineStudy(HyperStudy):
"""
Enables model selection for online data streams. This class builds on the Study-class and features a step-method
to include new data points in the study as they arrive from a data stream. This online-analysis is performed in an
forward-only way, resulting in filtering-distributions only. In contrast to a normal study, however, one can add
multiple transition models to account for different types of parameter dynamics (similar to a Hyper study). The
Online study then computes the probability distribution over all transition models for each new data point (or all
past data points), enabling real-time model selection.
Args:
store_history(bool): If true, posterior distributions and their mean values, as well as hyper-posterior
distributions are stored for all time steps.
"""
def __init__(self, store_history=False, silent=False):
super(OnlineStudy, self).__init__(silent=silent)
self.first_step = True
self.transition_models = []
self.transition_model_names = []
self.tm_count = None
self.tm_counts = []
self.hyper_parameter_values = []
self.all_flat_hyper_parameter_values = []
self.hyper_parameter_names = []
self.hyper_grid_constants = []
self.log_evidence_list = None
self.hyper_log_evidence_list = None
self.hyper_prior = []
self.hyper_prior_values = []
self.transition_model_prior = None
self.parameter_posterior = None
self.transition_model_posterior = None
self.marginalized_posterior = None
self.hyper_parameter_distribution = None
self.transition_model_distribution = None
self.local_transition_model_distribution = None
self.store_history = store_history
self.posterior_mean_values = []
self.posterior_sequence = []
self.hyper_parameter_sequence = []
self.transition_model_sequence = []
self.local_transition_model_sequence = []
if not silent:
print(' --> Online study')
[docs]
def add_transition_model(self, name, transition_model):
"""
Adds a transition model to the list of transition models that are fitted in each time step. Note that a list of
hyper-parameter values can be supplied.
Args:
name(str): a custom name for this transition model to identify it in post-processing methods
transition_model: instance of a transition model class.
Example::
S.set_observation_model(bl.om.Poisson('lambda', bl.oint(0, 6, 1000)))
T = bl.tm.GaussianRandomWalk('sigma', [0, 0.1, 0.2, 0.3], target='lambda')
S.add_transition_model('dynamic', T)
"""
# extract hyper-parameter values and names
self.set_transition_model(transition_model, silent=True)
self._create_hyper_grid(silent=True)
self.transition_models.append(transition_model)
self.transition_model_names.append(name)
self.hyper_parameter_values.append(self.hyper_grid_values[:])
self.all_flat_hyper_parameter_values.append(self.flat_hyper_parameters)
self.hyper_parameter_names.append(self.flat_hyper_parameter_names[:])
self.hyper_grid_constants.append(self.hyper_grid_constant[:])
self.hyper_prior.append(self.flat_hyper_priors[:])
self.hyper_prior_values.append(self.flat_hyper_prior_values[:])
# count individual transition models
self.tm_counts = []
for hpv in self.hyper_parameter_values:
if len(hpv) > 0:
self.tm_counts.append(len(hpv))
else:
self.tm_counts.append(1)
self.tm_count = np.sum(self.tm_counts)
if len(self.hyper_grid_values) > 0:
print('+ Added transition model: {} ({} combination(s) of the following hyper-parameters: {})'
.format(name, len(self.hyper_grid_values), self.hyper_parameter_names[-1]))
else:
print('+ Added transition model: {} (no hyper-parameters)'.format(name))
[docs]
def set_transition_model_prior(self, transition_model_prior, silent=False):
"""
Sets prior probabilities for transition models added to the online study instance.
Args:
transition_model_prior: List/Array of probabilities, one for each transition model. If the list does not sum
to one, it will be re-normalised.
silent: If true, no output is generated by this method.
"""
if not (isinstance(transition_model_prior, Iterable) and len(transition_model_prior) == len(self.transition_models)):
raise ConfigurationError('Length of transition model prior ({}) does not fit number of transition models '
'({})'.format(len(transition_model_prior), len(self.transition_models)))
self.transition_model_prior = np.array(transition_model_prior)
if not np.sum(transition_model_prior) == 1.:
print('+ WARNING: Transition model prior does not sum up to one. Will re-normalize.')
self.transition_model_prior /= np.sum(self.transition_model_prior)
if not silent:
print('+ Set custom transition model prior.')
[docs]
def step(self, data_point):
"""
Update the current parameter distribution by adding a new data point to the data set.
Args:
data_point(float, int, ndarray): Float, int, or 1D-array of those (for multidimensional data).
"""
# at least one transition model has to be set or added
if (self.tm_count is None) and (self.transition_model is None):
raise ConfigurationError('No transition model set or added.')
# if one only sets a transition model, but does not use add_transition_model, we add it here
if (self.tm_count is None) and (self.transition_model is not None):
self.add_transition_model('transition model', self.transition_model)
if not isinstance(data_point, list):
data_point = [data_point]
if len(self.raw_data) == 0:
print('+ Start model fit')
# consistency check to detect duplicate hyper-parameter names in different transition models
all_names = list(flatten(self.hyper_parameter_names))
if len(all_names) != len(np.unique(all_names)):
raise ConfigurationError('Detected duplicate hyper-parameter names. Choose unique identifiers.')
# check the model consistency the first time that 'step' is called
self.raw_data = np.array(data_point)
Study._check_consistency(self)
self.raw_timestamps = np.array([0])
self.formatted_timestamps = []
else:
self.raw_data = np.append(self.raw_data, np.array(data_point), axis=0)
self.raw_timestamps = np.append(self.raw_timestamps, self.raw_timestamps[-1]+1)
# only proceed if at least one data segment can be created
if len(self.raw_data) < self.observation_model.segment_length:
print('+ Not enough data points to start analysis. Will wait for more data.')
return
self.formatted_timestamps.append(self.raw_timestamps[-1])
if self.first_step:
# initialize parameter prior distribution
prior = self._compute_prior(silent=False)
# initialize hyper-prior as flat
if self.hyper_prior is None:
self.hyper_prior = 'flat hyper-prior'
self.hyper_prior_values = [np.ones(tmc) / (tmc * np.prod(hgc))
for tmc, hgc in zip(self.tm_counts, self.hyper_grid_constants)]
print(' + Set flat hyper-prior.')
# initialize transition model prior as flat
if self.transition_model_prior is None:
self.transition_model_prior = np.ones(len(self.transition_models))/len(self.transition_models)
if len(self.transition_models) > 1:
print(' + Set flat transition model prior.')
# initialize log_evidence_list as an array of zeros and add log-lattice-constant for proper normalization
self.log_evidence_list = [np.zeros(tmc)+np.log(np.prod(self.lattice_constant)) for tmc in self.tm_counts]
self.hyper_log_evidence_list = np.array([0. for tmc in self.tm_counts])
# initialize parameter posterior
self.parameter_posterior = [np.zeros([tmc] + self.grid_size) for tmc in self.tm_counts]
# initialize hyper-parameter distribution
self.hyper_parameter_distribution = [np.zeros(tmc) for tmc in self.tm_counts]
# initialize (local) transition model distribution
self.transition_model_distribution = np.zeros(len(self.transition_models))
self.local_transition_model_distribution = np.zeros(len(self.transition_models))
# initialize transition model posterior
self.transition_model_posterior = np.zeros([len(self.transition_models)] + self.grid_size)
# initialize marginalized posterior
self.marginalized_posterior = np.zeros(self.grid_size)
# select data segment
data_segment = self.raw_data[-self.observation_model.segment_length:]
# compute current likelihood only once
likelihood = self.observation_model.processed_pdf(self.grid, data_segment)
# loop over all hypotheses/transition models
for i, (tm, hpv) in enumerate(zip(self.transition_models, self.hyper_parameter_values)):
self.set_transition_model(tm, silent=True) # set current transition model
# account for transition models with no hyper-parameters
if len(hpv) == 0:
hpv = [None]
# loop over all hyper-parameter values to fit
for j, x in enumerate(hpv):
# set current hyper-parameter values
if x is not None:
self._set_all_hyper_parameters(x)
# compute alpha_i
if self.first_step: # first time step, so use predefined prior
alphai = prior*likelihood
else: # in all other time steps transform "old" alpha/posterior
alphai = self.transition_model.compute_forward_prior(self.parameter_posterior[i][j],
len(self.formatted_data)-1)*likelihood
ni = np.sum(alphai)
# update log-evidence list
self.log_evidence_list[i][j] += np.log(ni)
self.hyper_parameter_distribution[i][j] = self.log_evidence_list[i][j] + np.log(self.hyper_prior_values[i][j])
# store parameter posterior
self.parameter_posterior[i][j] = alphai/ni
# marginalize model evidence wrt hyper-parameters, for all past data points and only for the current one
old_hyper_evidence = self.hyper_log_evidence_list[i]
self.hyper_log_evidence_list[i] = logsumexp(self.log_evidence_list[i] + np.log(self.hyper_prior_values[i]))
self.transition_model_distribution[i] = self.hyper_log_evidence_list[i]
self.local_transition_model_distribution[i] = self.hyper_log_evidence_list[i] - old_hyper_evidence + \
np.log(self.transition_model_prior[i])
# normalize hyper-parameter distribution of current transition model
self.hyper_parameter_distribution[i] -= np.amax(self.hyper_parameter_distribution[i])
self.hyper_parameter_distribution[i] = np.exp(self.hyper_parameter_distribution[i])
self.hyper_parameter_distribution[i] /= np.sum(self.hyper_parameter_distribution[i])
if len(self.hyper_grid_constants[i]) > 0:
self.hyper_parameter_distribution[i] /= np.prod(self.hyper_grid_constants[i])
# compute parameter posterior, marginalized wrt the hyper-parameters of the current transition model
hpd = deepcopy(self.hyper_parameter_distribution[i])
while len(self.parameter_posterior[i].shape) > len(hpd.shape):
hpd = np.expand_dims(hpd, axis=-1)
self.transition_model_posterior[i] = np.sum(self.parameter_posterior[i] *
hpd *
np.prod(self.hyper_grid_constants[i]), axis=0)
# normalize (local) transition model distribution (and transform to linear space)
self.transition_model_distribution -= np.amax(self.transition_model_distribution)
self.transition_model_distribution = np.exp(self.transition_model_distribution)
self.transition_model_distribution /= np.sum(self.transition_model_distribution)
self.local_transition_model_distribution -= np.amax(self.local_transition_model_distribution)
self.local_transition_model_distribution = np.exp(self.local_transition_model_distribution)
self.local_transition_model_distribution /= np.sum(self.local_transition_model_distribution)
# normalize marginalized parameter posterior
tmd = deepcopy(self.transition_model_distribution)
while len(self.transition_model_posterior.shape) > len(tmd.shape):
tmd = np.expand_dims(tmd, axis=-1)
self.marginalized_posterior = np.sum(self.transition_model_posterior * tmd, axis=0)
# compute log-evidence value marginalized over different transition models
self.log_evidence = logsumexp(self.hyper_log_evidence_list + np.log(self.transition_model_prior))
# store results for future plotting
if self.store_history:
self.posterior_mean_values.append(np.array([np.sum(self.marginalized_posterior*g) for g in self.grid]))
self.posterior_sequence.append(self.marginalized_posterior.copy())
self.hyper_parameter_sequence.append(deepcopy(self.hyper_parameter_distribution))
self.transition_model_sequence.append(deepcopy(self.transition_model_distribution))
self.local_transition_model_sequence.append(deepcopy(self.local_transition_model_distribution))
if self.first_step:
self.first_step = False
[docs]
def fit(self, *args, **kwargs):
raise NotImplementedError('OnlineStudy object has no "fit" method. Use "step" instead.')
[docs]
def get_parameter_distribution(self, t, name, plot=False, density=True, **kwargs):
"""
Compute the marginal parameter distribution at a given time step. Only available if Online Study is created
with flag 'store_history=True'.
Args:
t(int, float): Time step/stamp for which the parameter distribution is evaluated
name(str): Name of the parameter to display
plot(bool): If True, a plot of the distribution is created
density(bool): If true, probability density is plotted; if false, probability values.
**kwargs: All further keyword-arguments are passed to the plot (see matplotlib documentation)
Returns:
ndarray, ndarray: The first array contains the parameter values, the second one the corresponding
probability values
"""
if not self.store_history:
raise PostProcessingError('To get past parameter distributions, Online Study must be called with flag'
'"store_history=True". Use "get_current_parameter_distribution" instead.')
# plotting function of Study class can only handle arrays, not lists
self.formatted_timestamps = np.array(self.formatted_timestamps)
self.posterior_sequence = np.array(self.posterior_sequence)
try:
return Study.get_parameter_distribution(self, t, name, plot=plot, density=density, **kwargs)
finally:
# re-transform arrays to lists, so online study may continue to append values
self.formatted_timestamps = list(self.formatted_timestamps)
self.posterior_sequence = list(self.posterior_sequence)
[docs]
def get_current_parameter_distribution(self, name, plot=False, density=True, **kwargs):
"""
Compute the current marginal parameter distribution.
Args:
name(str): Name of the parameter to display
plot(bool): If True, a plot of the distribution is created
density(bool): If true, probability density is plotted; if false, probability values.
**kwargs: All further keyword-arguments are passed to the plot (see matplotlib documentation)
Returns:
ndarray, ndarray: The first array contains the parameter values, the second one the corresponding
probability values
"""
# get parameter index
param_index = -1
for i, n in enumerate(self.observation_model.parameter_names):
if n == name:
param_index = i
# check if match was found
if param_index == -1:
raise PostProcessingError('Wrong parameter name. Available options: {0}'
.format(self.observation_model.parameter_names))
axes_to_marginalize = list(range(len(self.observation_model.parameter_names)))
try:
axes_to_marginalize.remove(param_index)
except ValueError:
raise PostProcessingError('Wrong parameter index. Available indices: {}'.format(axes_to_marginalize))
x = self.marginal_grid[param_index]
dx = self.lattice_constant[param_index]
marginal_distribution = np.squeeze(
np.apply_over_axes(np.sum, self.marginalized_posterior, axes_to_marginalize)).copy()
if density:
marginal_distribution /= dx
if plot:
plt.fill_between(x, 0, marginal_distribution, **kwargs)
plt.xlabel(self.observation_model.parameter_names[param_index])
if density:
plt.ylabel('probability density')
else:
plt.ylabel('probability')
return x, marginal_distribution
[docs]
def get_parameter_distributions(self, name, plot=False, density=True, **kwargs):
"""
Computes the time series of marginal posterior distributions with respect to a given model parameter. Only
available if Online Study is created with flag 'store_history=True'.
Args:
name(str): Name of the parameter to display
plot(bool): If True, a plot of the series of distributions is created (density map)
density(bool): If true, probability density is plotted; if false, probability values.
**kwargs: All further keyword-arguments are passed to the plot (see matplotlib documentation)
Returns:
ndarray, ndarray: The first array contains the parameter values, the second one the sequence of
corresponding posterior distributions.
"""
if not self.store_history:
raise PostProcessingError('To get past parameter distributions, Online Study must be called with flag'
'"store_history=True". Use "get_current_parameter_distribution" instead.')
# plotting function of Study class can only handle arrays, not lists
self.formatted_timestamps = np.array(self.formatted_timestamps)
self.posterior_sequence = np.array(self.posterior_sequence)
try:
return Study.get_parameter_distributions(self, name, plot=plot, density=density, **kwargs)
finally:
# re-transform arrays to lists, so online study may continue to append values
self.formatted_timestamps = list(self.formatted_timestamps)
self.posterior_sequence = list(self.posterior_sequence)
[docs]
def plot_parameter_evolution(self, name, color='b', gamma=0.5, **kwargs):
"""
Plots a series of marginal posterior distributions corresponding to a single model parameter, together with the
posterior mean values. Only available if Online Study is created with flag 'store_history=True'.
Args:
name(str): Name of the parameter to display
color: color from which a light colormap is created
gamma(float): exponent for gamma correction of the displayed marginal distribution; default: 0.5
kwargs: all further keyword-arguments are passed to the plot of the posterior mean values
"""
if not self.store_history:
raise PostProcessingError('To plot past parameter distributions, Online Study must be called with flag'
'"store_history=True". Use "get_current_parameter_distribution" instead.')
# plotting function of Study class can only handle arrays, not lists
self.formatted_timestamps = np.array(self.formatted_timestamps)
self.posterior_mean_values = np.array(self.posterior_mean_values).T
self.posterior_sequence = np.array(self.posterior_sequence)
try:
Study.plot_parameter_evolution(self, name, color=color, gamma=gamma, **kwargs)
finally:
# re-transform arrays to lists, so online study may continue to append values
self.formatted_timestamps = list(self.formatted_timestamps)
self.posterior_mean_values = list(self.posterior_mean_values.T)
self.posterior_sequence = list(self.posterior_sequence)
[docs]
def get_current_transition_model_distribution(self, local=False):
"""
Returns the current probabilities for each transition model defined in the Online Study.
Args:
local(bool): If true, transition model distribution taking into account only the last data point is returned.
Returns:
ndarray, ndarray: Arrays of transition model names and normalized probabilities.
"""
if local:
return np.array(self.transition_model_names), self.local_transition_model_distribution
else:
return np.array(self.transition_model_names), self.transition_model_distribution
[docs]
def get_current_transition_model_probability(self, transition_model, local=False):
"""
Returns the current posterior probability for a specified transition model.
Args:
transition_model(str): Name of the transition model
local(bool): If true, transition model probability taking into account only the last data point is returned.
Returns:
float: Posterior probability value for the specified transition model
"""
transition_model_index = self.transition_model_names.index(transition_model)
if local:
return self.local_transition_model_distribution[transition_model_index]
else:
return self.transition_model_distribution[transition_model_index]
[docs]
def get_transition_model_distributions(self, local=False):
"""
The transition model distribution contains posterior probability values for all transition models included in
the online study. This distribution is available for all time steps analyzed. Only available if Online Study
is created with flag 'store_history=True'.
Args:
local(bool): If true, transition model distributions taking into account only the data point at the
corresponding time step is returned.
Returns:
ndarray, ndarray: Arrays containing the names and posterior probability values for all transition models
included in the online study for all time steps analyzed
"""
if not self.store_history:
raise PostProcessingError('To get past transition model distributions, Online Study must be called with '
'flag "store_history=True". Use "get_current_transition_model_distribution" instead.')
if local:
return np.array(self.transition_model_names), np.array(self.local_transition_model_sequence)
else:
return np.array(self.transition_model_names), np.array(self.transition_model_sequence)
[docs]
def get_transition_model_probabilities(self, transition_model, local=False):
"""
Returns posterior probability values for a specified transition model. This distribution is available for all
time steps analyzed. Only available if Online Study is created with flag 'store_history=True'.
Args:
transition_model(str): Name of the transition model
local(bool): If true, transition model probabilities taking into account only the data point at the
corresponding time step is returned.
Returns:
ndarray: Array containing the posterior probability values for the specified transition model for all time
steps analyzed
"""
if not self.store_history:
raise PostProcessingError('To get past transition model distributions, Online Study must be called with '
'flag "store_history=True". Use "get_current_transition_model_distribution" instead.')
transition_model_index = self.transition_model_names.index(transition_model)
if local:
return np.array(self.local_transition_model_sequence)[:, transition_model_index]
else:
return np.array(self.transition_model_sequence)[:, transition_model_index]
[docs]
def get_current_parameter_mean_value(self, name):
"""
Returns the posterior mean value for a given parameter of the observation model.
Args:
name(str): Name of the parameter
Returns:
float: posterior mean value
"""
# get parameter index
param_index = -1
for i, n in enumerate(self.observation_model.parameter_names):
if n == name:
param_index = i
# check if match was found
if param_index == -1:
raise PostProcessingError('Wrong parameter name. Available options: {0}'
.format(self.observation_model.parameter_names))
mean = np.sum(self.marginalized_posterior*self.grid[param_index])
return mean
[docs]
def get_parameter_mean_value(self, t, name):
"""
Returns the posterior mean value for a given parameter of the observation model at a specified time step. Only
available if Online Study is created with flag 'store_history=True'.
Args:
t(int): Time step at which to compute parameter mean value
name(str): Name of the parameter
Returns:
float: posterior mean value
"""
if not self.store_history:
raise PostProcessingError('To get past parameter mean values, Online Study must be called with '
'flag "store_history=True". Use "get_current_parameter_mean_value" instead.')
# get parameter index
param_index = -1
for i, n in enumerate(self.observation_model.parameter_names):
if n == name:
param_index = i
# check if match was found
if param_index == -1:
raise PostProcessingError('Wrong parameter name. Available options: {0}'
.format(self.observation_model.parameter_names))
# access parameter distribution
if t not in self.formatted_timestamps:
raise PostProcessingError('Supplied time ({}) does not exist in data or is out of range.'.format(t))
time_index = list(self.formatted_timestamps).index(t) # to select corresponding posterior distribution
parameter_distribution = self.posterior_sequence[time_index]
mean = np.sum(parameter_distribution * self.grid[param_index])
return mean
[docs]
def get_parameter_mean_values(self, name):
"""
Returns the posterior mean value for a given parameter of the observation model for all time steps. Only
available if Online Study is created with flag 'store_history=True'.
Args:
name(str): Name of the parameter
Returns:
ndarray: posterior mean values
"""
if not self.store_history:
raise PostProcessingError('To get past parameter mean values, Online Study must be called with '
'flag "store_history=True". Use "get_current_parameter_mean_value" instead.')
# get parameter index
param_index = -1
for i, n in enumerate(self.observation_model.parameter_names):
if n == name:
param_index = i
# check if match was found
if param_index == -1:
raise PostProcessingError('Wrong parameter name. Available options: {0}'
.format(self.observation_model.parameter_names))
mean = np.array(self.posterior_mean_values).T[param_index]
return mean
[docs]
def get_hyper_parameter_mean_value(self, t, name):
"""
Computes the mean value of the joint hyper-parameter distribution for a given hyper-parameter at a
given time step. Only available if Online Study is created with flag 'store_history=True'.
Args:
t(int): Time step at which to compute distribution
name(str): name of hyper-parameter
Returns:
ndarray: Array containing the mean values of all hyper-parameters of the given transition model
"""
if not self.store_history:
raise PostProcessingError('To get past hyper-parameter mean values, Online Study must be called with '
'flag "store_history=True". Use "getCurrentHyperParameterMeanValue" instead.')
# determine indices of transition model and hyper-parameter
hp_index = -1
for i, tm in enumerate(self.transition_models):
try:
hp_index = self._get_hyper_parameter_index(tm, name)
tm_index = i
except PostProcessingError:
pass
if hp_index == -1:
raise PostProcessingError('No hyper-parameter "{}" found. Check hyper-parameter names.'.format(name))
# access hyper-parameter distribution
if t not in self.formatted_timestamps:
raise PostProcessingError('Supplied time ({}) does not exist in data or is out of range.'.format(t))
time_index = list(self.formatted_timestamps).index(t) # to select corresponding posterior distribution
hyper_parameter_distribution = self.hyper_parameter_sequence[time_index]
hyper_parameter_distribution = hyper_parameter_distribution[tm_index][:, None]
hyper_parameter_values = self.hyper_parameter_values[tm_index]
hyper_grid_constants = self.hyper_grid_constants[tm_index]
# compute mean value
mean = np.sum(hyper_parameter_values*hyper_parameter_distribution*np.prod(hyper_grid_constants), axis=0)
return mean[hp_index]
[docs]
def get_hyper_parameter_mean_values(self, name):
"""
Computes the sequence of mean value of the joint hyper-parameter distribution for a given hyper-parameter for
all time steps. Only available if Online Study is created with flag 'store_history=True'.
Args:
name(str): name of hyper-parameter
Returns:
ndarray: Array containing the sequences of mean values of the given hyper-parameter
"""
if not self.store_history:
raise PostProcessingError('To get past hyper-parameter mean values, Online Study must be called with '
'flag "store_history=True". Use "getCurrentHyperParameterMeanValue" instead.')
# determine indices of transition model and hyper-parameter
hp_index = -1
for i, tm in enumerate(self.transition_models):
try:
hp_index = self._get_hyper_parameter_index(tm, name)
tm_index = i
except PostProcessingError:
pass
if hp_index == -1:
raise PostProcessingError('No hyper-parameter "{}" found. Check hyper-parameter names.'.format(name))
hyper_parameter_sequence = np.array([hp[tm_index].tolist()
for hp in self.hyper_parameter_sequence])[:, :, None]
hyper_parameter_values = self.hyper_parameter_values[tm_index]
hyper_grid_constants = self.hyper_grid_constants[tm_index]
# compute mean value
mean = np.sum(hyper_parameter_sequence * hyper_parameter_values * np.prod(hyper_grid_constants), axis=1).T
return mean[hp_index]
[docs]
def get_hyper_parameter_distribution(self, t, name, plot=False, **kwargs):
"""
Computes marginal hyper-parameter distribution of a single hyper-parameter at a specific time step in an
OnlineStudy fit. Only available if Online Study is created with flag 'store_history=True'.
Args:
t(int): Time step at which to compute distribution, or 'avg' for time-averaged distribution
name(str): hyper-parameter name
plot(bool): If True, a bar chart of the distribution is created
**kwargs: All further keyword-arguments are passed to the bar-plot (see matplotlib documentation)
Returns:
ndarray, ndarray: The first array contains the hyper-parameter values, the second one the
corresponding probability values
"""
if not self.store_history:
raise PostProcessingError('To get past hyper-parameter distributions, Online Study must be called with '
'flag "store_history=True". Use "get_current_hyper_parameter_distribution" instead.')
# determine indices of transition model and hyper-parameter
hp_index = -1
for i, tm in enumerate(self.transition_models):
try:
hp_index = self._get_hyper_parameter_index(tm, name)
tm_index = i
except PostProcessingError:
pass
if hp_index == -1:
raise PostProcessingError('No hyper-parameter "{}" found. Check hyper-parameter names.'.format(name))
# access hyper-parameter distribution
if t == 'avg':
# compute time-averaged distribution
hyper_parameter_distribution = np.sum(self.hyper_parameter_sequence, axis=0)/len(self.hyper_parameter_sequence)
else:
# try to access distribution of specified time step
if t not in self.formatted_timestamps:
raise PostProcessingError('Supplied time ({}) does not exist in data or is out of range.'.format(t))
time_index = list(self.formatted_timestamps).index(t) # to select corresponding posterior distribution
hyper_parameter_distribution = self.hyper_parameter_sequence[time_index]
hyper_parameter_distribution = hyper_parameter_distribution[tm_index]
axes_to_marginalize = list(range(len(self.hyper_parameter_names[tm_index])))
axes_to_marginalize.remove(hp_index)
# reshape hyper-parameter grid for easy marginalization
hyper_grid_steps = [len(x) for x in self.all_flat_hyper_parameter_values[tm_index]]
distribution = hyper_parameter_distribution.reshape(hyper_grid_steps, order='C')
marginal_distribution = np.squeeze(np.apply_over_axes(np.sum, distribution, axes_to_marginalize))
x = self.all_flat_hyper_parameter_values[tm_index][hp_index]
if plot:
# check if categorical
if np.any(np.abs(np.diff(np.diff(x))) > 10 ** -10):
plt.bar(np.arange(len(x)), marginal_distribution, align='center', width=1., **kwargs)
plt.xticks(np.arange(len(x)), x)
plt.ylabel('probability')
# regular spacing
else:
plt.bar(x, marginal_distribution, align='center',
width=self.hyper_grid_constants[tm_index][hp_index],
**kwargs)
plt.ylabel('probability')
plt.xlabel(self.hyper_parameter_names[tm_index][hp_index])
return x, marginal_distribution
[docs]
def get_current_hyper_parameter_distribution(self, name, plot=False, **kwargs):
"""
Computes marginal hyper-parameter distribution of a single hyper-parameter at the current time step in an
OnlineStudy fit.
Args:
name(str): hyper-parameter name
plot(bool): If True, a bar chart of the distribution is created
**kwargs: All further keyword-arguments are passed to the bar-plot (see matplotlib documentation)
Returns:
ndarray, ndarray: The first array contains the hyper-parameter values, the second one the
corresponding probability values
"""
# determine indices of transition model and hyper-parameter
hp_index = -1
for i, tm in enumerate(self.transition_models):
try:
hp_index = self._get_hyper_parameter_index(tm, name)
tm_index = i
except PostProcessingError:
pass
if hp_index == -1:
raise PostProcessingError('No hyper-parameter "{}" found. Check hyper-parameter names.'.format(name))
hyper_parameter_distribution = self.hyper_parameter_distribution[tm_index]
axes_to_marginalize = list(range(len(self.hyper_parameter_names[tm_index])))
axes_to_marginalize.remove(hp_index)
# reshape hyper-parameter grid for easy marginalization
hyper_grid_steps = [len(x) for x in self.all_flat_hyper_parameter_values[tm_index]]
distribution = hyper_parameter_distribution.reshape(hyper_grid_steps, order='C')
marginal_distribution = np.squeeze(np.apply_over_axes(np.sum, distribution, axes_to_marginalize))
marginal_distribution *= np.prod(self.hyper_grid_constants[tm_index])
x = self.all_flat_hyper_parameter_values[tm_index][hp_index]
if plot:
# check if categorical
if np.any(np.abs(np.diff(np.diff(x))) > 10 ** -10):
plt.bar(np.arange(len(x)), marginal_distribution, align='center', width=1., **kwargs)
plt.xticks(np.arange(len(x)), x)
plt.ylabel('probability')
# regular spacing
else:
plt.bar(x, marginal_distribution, align='center',
width=self.hyper_grid_constants[tm_index][hp_index],
**kwargs)
plt.ylabel('probability')
plt.xlabel(self.hyper_parameter_names[tm_index][hp_index])
return x, marginal_distribution
[docs]
def get_hyper_parameter_distributions(self, name):
"""
Computes marginal hyper-parameter distributions of a single hyper-parameter for all time steps in an
OnlineStudy fit. Only available if Online Study is created with flag 'store_history=True'.
Args:
name(str): hyper-parameter name
Returns:
ndarray, ndarray: The first array contains the hyper-parameter values, the second one the
corresponding probability values (first axis is time).
"""
if not self.store_history:
raise PostProcessingError('To get past hyper-parameter distributions, Online Study must be called with '
'flag "store_history=True". Use "get_current_hyper_parameter_distribution" instead.')
# determine indices of transition model and hyper-parameter
hp_index = -1
for i, tm in enumerate(self.transition_models):
try:
hp_index = self._get_hyper_parameter_index(tm, name)
tm_index = i
except PostProcessingError:
pass
if hp_index == -1:
raise PostProcessingError('No hyper-parameter "{}" found. Check hyper-parameter names.'.format(name))
hyper_parameter_sequence = np.array([x[tm_index] for x in self.hyper_parameter_sequence])
# marginalize the hyper-posterior probabilities
hpv = np.array(self.hyper_parameter_values[tm_index])
param_hpv = hpv[:, hp_index]
unique_values = np.sort(np.unique(param_hpv))
marginal_distribution = []
for value in unique_values:
marginal_distribution.append([])
indices = np.where(param_hpv == value)
for hp in hyper_parameter_sequence:
probabilities = hp[indices]
marginal_distribution[-1].append(np.sum(probabilities))
marginal_distribution = np.array(marginal_distribution).T
# renormalize marginal probability distribution
marginal_distribution /= np.sum(marginal_distribution, axis=1)[:, None]
return unique_values, marginal_distribution
[docs]
def plot_hyper_parameter_evolution(self, name, color='b', gamma=0.5, **kwargs):
"""
Plot method to display a series of marginal posterior distributions corresponding to a single model parameter.
This method includes the removal of plotting artefacts, gamma correction as well as an overlay of the posterior
mean values. Only available if Online Study is created with flag 'store_history=True'.
Args:
name(str): hyper-parameter name
color: color from which a light colormap is created
gamma(float): exponent for gamma correction of the displayed marginal distribution; default: 0.5
kwargs: all further keyword-arguments are passed to the plot of the posterior mean values
"""
if not self.store_history:
raise PostProcessingError('To get past hyper-parameter distributions, Online Study must be called with '
'flag "store_history=True". Use "get_current_hyper_parameter_distribution" instead.')
# determine indices of transition model and hyper-parameter
hp_index = -1
for i, tm in enumerate(self.transition_models):
try:
hp_index = self._get_hyper_parameter_index(tm, name)
tm_index = i
except PostProcessingError:
pass
if hp_index == -1:
raise PostProcessingError('No hyper-parameter "{}" found. Check hyper-parameter names.'.format(name))
# get sequence of hyper-parameter distributions
unique_values, marginal_distribution = self.get_hyper_parameter_distributions(name)
# compute hyper-posterior mean values
mean_values = self.get_hyper_parameter_mean_values(name)
# clean up very small probability values, as they may create image artefacts
pmax = np.amax(marginal_distribution)
marginal_distribution[marginal_distribution < pmax * (10 ** -20)] = 0
plt.imshow(marginal_distribution.T ** gamma,
origin='lower',
cmap=create_colormap(color),
extent=[self.formatted_timestamps[0], self.formatted_timestamps[-1]] +
[unique_values[0], unique_values[-1]],
aspect='auto')
# set default color of plot to black
if ('c' not in kwargs) and ('color' not in kwargs):
kwargs['c'] = 'k'
# set default linewidth to 1.5
if ('lw' not in kwargs) and ('linewidth' not in kwargs):
kwargs['lw'] = 1.5
plt.plot(self.formatted_timestamps, mean_values, **kwargs)
plt.ylim(unique_values[0], unique_values[-1])
plt.ylabel(self.hyper_parameter_names[tm_index][hp_index])
plt.xlabel('time step')
[docs]
def get_joint_hyper_parameter_distribution(self, names, plot=False, figure=None, subplot=111, **kwargs):
raise NotImplementedError('This method is not available in "OnlineStudy".')
[docs]
def plot(self, name, **kwargs):
"""
Convenience method to plot the temporal evolution of (hyper-)parameters, the distribution of a
(hyper-)parameter at a specific time step, or the temporal evolution of the probability of a transition model.
Args:
name(str): name of the (hyper-)parameter or transition model to display
color: color from which a light colormap is created (for (hyper-)parameter evolution only)
gamma(float): exponent for gamma correction of the displayed marginal distribution; default: 0.5 (for
(hyper-)parameter evolution only)
t: Time step/stamp for which the parameter distribution is evaluated
density(bool): If true, probability density is plotted; if false, probability values. Note: Only availble
for parameters, not hyper-parameters.
local(bool): If true, transition model probabilities taking into account only the data point at the
corresponding time step is returned
kwargs: all further keyword-arguments are passed to the axes object of the plot
"""
density = kwargs.pop('density', True)
# check if time-step/stamp is supplied
t = kwargs.pop('t', None)
# check if results were stored
if t is not None and not self.store_history:
raise PostProcessingError('Online study has only stored current parameter data ("store_history=False"), '
'no time step can be specified, only current (hyper-)parameter distributions will'
'be plotted.')
# check if name belongs to hyper-parameter
hyper = False
for tm in self.transition_models:
try:
self._get_hyper_parameter_index(tm, name)
hyper = True
except PostProcessingError:
pass
# check if name belongs to transition model
try:
transition_model_index = self.transition_model_names.index(name)
model = True
except ValueError:
model = False
if not hyper and not model:
# parameter evolution plot
if t is None and self.store_history:
# read additional kwargs (or set default values)
color = kwargs.pop('color', 'b')
gamma = kwargs.pop('gamma', 0.5)
self.plot_parameter_evolution(name, color=color, gamma=gamma, **kwargs)
# parameter distribution plot
elif t is not None and self.store_history:
self.get_parameter_distribution(t, name, plot=True, density=density, **kwargs)
else:
self.get_current_parameter_distribution(name, plot=True, density=density, **kwargs)
elif hyper and not model:
# hyper-parameter evolution plot
if t is None and self.store_history:
# read additional kwargs (or set default values)
color = kwargs.pop('color', 'b')
gamma = kwargs.pop('gamma', 0.5)
self.plot_hyper_parameter_evolution(name, color=color, gamma=gamma, **kwargs)
# hyper-parameter distribution plot
elif t is not None and self.store_history:
self.get_hyper_parameter_distribution(t, name, plot=True, **kwargs)
else:
self.get_current_hyper_parameter_distribution(name, plot=True, **kwargs)
elif model and not hyper:
# read additional kwargs (or set default values)
local = kwargs.pop('local', False)
# plot local transition model probabilities
if local:
plt.plot(self.formatted_timestamps,
np.array(self.local_transition_model_sequence)[:, transition_model_index],
**kwargs)
# plot transition model probabilities
else:
plt.plot(self.formatted_timestamps,
np.array(self.transition_model_sequence)[:, transition_model_index],
**kwargs)
else:
raise PostProcessingError('Duplicate names of hyper-/parameters/transition models, cannot use "plot" '
'method.')