Source code for bayesloop.core

#!/usr/bin/env python
"""
In bayesloop, each new data study is handled by an instance of a ``Study``-class. In this way, all data, the inference
results and the appropriate post-processing routines are stored in one object that can be accessed conveniently or
stored in a file. Apart from the basic ``Study`` class, there exist a number of specialized classes that extend the
basic fit method, for example to infer the full distribution of hyper-parameters or to apply model selection to on-line
data streams.
"""

from __future__ import division, print_function
import string
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from scipy.special import factorial, logsumexp
from scipy.special import beta as beta_func
import sympy.abc as abc
from sympy import symbols
from sympy import lambdify
from sympy.stats import density
import sympy.stats
from copy import copy, deepcopy
from collections import OrderedDict
from collections.abc import Iterable
from joblib import Parallel, delayed
from tqdm.auto import tqdm
from .helper import assign_nested_item, recursive_index, flatten, create_colormap, oint, cint, free_symbols
from .preprocessing import moving_window
from .observation_models import ObservationModel
from .transition_models import TransitionModel, CombinedTransitionModel, SerialTransitionModel
from .exceptions import ConfigurationError, PostProcessingError


[docs] class Study(object): """ Fits with fixed hyper-parameters and hyper-parameter optimization. This class implements a forward-backward-algorithm for analyzing time series data using hierarchical models. For efficient computation, all parameter distributions are discretized on a parameter grid. """ def __init__(self, silent=False): self.observation_model = None self.transition_model = None self.grid_size = [] self.boundaries = [] self.marginal_grid = [] self.grid = [] self.lattice_constant = [] self.raw_data = np.array([]) self.formatted_data = np.array([]) self.raw_timestamps = None self.formatted_timestamps = None self.posterior_sequence = [] self.posterior_mean_values = [] self.log_evidence = 0 self.local_evidence = [] self.selected_hyper_parameters = [] self.fit_warning_counter = 0 if not silent: print('+ Created new study.') @property def log10_evidence(self): return self.log_evidence/np.log(10)
[docs] def load_example_data(self, silent=False): """ Loads UK coal mining disaster data. Args: silent(bool): If set to True, no output is generated by this method. """ self.raw_data = np.array([5, 4, 1, 0, 4, 3, 4, 0, 6, 3, 3, 4, 0, 2, 6, 3, 3, 5, 4, 5, 3, 1, 4, 4, 1, 5, 5, 3, 4, 2, 5, 2, 2, 3, 4, 2, 1, 3, 2, 2, 1, 1, 1, 1, 3, 0, 0, 1, 0, 1, 1, 0, 0, 3, 1, 0, 3, 2, 2, 0, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 2, 1, 0, 0, 0, 1, 1, 0, 2, 3, 3, 1, 1, 2, 1, 1, 1, 1, 2, 3, 3, 0, 0, 0, 1, 4, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0]) self.raw_timestamps = np.arange(1852, 1962) if not silent: print('+ Successfully imported example data.')
[docs] def load_data(self, array, timestamps=None, silent=False): """ Loads Numpy array as data. Args: array(ndarray): Numpy array containing time series data timestamps(ndarray): Array of timestamps (same length as data array) silent(bool): If set to True, no output is generated by this method. """ if isinstance(array, np.ndarray): self.raw_data = array elif isinstance(array, list): if not silent: print('! WARNING: Data supplied as list, not as Numpy array. Converting list to Numpy array ' '(dtype=float).') self.raw_data = np.array(array, dtype=float) else: raise ConfigurationError('Data type not supported. Please provide data as Numpy array.') if timestamps is not None: # load custom timestamps if len(timestamps) == len(array): self.raw_timestamps = np.array(timestamps) else: if not silent: print('! WARNING: Number of timestamps does not match number of data points. Omitting timestamps.') self.raw_timestamps = np.arange(len(self.raw_data)) else: # set default timestamps (integer range) self.raw_timestamps = np.arange(len(self.raw_data)) if not silent: print('+ Successfully imported array.')
[docs] def set_observation_model(self, L, silent=False): """ Sets observation model (likelihood function) for analysis and creates parameter grid for inference routine. Args: L: Observation model class (see observation_models.py) silent(bool): If set to True, no output is generated by this method. """ self.observation_model = L # prepare parameter grid self.marginal_grid = [] self.grid_size = [] self.boundaries = [] self.lattice_constant = [] for v, n in zip(self.observation_model.parameter_values, self.observation_model.parameter_names): if v is None: # if user has not specified parameter values, we try to estimate them try: v = self.observation_model.estimate_parameter_values(n, self.raw_data) print('+ Estimated parameter interval for "{}": [{}, {}] ({} values).' .format(n, v[0], v[-1], len(v))) except: raise ConfigurationError('Could not estimate parameter values for "{}".'.format(n)) v = np.array(v, dtype=float) # inference algorithm needs floats if len(v) == 0: raise ConfigurationError('Parameter values for "{}" must not be empty.'.format(n)) self.marginal_grid.append(v) self.grid_size.append(len(v)) self.boundaries.append([v[0], v[-1]]) # check if parameter values are equally spaced if len(v) == 1: self.lattice_constant.append(1.) elif np.any(np.abs(np.diff(np.diff(v))) > 10 ** -10): print('! WARNING: Supplied parameter values for "{}" are not equally spaced. Assuming categorical ' 'parameter.'.format(n)) self.lattice_constant.append(1.) else: # equally spaced (regular grid) self.lattice_constant.append(np.abs(v[0] - v[1])) # create grid self.grid = [m for m in np.meshgrid(*self.marginal_grid, indexing='ij')] self.observation_model.prepare_grid(self.grid) # if observation model is updated, transition model must know the new lattice constant if self.transition_model is not None: self.transition_model.lattice_constant = self.lattice_constant if not silent: print('+ Observation model: {}. Parameter(s): {}'.format(L, L.parameter_names))
def _compute_prior(self, silent=False): """ Computes discrete prior probabilities (densities) for the parameters of the observation model. The custom prior distribution may be passed as a Numpy array that has the same shape as the parameter grid, as a (lambda) function or as a (list of) SymPy random variable(s). Args: silent(bool): If set to True, no output is generated by this method. Returns: ndarray: Prior probability (density) values with the same size as the parameter grid """ prior = self.observation_model.prior # check if no prior is specified if prior is None: prior = np.ones(self.grid_size) # flat (improper) prior prior /= np.sum(prior) # re-normalize (now equal to uniform prior) prior /= np.prod(self.lattice_constant) # convert to prob. density if not silent: print(' + Set uniform prior with parameter boundaries.') return prior # check whether correctly shaped numpy array is provided if isinstance(prior, np.ndarray): if np.all(prior.shape == self.grid[0].shape): norm = np.sum(prior) if norm != 1.: prior /= norm prior /= np.prod(self.lattice_constant) if not silent: print(' + Set prior (numpy array). Values have been re-normalized.') else: if not silent: print(' + Set prior (numpy array).') return prior else: raise ConfigurationError('Prior array does not match parameter grid size.') # check whether function is provided if hasattr(prior, '__call__'): values = prior(*self.grid)*np.ones(self.grid_size) # last factor for cases like lambda x: 1. norm = np.sum(values) if norm != 1.: values /= norm values /= np.prod(self.lattice_constant) if not silent: print(' + Set prior (function): {}. Values have been re-normalized.'.format(prior.__name__)) else: if not silent: print(' + Set prior (function): {}'.format(prior.__name__)) return values # check whether single random variable is provided if type(prior) is sympy.stats.rv.RandomSymbol: prior = [prior] # check if list/tuple is provided if isinstance(prior, (list, tuple)) and not isinstance(prior, str): if len(prior) != len(self.observation_model.parameter_names): raise ConfigurationError('Observation model contains {} parameters, but {} priors were provided.' .format(len(self.observation_model.parameter_names), len(prior))) pdf = 1 if len(prior) > 1: x = symbols(' '.join(list(string.ascii_lowercase)[:len(prior)])) else: x = [abc.x] for i, rv in enumerate(prior): if type(rv) is not sympy.stats.rv.RandomSymbol: raise ConfigurationError('Only lambda functions or SymPy random variables can be used as a prior.') if len(free_symbols(rv)) > 0: raise ConfigurationError('Prior distribution must not contain free parameters.') # multiply total pdf with density for current parameter pdf = pdf*density(rv)(x[i]) # set density as lambda function if not silent: print(' + Set prior (sympy): {}'.format(pdf)) return lambdify(x, pdf, modules=['numpy', {'factorial': factorial, 'beta': beta_func}])(*self.grid)
[docs] def set_transition_model(self, T, silent=False): """ Set transition model which describes the parameter dynamics. Args: T: Transition model class (see transition_models.py) silent(bool): If true, no output is printed by this method """ # check if model is a break-point and raise error if so if str(T) == 'Break-point': raise ConfigurationError('The "BreakPoint" transition model can only be used with the ' '"SerialTransitionModel" class.') self.transition_model = T self.transition_model.study = self self.transition_model.lattice_constant = self.lattice_constant if not silent: print('+ Transition model: {}. Hyper-Parameter(s): {}' .format(T, self._unpack_all_hyper_parameters(values=False)))
[docs] def set(self, *args, **kwargs): """ Set observation model or transition model, or both. See :meth:`.Study.set_transition_model` and :meth:`.Study.set_observation_model`. Args: args: Sequence of Observation model instance and Transition model instance, or just one of those two types silent(bool): If true, no output is printed by this method """ # Check for unknown keyword-arguments for key in kwargs.keys(): if key not in ['silent']: raise TypeError("set() got an unexpected keyword argument '{}'".format(key)) # Keep the legacy calling convention: S.set(model, silent=True). silent = kwargs.pop('silent', False) om = False tm = False for model in args: if ObservationModel in model.__class__.__bases__: if om: raise ConfigurationError('More than one observation model supplied.') om = True self.set_observation_model(model, silent=silent) elif TransitionModel in model.__class__.__bases__: if tm: raise ConfigurationError('More than one transition model supplied.') tm = True self.set_transition_model(model, silent=silent) else: raise ConfigurationError('Expected observation model or transition model instance as first argument.')
def _estimate_likelihood_cache_size(self): """ Estimate the memory needed for storing one likelihood array per formatted data segment. Returns: float: estimated cache size in MiB. """ return len(self.formatted_data) * int(np.prod(self.grid_size)) * np.dtype(float).itemsize / 2. ** 20 def _should_cache_likelihoods(self, cache_likelihoods, max_cache_size): """ Decide whether a likelihood cache should be allocated. """ cache_size = self._estimate_likelihood_cache_size() if isinstance(cache_likelihoods, (bool, np.bool_)): return bool(cache_likelihoods), cache_size if isinstance(cache_likelihoods, str) and cache_likelihoods == 'auto': if not self.observation_model.should_cache_likelihood_sequence(): return False, cache_size if max_cache_size is None: return True, cache_size if max_cache_size < 0: raise ConfigurationError('max_cache_size must be non-negative or None.') return cache_size <= max_cache_size, cache_size raise ConfigurationError('cache_likelihoods must be True, False, or "auto".') def _compute_likelihood_sequence(self): """ Pre-compute observation likelihoods for all formatted data segments. Returns: ndarray: Likelihood values with shape [nTime] + grid_size. """ likelihood_sequence = np.empty([len(self.formatted_data)] + self.grid_size, dtype=float) for i, data_segment in enumerate(self.formatted_data): likelihood = self.observation_model.processed_pdf(self.grid, data_segment) # force dtype float on likelihood (in case it is of dtype object) if likelihood.dtype == object: likelihood = likelihood.astype(float) likelihood_sequence[i] = likelihood return likelihood_sequence def _fit_formatted_data(self, forward_only=False, evidence_only=False, silent=False, cache_likelihoods='auto', max_cache_size=512, likelihood_sequence=None): """ Run the forward-backward algorithm on already formatted data. """ lattice_constant = np.prod(self.lattice_constant) force_cache = isinstance(cache_likelihoods, (bool, np.bool_)) and bool(cache_likelihoods) if likelihood_sequence is None and (force_cache or not (forward_only or evidence_only)): use_cache, cache_size = self._should_cache_likelihoods(cache_likelihoods, max_cache_size) if use_cache: likelihood_sequence = self._compute_likelihood_sequence() if not silent: print(' + Cached observation likelihoods ({:.1f} MiB).'.format(cache_size)) # initialize array for posterior distributions if not evidence_only: self.posterior_sequence = np.empty([len(self.formatted_data)] + self.grid_size) else: self.posterior_sequence = [] # initialize array for computed evidence (marginal likelihood) self.log_evidence = 0 self.local_evidence = np.empty(len(self.formatted_data)) # set prior distribution for forward-pass alpha = self._compute_prior(silent=silent) # show progressbar if silent=False if not silent: enum = tqdm(np.arange(0, len(self.formatted_data)), total=len(self.formatted_data)) else: enum = np.arange(0, len(self.formatted_data)) # forward pass for i in enum: # compute likelihood if likelihood_sequence is None: likelihood = self.observation_model.processed_pdf(self.grid, self.formatted_data[i]) # force dtype float on likelihood (in case it is of dtype object) if likelihood.dtype == object: likelihood = likelihood.astype(float) else: likelihood = likelihood_sequence[i] # update alpha based on likelihood alpha *= likelihood # normalization constant of alpha is used to compute evidence norm = np.sum(alpha) # normalize alpha (for numerical stability) if norm > 0.: alpha /= norm else: # if all probability values are zero, normalization is not possible if self.fit_warning_counter < 5: print(' ! WARNING: Forward pass distribution contains only zeros, check parameter boundaries!') print(' Stopping inference process. Setting model evidence to zero.') elif self.fit_warning_counter == 5: print(' ! WARNING: Will omit further warnings about parameter boundaries.') self.fit_warning_counter += 1 self.log_evidence = -np.inf return # update log-evidence and compute local evidence self.log_evidence += np.log(norm) self.local_evidence[i] = norm * lattice_constant # integration yields evidence, not only sum # alphas are stored as preliminary posterior distributions if not evidence_only: self.posterior_sequence[i] = alpha # compute alpha for next iteration alpha = self.transition_model.compute_forward_prior(alpha, self.formatted_timestamps[i]) # remove progressbar correctly if not silent: enum.close() self.log_evidence += np.log(lattice_constant) # integration yields evidence, not only sum if not silent: print(' + Finished forward pass.') print(' + Log10-evidence: {:.5f}'.format(self.log_evidence / np.log(10))) if not (forward_only or evidence_only): # initialize beta and normalize by grid size (for numerical stability only) beta = np.ones(self.grid_size) beta /= np.sum(beta) # show progressbar if silent=False if not silent: enum = tqdm(np.arange(0, len(self.formatted_data))[::-1], total=len(self.formatted_data)) else: enum = np.arange(0, len(self.formatted_data))[::-1] # backward pass for i in enum: # posterior ~ alpha*beta self.posterior_sequence[i] *= beta # alpha*beta # normalize posterior wrt the parameters norm = np.sum(self.posterior_sequence[i]) if norm > 0.: self.posterior_sequence[i] /= norm else: # if all posterior probabilities are zero, normalization is not possible if self.fit_warning_counter < 5: print(' ! WARNING: Posterior distribution contains only zeros, check parameter boundaries!') print(' Stopping inference process. Setting model evidence to zero.') elif self.fit_warning_counter == 5: print(' ! WARNING: Will omit further warnings about parameter boundaries.') self.fit_warning_counter += 1 self.log_evidence = -np.inf return # re-compute likelihood if no cache is available if likelihood_sequence is None: likelihood = self.observation_model.processed_pdf(self.grid, self.formatted_data[i]) # force dtype float on likelihood (in case it is of dtype object) if likelihood.dtype == object: likelihood = likelihood.astype(float) else: likelihood = likelihood_sequence[i] # compute local evidence with np.errstate(invalid='ignore'): self.local_evidence[i] = 1. / (np.sum(self.posterior_sequence[i] / likelihood) * lattice_constant) # integration, not only sum # compute beta for next iteration beta = self.transition_model.compute_backward_prior(beta * likelihood, self.formatted_timestamps[i]) # normalize beta (for numerical stability) beta /= np.sum(beta) if not silent: enum.close() # remove progressbar correctly print(' + Finished backward pass.') # posterior mean values do not need to be computed for evidence if evidence_only: self.posterior_mean_values = [] else: self.posterior_mean_values = np.empty([len(self.grid), len(self.posterior_sequence)]) for i in range(len(self.grid)): self.posterior_mean_values[i] = np.array([np.sum(p * self.grid[i]) for p in self.posterior_sequence]) if not silent: print(' + Computed mean parameter values.')
[docs] def fit(self, forward_only=False, evidence_only=False, silent=False, cache_likelihoods='auto', max_cache_size=512): """ Computes the sequence of posterior distributions and evidence for each time step. Evidence is also computed for the complete data set. Args: forward_only(bool): If set to True, the fitting process is terminated after the forward pass. The resulting posterior distributions are so-called "filtering distributions" which - at each time step - only incorporate the information of past data points. This option thus emulates an online analysis. evidence_only(bool): If set to True, only forward pass is run and evidence is calculated. In contrast to the forward_only option, no posterior mean values are computed and no posterior distributions are stored. silent(bool): If set to True, no output is generated by the fitting method. cache_likelihoods(bool or str): If set to True, observation likelihoods are cached for all time steps. If set to "auto", caching is enabled only when the estimated cache size fits into max_cache_size. If False, likelihoods are computed on demand. max_cache_size(float): Maximum likelihood cache size in MiB used by cache_likelihoods="auto". Set to None for no limit. """ self._check_consistency() if not silent: print('+ Started new fit:') self.formatted_data = moving_window(self.raw_data, self.observation_model.segment_length) self.formatted_timestamps = self.raw_timestamps[self.observation_model.segment_length-1:] if not silent: print(' + Formatted data.') self._fit_formatted_data(forward_only=forward_only, evidence_only=evidence_only, silent=silent, cache_likelihoods=cache_likelihoods, max_cache_size=max_cache_size)
[docs] def optimize(self, parameter_list=[], forward_only=False, **kwargs): """ Uses the COBYLA minimization algorithm from SciPy to perform a maximization of the log-evidence with respect to all hyper-parameters (the parameters of the transition model) of a time seris model. The starting values are the values set by the user when defining the transition model. For the optimization, only the log-evidence is computed and no parameter distributions are stored. When a local maximum is found, the parameter distribution is computed based on the optimal values for the hyper-parameters. Args: parameter_list(list): List of hyper-parameter names to optimize. For nested transition models with multiple, identical hyper-parameter names, the sub-model index can be provided. By default, all hyper-parameters are optimized. forward_only(bool): If set to True, the fitting process is terminated after the forward pass. The resulting posterior distributions are so-called "filtering distributions" which - at each time step - only incorporate the information of past data points. **kwargs - All other keyword parameters are passed to the 'minimize' routine of scipy.optimize. """ # set list of parameters to optimize if isinstance(parameter_list, str): # in case only a single parameter name is provided as a string self.selected_hyper_parameters = [parameter_list] else: self.selected_hyper_parameters = parameter_list print('+ Starting optimization...') self._check_consistency() if self.selected_hyper_parameters: print(' --> Parameter(s) to optimize:', self.selected_hyper_parameters) else: print(' --> All model parameters are optimized (except change/break-points).') # load all hyper-parameter names (but remove break- and change-points) all_hyper_parameters = list(flatten(self._unpack_hyper_parameters(self.transition_model))) points = list(flatten(self._unpack_changepoint_names(self.transition_model))) + \ list(flatten(self._unpack_breakpoint_names(self.transition_model))) self.selected_hyper_parameters = [x for x in all_hyper_parameters if x not in points] # create parameter list to set start values for optimization x0 = self._unpack_selected_hyper_parameters() # check if valid parameter names were entered if len(x0) == 0: # reset list of parameters to optimize, so that unpacking and setting hyper-parameters works as expected self.selected_hyper_parameters = [] raise ConfigurationError('No parameters to optimize. Check parameter names.') # perform optimization (maximization of log-evidence) result = minimize(self._optimization_step, x0, method='COBYLA', **kwargs) print('+ Finished optimization.') # set optimal hyperparameters in transition model self._set_selected_hyper_parameters(result.x) # run analysis with optimal parameter values self.fit(forward_only=forward_only) # reset list of parameters to optimize, so that unpacking and setting hyper-parameters works as expected self.selected_hyper_parameters = []
def _optimization_step(self, x): """ Wrapper for the fit method to use it in conjunction with scipy.optimize.minimize. Args: x(list): unpacked list of current hyper-parameter values """ # set new hyperparameters in transition model self._set_selected_hyper_parameters(x) # compute log-evidence self.fit(evidence_only=True, silent=True) print(' + Log10-evidence: {:.5f}'.format(self.log_evidence / np.log(10)), '- Parameter values:', x) # return negative log-evidence (is minimized to maximize evidence) return -self.log_evidence
[docs] def simulate(self, x, t=None, density=False): """ Computes the probability (density) for a set of observations, based on the inferred parameter distributions of a given time step, or based on the time-averaged parameter distributions. It can be used to compute the expected distribution of the observed data, taking into account the uncertainty in the parameters (as well as hyper-parameters for Hyper-Studies). Args: x: array of observation values t: Time step/stamp for which the parameter distribution is evaluated density: If true, probability density is computed; if false, probability value is computed Returns: ndarray: probability (density) values corresponding to observation values """ if self.observation_model.segment_length > 1: raise NotImplementedError('Method "simulate" is only available for observation models with ' 'segment length 1.') # if no time is provided, use time-averaged posterior distribution if t is None: post = np.sum(self.posterior_sequence, axis=0) / len(self.posterior_sequence) else: # check if supplied time stamp exists if t not in self.formatted_timestamps: raise PostProcessingError('Supplied time ({}) does not exist in data or is out of range.'.format(t)) time_index = list(self.formatted_timestamps).index(t) # to select corresponding posterior distribution post = self.posterior_sequence[time_index] # compute distribution of observations/data at the given points x prob = np.array([np.sum(self.observation_model.pdf(self.grid, [xi]) * post) for xi in x]) if not density: prob /= np.sum(prob) return prob
def _unpack_hyper_parameters(self, transition_model, values=False): """ Returns list of all hyper-parameters (names or values), nested as the transition model. Args: transition_model: An instance of a transition model values: By default, parameter names are returned; if set to True, parameter values are returned Returns: list: hyper-parameters (names or values) """ param_list = [] # recursion step for sub-models if hasattr(transition_model, 'models'): for m in transition_model.models: param_list.append(self._unpack_hyper_parameters(m, values=values)) # extend hyper-parameter based on current (sub-)model if hasattr(transition_model, 'hyper_parameter_names'): if values: param_list.extend(transition_model.hyper_parameter_values) else: param_list.extend(transition_model.hyper_parameter_names) return param_list def _unpack_all_hyper_parameters(self, values=True): """ Returns a flattened list of all hyper-parameter values of the current transition model. Returns: list: all hyper-parameter values of the current transition model """ return list(flatten(self._unpack_hyper_parameters(self.transition_model, values=values))) def _unpack_selected_hyper_parameters(self): """ The parameters of a transition model can be split between several sub-models (using CombinedTransitionModel or SerialTransitionModel) and can be lists of values (multiple standard deviations in GaussianRandomWalk). This function unpacks the hyper-parameters, resulting in a single list of values that can be fed to the optimization step routine. Note that only the hyper-parameters that are noted (by name) in the attribute selected_hyper_parameters are regarded. Returns: list: currently selected hyper-parameter values if successful, 0 otherwise """ # if no hyper-parameters are selected, choose all if not self.selected_hyper_parameters: return self._unpack_all_hyper_parameters() # if self.selected_hyper_parameters is not empty name_tree = self._unpack_hyper_parameters(self.transition_model) value_tree = self._unpack_hyper_parameters(self.transition_model, values=True) output = [] # loop over selected hyper-parameters for name in self.selected_hyper_parameters: i_found = recursive_index(name_tree, name) # choose first hit if len(i_found) == 0: raise ConfigurationError('Could not find any hyper-parameter named {}.'.format(name)) value = value_tree[:] for i in i_found: value = value[i] output.append(value) # remove occurrence from name_tree (if name is listed twice, use second occurrence...) assign_nested_item(name_tree, i_found, ' ') # return selected values of hyper-parameters return output def _set_all_hyper_parameters(self, x): """ Sets all current hyper-parameters, based on a flattened list of parameter values. Args: x(list): list of values (e.g. from _unpack_selected_hyper_parameters) """ param_list = list(x[:]) # make copy of parameter list name_tree = self._unpack_hyper_parameters(self.transition_model) names_flat = list(flatten(self._unpack_hyper_parameters(self.transition_model))) for name in names_flat: index = recursive_index(name_tree, name) # get correct sub-model model = self.transition_model for i in index[:-1]: model = model.models[i] model.hyper_parameter_values[model.hyper_parameter_names.index(name)] = param_list[0] param_list.pop(0) # remove occurrence from name_tree (if name is listed twice, use second occurrence...) assign_nested_item(name_tree, index, ' ') def _set_selected_hyper_parameters(self, x): """ The parameters of a transition model can be split between several sub-models (using CombinedTransitionModel or SerialTransitionModel) and can be lists of values (multiple standard deviations in GaussianRandomWalk). This function takes a list of values and sets the corresponding variables in the transition model instance. Note that only the hyper-parameters that are noted (by name) in the attribute selected_hyper_parameters are regarded. Args: x(list): list of values (e.g. from _unpack_selected_hyper_parameters) Returns: int: 1, if successful, 0 otherwise """ # if no hyper-parameters are selected, choose all if not self.selected_hyper_parameters: self._set_all_hyper_parameters(x) return 1 param_list = list(x[:]) # make copy of parameter list name_tree = self._unpack_hyper_parameters(self.transition_model) # loop over selected hyper-parameters for name in self.selected_hyper_parameters: i_found = recursive_index(name_tree, name) # choose first hit if len(i_found) == 0: raise ConfigurationError('Could not find any hyper-parameter named {}.'.format(name)) # get correct sub-model model = self.transition_model for i in i_found[:-1]: model = model.models[i] model.hyper_parameter_values[model.hyper_parameter_names.index(name)] = param_list[0] param_list.pop(0) # remove occurrence from name_tree (if name is listed twice, use second occurrence...) assign_nested_item(name_tree, i_found, ' ') return 1 def _unpack_changepoint_names(self, transition_model): """ Returns list of all hyper-parameter names that are associated with change-points, nested like the transition model. Returns: list: all hyper-parameter names that are associated with change-points """ param_list = [] # recursion step for sub-models if hasattr(transition_model, 'models'): for m in transition_model.models: param_list.append(self._unpack_changepoint_names(m)) # extend hyper-parameter based on current (sub-)model if hasattr(transition_model, 'hyper_parameter_names'): if str(transition_model) == 'Change-point': param_list.extend(transition_model.hyper_parameter_names) return param_list def _unpack_breakpoint_names(self, transition_model): """ Returns list of all hyper-parameter names that are associated with break-points, nested like the transition model. Returns: list: all hyper-parameter names that are associated with break-points """ param_list = [] # recursion step for sub-models if hasattr(transition_model, 'models'): for m in transition_model.models: param_list.append(self._unpack_breakpoint_names(m)) # extend hyper-parameter based on current (sub-)model if hasattr(transition_model, 'hyper_parameter_names'): if str(transition_model) == 'Serial transition model': param_list.extend(transition_model.hyper_parameter_names) return param_list def _get_hyper_parameter_index(self, transition_model, name): """ Helper function that returns the index at which a hyper-parameter is found in the flattened list of hyper-parameter names. Args: transition_model: transition model instance in which to search name(str): Name of a hyper-parameter. If the name occurs multiple times, the index of the submodel can be supplied (starting at 1 for the first submodel). Returns: int: index of the hyper-parameter """ # no index provided: choose first occurrence and determine axis of hyper-parameter on grid of # hyper-parameter values hpn = list(flatten(self._unpack_hyper_parameters(transition_model, values=False))) if name in hpn: param_index = hpn.index(name) else: raise PostProcessingError('Could not find any hyper-parameter with name: {}.'.format(name)) return param_index
[docs] def get_hyper_parameter_value(self, name): """ Returns the currently set value of a hyper-parameter. Note: The returned value is NOT an inferred value, but simply the last value used by the fitting method. Args: name(str): Hyper-parameter name. Returns: float: current value of the specified hyper-parameter. """ flat_hyper_parameter_values = self._unpack_all_hyper_parameters(values=True) value = flat_hyper_parameter_values[self._get_hyper_parameter_index(self.transition_model, name)] return value
[docs] def get_parameter_mean_values(self, name): """ Returns posterior mean values for a parameter of the observation model. Args: name(str): Name of the parameter to display Returns: ndarray: array of posterior mean values for the selected parameter """ # get parameter index param_index = -1 for i, n in enumerate(self.observation_model.parameter_names): if n == name: param_index = i # check if match was found if param_index == -1: raise PostProcessingError('Wrong parameter name. Available options: {0}' .format(self.observation_model.parameter_names)) return self.posterior_mean_values[param_index]
[docs] def get_parameter_distribution(self, t, name, plot=False, density=True, **kwargs): """ Compute the marginal parameter distribution at a given time step. Args: t: Time step/stamp for which the parameter distribution is evaluated (or 'avg' for time-averaged parameter distribution) name(str): Name of the parameter to display plot(bool): If True, a plot of the distribution is created density(bool): If true, probability density is returned; if false, probability values **kwargs: All further keyword-arguments are passed to the plot (see matplotlib documentation) Returns: ndarray, ndarray: The first array contains the parameter values, the second one the corresponding probability (density) values """ if len(self.posterior_sequence) == 0: raise PostProcessingError('Cannot plot posterior sequence as it has not yet been computed. ' 'Run complete fit.') if t == 'avg': # compute time-averaged posterior distribution dist = np.sum(self.posterior_sequence, axis=0)/len(self.posterior_sequence) else: # check if supplied time stamp exists if t not in self.formatted_timestamps: raise PostProcessingError('Supplied time ({}) does not exist in data or is out of range.'.format(t)) time_index = list(self.formatted_timestamps).index(t) # to select corresponding posterior distribution # select posterior distribution of speciifed time step dist = self.posterior_sequence[time_index] # get parameter index param_index = -1 for i, n in enumerate(self.observation_model.parameter_names): if n == name: param_index = i # check if match was found if param_index == -1: raise PostProcessingError('Wrong parameter name. Available options: {0}' .format(self.observation_model.parameter_names)) axes_to_marginalize = list(range(len(self.observation_model.parameter_names))) try: axes_to_marginalize.remove(param_index) except ValueError: raise PostProcessingError('Wrong parameter index. Available indices: {}'.format(axes_to_marginalize)) x = self.marginal_grid[param_index] dx = self.lattice_constant[param_index] marginal_distribution = np.squeeze(np.apply_over_axes(np.sum, dist, axes_to_marginalize)).copy() if density: marginal_distribution /= dx if plot: plt.fill_between(x, 0, marginal_distribution, **kwargs) plt.xlabel(self.observation_model.parameter_names[param_index]) if density: plt.ylabel('probability density') else: plt.ylabel('probability') return x, marginal_distribution
[docs] def get_parameter_distributions(self, name, plot=False, density=True, **kwargs): """ Computes the time series of marginal posterior distributions with respect to a given model parameter. Args: name(str): Name of the parameter to display plot(bool): If True, a plot of the series of distributions is created (density map) density(bool): If true, probability density is returned; if false, probability values **kwargs: All further keyword-arguments are passed to the plot (see matplotlib documentation) Returns: ndarray, ndarray: The first array contains the parameter values, the second one the sequence of corresponding posterior distributions. """ if len(self.posterior_sequence) == 0: raise PostProcessingError('Cannot plot posterior sequence as it has not yet been computed. ' 'Run complete fit.') dt = self.formatted_timestamps[1:] - self.formatted_timestamps[:-1] if not np.all(dt == dt[0]): print('! WARNING: Time stamps are not equally spaced. This may result in false plotting of parameter ' 'distributions.') # get parameter index param_index = -1 for i, n in enumerate(self.observation_model.parameter_names): if n == name: param_index = i # check if match was found if param_index == -1: raise PostProcessingError('Wrong parameter name. Available options: {0}' .format(self.observation_model.parameter_names)) axes_to_marginalize = list(range(1, len(self.observation_model.parameter_names) + 1)) # axis 0 is time try: axes_to_marginalize.remove(param_index + 1) except ValueError: raise PostProcessingError('Wrong parameter index. Available indices: {}' .format(np.array(axes_to_marginalize) - 1)) x = self.marginal_grid[param_index] dx = self.lattice_constant[param_index] marginal_posterior_sequence = np.squeeze( np.apply_over_axes(np.sum, self.posterior_sequence, axes_to_marginalize)).copy() if density: marginal_posterior_sequence /= dx if plot: if 'c' in kwargs: cmap = create_colormap(kwargs['c']) elif 'color' in kwargs: cmap = create_colormap(kwargs['color']) else: cmap = create_colormap('b') plt.imshow(marginal_posterior_sequence.T, origin='lower', cmap=cmap, extent=[self.formatted_timestamps[0], self.formatted_timestamps[-1]] + self.boundaries[param_index], aspect='auto') return x, marginal_posterior_sequence
[docs] def plot_parameter_evolution(self, name, color='b', gamma=0.5, **kwargs): """ Extended plot method to display a series of marginal posterior distributions corresponding to a single model parameter. In contrast to getMarginalParameterDistributions(), this method includes the removal of plotting artefacts, gamma correction as well as an overlay of the posterior mean values. Args: name(str): name of the parameter to display color: color from which a light colormap is created gamma(float): exponent for gamma correction of the displayed marginal distribution; default: 0.5 kwargs: all further keyword-arguments are passed to the plot of the posterior mean values """ if len(self.posterior_sequence) == 0: raise PostProcessingError('Cannot plot posterior sequence as it has not yet been computed. ' 'Run complete fit.') dt = self.formatted_timestamps[1:] - self.formatted_timestamps[:-1] if not np.all(dt == dt[0]): print('! WARNING: Time stamps are not equally spaced. This may result in false plotting of parameter ' 'distributions.') # get parameter index param_index = -1 for i, n in enumerate(self.observation_model.parameter_names): if n == name: param_index = i # check if match was found if param_index == -1: raise PostProcessingError('Wrong parameter name. Available options: {0}' .format(self.observation_model.parameter_names)) axes_to_marginalize = list(range(1, len(self.observation_model.parameter_names) + 1)) # axis 0 is time try: axes_to_marginalize.remove(param_index + 1) except ValueError: raise PostProcessingError('Wrong parameter index to plot. Available indices: {}' .format(np.array(axes_to_marginalize)-1)) marginal_posterior_sequence = np.squeeze(np.apply_over_axes(np.sum, self.posterior_sequence, axes_to_marginalize)) # clean up very small probability values, as they may create image artefacts pmax = np.amax(marginal_posterior_sequence) marginal_posterior_sequence[marginal_posterior_sequence < pmax*(10**-20)] = 0 plt.imshow(marginal_posterior_sequence.T**gamma, origin='lower', cmap=create_colormap(color), extent=[self.formatted_timestamps[0], self.formatted_timestamps[-1]] + self.boundaries[param_index], aspect='auto') # set default color of plot to black if ('c' not in kwargs) and ('color' not in kwargs): kwargs['c'] = 'k' # set default linewidth to 1.5 if ('lw' not in kwargs) and ('linewidth' not in kwargs): kwargs['lw'] = 1.5 plt.plot(self.formatted_timestamps, self.posterior_mean_values[param_index], **kwargs) plt.ylim(self.boundaries[param_index]) plt.ylabel(self.observation_model.parameter_names[param_index]) plt.xlabel('time step')
[docs] def plot(self, name, **kwargs): """ Convenience method to plot the temporal evolution of observation model parameters, or the parameter distribution at a specific time step. Extended functionality for other study classes. Args: name(str): name of the parameter to display color: color from which a light colormap is created (for parameter evolution only) gamma(float): exponent for gamma correction of the displayed marginal distribution; default: 0.5 (for parameter evolution only) t: Time step/stamp for which the parameter distribution is evaluated density(bool): If true, probability density is plotted; if false, probability values kwargs: all further keyword-arguments are passed to the axes object of the plot """ density = kwargs.pop('density', True) # plot parameter distribution at specific time step if 't' in kwargs.keys(): t = kwargs.pop('t') self.get_parameter_distribution(t, name, plot=True, density=density, **kwargs) # plot parameter evolution else: # read additional kwargs (or set default values) color = kwargs.pop('color', 'b') gamma = kwargs.pop('gamma', 0.5) self.plot_parameter_evolution(name, color=color, gamma=gamma, **kwargs)
def _check_consistency(self): """ This method is called at the very beginning of analysis methods to ensure that all necessary elements of the model are set correctly. If problem with user input is detected, an exception will be raised. """ if len(self.raw_data) == 0: raise ConfigurationError('No data loaded.') if not self.observation_model: raise ConfigurationError('No observation model chosen.') if not self.transition_model: raise ConfigurationError('No transition model chosen.') # check for duplicate hyper-parameter names flat_names = self._unpack_all_hyper_parameters(values=False) u, i = np.unique(flat_names, return_inverse=True) duplicates = u[np.bincount(i) > 1] if len(duplicates) > 0: raise ConfigurationError('Detected duplicate hyper-parameter names: {}.'.format(duplicates))
[docs] class HyperStudy(Study): """ Infers hyper-parameter distributions. This class serves as an extension to the basic Study class and allows to compute the distribution of hyper-parameters of a given transition model. For further information, see the documentation of the fit-method of this class. """ def __init__(self, silent=False): super(HyperStudy, self).__init__(silent=silent) self.hyper_grid = [] self.hyper_grid_values = [] self.hyper_grid_constant = [] self.flat_hyper_parameters = [] self.flat_hyper_parameter_names = [] self.flat_hyper_priors = [] self.flat_hyper_prior_values = [] self.hyper_parameter_distribution = None self.average_posterior_sequence = None self.log_evidence_list = [] self.local_evidence_list = [] if not silent: print(' --> Hyper-study') def _create_hyper_grid(self, silent=False): """ Creates an array of hyper-parameter values that are fitted. Also determines grid constants for proper normalisation and computes prior probability (density) values Args: silent(bool): If true, no output is produced by this method """ # extract flat list of hyper-parameter names and values self.flat_hyper_parameters = self._unpack_all_hyper_parameters() self.flat_hyper_parameter_names = self._unpack_all_hyper_parameters(values=False) self.flat_hyper_priors = self._unpack_all_hyper_priors() # look if change/break-points have value 'all' and assign array of all time-stamps for i, v in enumerate(self.flat_hyper_parameters): if isinstance(v, str) and v == 'all': self.flat_hyper_parameters[i] = self.formatted_timestamps[:-1] # create hyper-parameter grid temp = np.meshgrid(*self.flat_hyper_parameters, indexing='ij') if len(self.flat_hyper_parameter_names) > 0: self.hyper_grid_values = np.array([t.ravel() for t in temp]).T else: self.hyper_grid_values = np.array([]) # find lattice constants for equally spaced hyper-parameter values self.hyper_grid_constant = [] for values in self.flat_hyper_parameters: if isinstance(values, Iterable) and len(values) > 1: a = np.array(values) d = a[1:] - a[:-1] dd = d[1:] - d[:-1] if np.all(np.abs(dd) < 10 ** -10): # for equally spaced values, set difference as grid-constant self.hyper_grid_constant.append(np.abs(d[0])) else: # for irregularly spaced values (e.g. categorical), set grid-constant to 1 self.hyper_grid_constant.append(1) else: # for single value, set grid-constant to 1 self.hyper_grid_constant.append(1) self.hyper_grid_constant = np.array(self.hyper_grid_constant) # evaluate hyper-prior values prior_values_list = [] prior_names_list = [] for prior, values, grid_const, name in zip(self.flat_hyper_priors, self.flat_hyper_parameters, self.hyper_grid_constant, self.flat_hyper_parameter_names): if prior is None: # no prior specified prior_values = np.ones_like(values, dtype=float) prior_values /= np.sum(prior_values) prior_values /= grid_const prior_names_list.append('uniform') elif hasattr(prior, '__call__'): # prior specified by function try: prior_values = np.array([prior(value) for value in values]) norm = np.sum(prior_values) prior_values /= norm prior_values /= grid_const except: raise ConfigurationError('Failed to set hyper-prior for "{}" from function "{}".' .format(name, prior.__name__)) if norm != 1.: prior_names_list.append(prior.__name__ + ' (re-normalized)') else: prior_names_list.append(prior.__name__) elif isinstance(prior, Iterable): # prior specified by list/array if len(prior) != len(values): raise ConfigurationError('Failed to set hyper-prior for "{}" from list/array.'.format(name)) prior_values = np.array(prior, dtype=float) norm = np.sum(prior_values) prior_values /= norm prior_values /= grid_const if norm != 1.: prior_names_list.append('list/array (re-normalized)') else: prior_names_list.append('list/array') else: # SymPy RV if len(free_symbols(prior)) > 0: raise ConfigurationError('Hyper-prior for "{}" must not contain free parameters.'.format(name)) # get symbolic representation of probability density x = abc.x sym_density = density(prior)(x) # get density as lambda function pdf = lambdify([x], sym_density, modules=['numpy', {'factorial': factorial}]) # evaluate density prior_values = pdf(values) prior_names_list.append(str(sym_density)) prior_values_list.append(prior_values) # create hyper-prior grid if len(self.flat_hyper_parameter_names) > 0: temp = np.meshgrid(*prior_values_list, indexing='ij') self.flat_hyper_prior_values = np.array([t.ravel() for t in temp]).T self.flat_hyper_prior_values = np.prod(self.flat_hyper_prior_values, axis=1) # joint prob. density if not silent and len(self.hyper_grid_values) > 1: print('+ Set hyper-prior(s): {}'.format(prior_names_list)) else: # we need a dummy value for transition models without hyper-parameters self.flat_hyper_prior_values = np.array([1])
[docs] def fit(self, forward_only=False, evidence_only=False, silent=False, n_jobs=1, custom_hyper_grid=False, cache_likelihoods='auto', max_cache_size=512): """ This method over-rides the according method of the Study-class. It runs the algorithm for equally spaced hyper- parameter values as defined by the variable 'hyper_grid'. The posterior sequence represents the average model of all analyses. Posterior mean values are computed from this average model. Args: forward_only(bool): If set to True, the fitting process is terminated after the forward pass. The resulting posterior distributions are so-called "filtering distributions" which - at each time step - only incorporate the information of past data points. This option thus emulates an online analysis. evidence_only(bool): If set to True, only forward pass is run and evidence is calculated. In contrast to the forward_only option, no posterior mean values are computed and no posterior distributions are stored. silent(bool): If set to true, reduced output is created by this method. n_jobs(int): Number of processes to employ. Multiprocessing is based on the 'joblib' module. custom_hyper_grid(bool): If set to true, the method "_create_hyper_grid" is not called before starting the fit. This is used by the class "ChangepointStudy", which employs a custom version of "_create_hyper_grid". cache_likelihoods(bool or str): If set to True, observation likelihoods are cached for all time steps and reused across hyper-parameter values. If set to "auto", caching is enabled only when the estimated cache size fits into max_cache_size. If False, likelihoods are computed on demand. max_cache_size(float): Maximum likelihood cache size in MiB used by cache_likelihoods="auto". Set to None for no limit. """ self.fit_warning_counter = 0 # format data/timestamps once, so number of data segments is known and _createGrid() works properly self.formatted_data = moving_window(self.raw_data, self.observation_model.segment_length) self.formatted_timestamps = self.raw_timestamps[self.observation_model.segment_length - 1:] # create hyper-parameter grid if not custom_hyper_grid: self._create_hyper_grid(silent=silent) # additional consistency check if multiple change/break-points share identical values. # in this case, a ChangepointStudy should be used! names = list(flatten(self._unpack_changepoint_names(self.transition_model))) + \ list(flatten(self._unpack_breakpoint_names(self.transition_model))) if len(names) > 1: indices = [self.flat_hyper_parameter_names.index(name) for name in names] values = self.hyper_grid_values[:, indices] for v in values: if np.unique(v).size < v.size: raise ConfigurationError('Detected multiple change-/break-points with identical values and/or ' 'overlapping value intervals. Use "ChangepointStudy" instead of ' '"HyperStudy" for such cases.') self._check_consistency() if evidence_only: self.posterior_sequence = [] self.posterior_mean_values = [] self.average_posterior_sequence = None if not evidence_only: # The average posterior distribution is stored in log-space for numerical stability. This way, it can be # updated iteratively without storing all individual posterior distributions (after the iteration over all # hyper-parameter values is done, it is transformed back to linear space) self.average_posterior_sequence = np.zeros([len(self.formatted_data)]+self.grid_size) - np.inf self.log_evidence_list = [] self.local_evidence_list = [] # hyper-study fit is only necessary for more than one combination of hyper-parameter values if len(self.hyper_grid_values) > 1: if not silent: print('+ Started new fit.') print(' + {} analyses to run.'.format(len(self.hyper_grid_values))) if n_jobs > 1: # prepare parallel execution if not silent: print(' + Creating {} processes.'.format(n_jobs)) # use _parallel_fit method to create copies of this HyperStudy instance with only partial # hyper-grid values sub_studies = Parallel(n_jobs=n_jobs, prefer='processes')( delayed(self._parallel_fit)( idx, n_jobs, forward_only, evidence_only, silent, cache_likelihoods, max_cache_size, ) for idx in range(n_jobs) ) # merge all sub-studies for S in sub_studies: self.log_evidence_list += S.log_evidence_list self.local_evidence_list += S.local_evidence_list if not evidence_only: self.average_posterior_sequence = np.logaddexp(self.average_posterior_sequence, S.average_posterior_sequence) # single process fit else: likelihood_sequence = None use_cache, cache_size = self._should_cache_likelihoods(cache_likelihoods, max_cache_size) if use_cache: likelihood_sequence = self._compute_likelihood_sequence() if not silent: print(' + Cached observation likelihoods ({:.1f} MiB).'.format(cache_size)) # show progressbar if silent=False if not silent: enum = tqdm(enumerate(self.hyper_grid_values), total=len(self.hyper_grid_values)) else: enum = enumerate(self.hyper_grid_values) for i, hyper_param_values in enum: self._set_selected_hyper_parameters(hyper_param_values) # call fit method from parent class self._fit_formatted_data(forward_only=forward_only, evidence_only=evidence_only, silent=True, cache_likelihoods=cache_likelihoods, max_cache_size=max_cache_size, likelihood_sequence=likelihood_sequence) self.log_evidence_list.append(self.log_evidence) self.local_evidence_list.append(self.local_evidence) if (not evidence_only) and np.isfinite(self.log_evidence): # For numerical stability, zeros in posterior distribution are replaced with small constant. # Note that this is typically only the case for hyper-parameter values with low likelihood. The # procedure therefore has no (measurable) effect on the average posterior sequence. self.posterior_sequence[self.posterior_sequence < 10.**-300] = 10.**-300 self.average_posterior_sequence = np.logaddexp(self.average_posterior_sequence, np.log(self.posterior_sequence) + self.log_evidence + np.log(self.flat_hyper_prior_values[i])) # remove progressbar correctly if not silent: enum.close() if not evidence_only: # transform average posterior distribution into linear space # (we prefer underflows rather than overflows) self.average_posterior_sequence -= np.amax(self.average_posterior_sequence) self.average_posterior_sequence = np.exp(self.average_posterior_sequence) # compute average posterior distribution normalization = np.array([np.sum(posterior) for posterior in self.average_posterior_sequence]) for i in range(len(self.grid)): normalization = normalization[:, None] # add axis; needs to match average_posterior_sequence self.average_posterior_sequence /= normalization # set self.posterior_sequence to average posterior sequence for plotting reasons self.posterior_sequence = self.average_posterior_sequence if not silent: print(' + Computed average posterior sequence') # compute hyper-parameter distribution log_hyper_parameter_distribution = self.log_evidence_list + np.log(self.flat_hyper_prior_values) + \ np.sum(np.log(self.hyper_grid_constant)) # transform hyper-parameter distribution into linear space # (we prefer underflows rather than overflows) scaled_log_hyper_parameter_distribution = log_hyper_parameter_distribution - np.amax(log_hyper_parameter_distribution) self.hyper_parameter_distribution = np.exp(scaled_log_hyper_parameter_distribution) self.hyper_parameter_distribution /= np.sum(self.hyper_parameter_distribution) self.hyper_parameter_distribution /= np.prod(self.hyper_grid_constant) # probability density if not silent: print(' + Computed hyper-parameter distribution') # compute log-evidence of average model self.log_evidence = logsumexp(log_hyper_parameter_distribution) if not silent: print(' + Log10-evidence of average model: {:.5f}'.format(self.log_evidence / np.log(10))) # compute local evidence of average model self.local_evidence = np.sum((np.array(self.local_evidence_list).T*self.flat_hyper_prior_values).T, axis=0) if not silent: print(' + Computed local evidence of average model') # compute posterior mean values if not evidence_only: self.posterior_mean_values = np.empty([len(self.grid), len(self.posterior_sequence)]) for i in range(len(self.grid)): self.posterior_mean_values[i] = np.array([np.sum(p*self.grid[i]) for p in self.posterior_sequence]) if not silent: print(' + Computed mean parameter values.') # clear local_evidence_list (to keep file size small for stored studies) self.local_evidence_list = [] # restore hyper-parameter values (individual values have been set during fitting) self._set_all_hyper_parameters(self.flat_hyper_parameters) if not silent: print('+ Finished fit.') # only one combination of hyper-parameter values else: if not silent: if len(self.hyper_grid_values) == 1: print('+ Only one combination of hyper-parameter values, switching to standard fit method.') if len(self.hyper_grid_values) == 0: print('+ Transition model contains no hyper-parameters, switching to standard fit method.') Study.fit(self, forward_only=forward_only, evidence_only=evidence_only, silent=silent, cache_likelihoods=cache_likelihoods, max_cache_size=max_cache_size)
def _parallel_fit(self, idx, n_jobs, forward_only, evidence_only, silent, cache_likelihoods, max_cache_size): """ This method is called by the fit method of the HyperStudy class. It creates a copy of the current class instance and performs a fit based on a subset of the specified hyper-parameter grid. The method thus allows to distribute a HyperStudy fit among multiple processes for multiprocessing. Args: idx(int): Index from 0 to (n_jobs-1), indicating which part of the hyper-grid values are to be analyzed. n_jobs(int): Number of processes to employ. Multiprocessing is based on the 'joblib' module. forward_only(bool): If set to True, the fitting process is terminated after the forward pass. The resulting posterior distributions are so-called "filtering distributions" which - at each time step - only incorporate the information of past data points. This option thus emulates an online analysis. evidence_only(bool): If set to True, only forward pass is run and evidence is calculated. In contrast to the forward_only option, no posterior mean values are computed and no posterior distributions are stored. silent(bool): If set to True, no output is generated by the fitting method. cache_likelihoods(bool or str): Whether to cache observation likelihoods. max_cache_size(float): Maximum likelihood cache size in MiB used by cache_likelihoods="auto". Returns: HyperStudy instance """ S = copy(self) S.hyper_grid_values = np.array_split(S.hyper_grid_values, n_jobs)[idx] S.flat_hyper_prior_values = np.array_split(S.flat_hyper_prior_values, n_jobs)[idx] likelihood_sequence = None if len(S.hyper_grid_values) > 1: use_cache, _ = S._should_cache_likelihoods(cache_likelihoods, max_cache_size) if use_cache: likelihood_sequence = S._compute_likelihood_sequence() # show progressbar for last process if silent=False if not silent and idx == n_jobs-1: enum = tqdm(enumerate(S.hyper_grid_values), total=len(S.hyper_grid_values)) else: enum = enumerate(S.hyper_grid_values) for i, hyper_param_values in enum: S._set_selected_hyper_parameters(hyper_param_values) # call fit method from parent class S._fit_formatted_data(forward_only=forward_only, evidence_only=evidence_only, silent=True, cache_likelihoods=cache_likelihoods, max_cache_size=max_cache_size, likelihood_sequence=likelihood_sequence) S.log_evidence_list.append(S.log_evidence) S.local_evidence_list.append(S.local_evidence) if (not evidence_only) and np.isfinite(S.log_evidence): # For numerical stability, zeros in posterior distribution are replaced with small constant. # Note that this is typically only the case for hyper-parameter values with low likelihood. The # procedure therefore has no (measurable) effect on the average posterior sequence. S.posterior_sequence[S.posterior_sequence < 10. ** -300] = 10. ** -300 S.average_posterior_sequence = np.logaddexp(S.average_posterior_sequence, np.log(S.posterior_sequence) + S.log_evidence + np.log(S.flat_hyper_prior_values[i])) # remove progressbar correctly if not silent and idx == n_jobs-1: enum.close() return S # optimization methods are inherited from Study class, but cannot be used in this case
[docs] def optimize(self, *args, **kwargs): raise NotImplementedError('HyperStudy object has no optimizing method.')
def _unpack_hyper_priors(self, transition_model): """ Returns flat list of all hyper-priors in transition model order. Args: transition_model: An instance of a transition model Returns: list: hyper-priors """ prior_list = [] # recursion step for sub-models if hasattr(transition_model, 'models'): for m in transition_model.models: prior_list.extend(self._unpack_hyper_priors(m)) # append prior if hasattr(transition_model, 'prior'): hyper_parameter_names = getattr(transition_model, 'hyper_parameter_names', []) n_parameters = len(hyper_parameter_names) # only take priors from transition models that define hyper-parameters. This includes the structural # break parameters stored directly in SerialTransitionModel. if n_parameters == 1: prior = transition_model.prior if isinstance(prior, (list, tuple)) and len(prior) == 1: first_prior = prior[0] if (first_prior is None or hasattr(first_prior, '__call__') or type(first_prior) is sympy.stats.rv.RandomSymbol or isinstance(first_prior, (list, tuple, np.ndarray))): prior = first_prior prior_list.append(prior) elif n_parameters > 1: prior = transition_model.prior if prior is None: prior_list.extend([None] * n_parameters) elif isinstance(prior, Iterable) and not isinstance(prior, (str, np.ndarray)): if len(prior) != n_parameters: raise ConfigurationError( '{} priors are defined for transition model "{}", but model contains {} ' 'hyper-parameters.'.format(len(prior), transition_model, n_parameters)) prior_list.extend(list(prior)) else: raise ConfigurationError( 'Transition model "{}" contains {} hyper-parameters, but its hyper-prior is not a ' 'matching list or tuple.'.format(transition_model, n_parameters)) elif str(transition_model) == 'Break-point': prior_list.append(transition_model.prior) return prior_list def _unpack_all_hyper_priors(self): """ Returns a flattened list of all hyper-priors of the current transition model. Returns: list: all hyper-priors of the current transition model """ return self._unpack_hyper_priors(self.transition_model)
[docs] def get_hyper_parameter_distribution(self, name, plot=False, **kwargs): """ Computes marginal hyper-parameter distribution of a single hyper-parameter in a HyperStudy fit. Args: name(str): Name of the hyper-parameter to display (first model hyper-parameter) plot(bool): If True, a bar chart of the distribution is created **kwargs: All further keyword-arguments are passed to the bar-plot (see matplotlib documentation) Returns: ndarray, ndarray: The first array contains the hyper-parameter values, the second one the corresponding probability values """ # check if only a standard fit has been carried out if len(self.hyper_grid_values) < 2: raise PostProcessingError('At least two combinations of hyper-parameter values need to be fitted to ' 'evaluate a hyper-parameter distribution. Check transition model.') param_index = self._get_hyper_parameter_index(self.transition_model, name) axes_to_marginalize = list(range(len(self.flat_hyper_parameter_names))) axes_to_marginalize.remove(param_index) # reshape hyper-parameter distribution for easy marginalizing hyper_grid_steps = [] for x in self.flat_hyper_parameters: if isinstance(x, Iterable): hyper_grid_steps.append(len(x)) else: hyper_grid_steps.append(1) distribution = self.hyper_parameter_distribution.reshape(hyper_grid_steps, order='C') marginal_distribution = np.squeeze(np.apply_over_axes(np.sum, distribution, axes_to_marginalize)) marginal_distribution *= np.prod(self.hyper_grid_constant) # convert to probability (from density) x = self.flat_hyper_parameters[param_index] if plot: # check if categorical if np.any(np.abs(np.diff(np.diff(x))) > 10 ** -10): plt.bar(np.arange(len(x)), marginal_distribution, align='center', width=1., **kwargs) plt.xticks(np.arange(len(x)), x) plt.ylabel('probability') # regular spacing else: plt.bar(x, marginal_distribution, align='center', width=self.hyper_grid_constant[param_index], **kwargs) plt.ylabel('probability') plt.xlabel(self.flat_hyper_parameter_names[param_index]) return x, marginal_distribution
[docs] def get_joint_hyper_parameter_distribution(self, names, plot=False, figure=None, subplot=111, **kwargs): """ Computes the joint distribution of two hyper-parameters of a HyperStudy and optionally creates a 3D bar chart. Note that the 3D plot can only be included in an existing plot by passing a figure object and subplot specification. Args: names(list): List of two hyper-parameter names to display plot(bool): If True, a 3D-bar chart of the distribution is created figure: In case the plot is supposed to be part of an existing figure, it can be passed to the method. By default, a new figure is created. subplot: Characterization of subplot alignment, as in matplotlib. Default: 111 **kwargs: all further keyword-arguments are passed to the bar3d-plot (see matplotlib documentation) Returns: ndarray, ndarray, ndarray: The first and second array contains the hyper-parameter values, the third one the corresponding probability values """ # check if only a standard fit has been carried out if len(self.hyper_grid_values) < 2: raise PostProcessingError('At least two combinations of hyper-parameter values need to be fitted to ' 'evaluate a hyper-parameter distribution. Check transition model.') # check if list with two elements is provided if not isinstance(names, Iterable): raise PostProcessingError('A list of exactly two hyper-parameters has to be provided.') elif not len(names) == 2: raise PostProcessingError('A list of exactly two hyper-parameters has to be provided.') param_indices = [self._get_hyper_parameter_index(self.transition_model, n) for n in names] # put parameter indices in ascending order for marginalization and plotting # original order is restored for returned values switch = False if not param_indices[0] < param_indices[1]: switch = True param_indices = param_indices[::-1] axes_to_marginalize = list(range(len(self.flat_hyper_parameter_names))) for p in param_indices: axes_to_marginalize.remove(p) # reshape hyper-parameter distribution for easy marginalizing hyper_grid_steps = [] for x in self.flat_hyper_parameters: if isinstance(x, Iterable): hyper_grid_steps.append(len(x)) else: hyper_grid_steps.append(1) distribution = self.hyper_parameter_distribution.reshape(hyper_grid_steps, order='C') marginal_distribution = np.squeeze(np.apply_over_axes(np.sum, distribution, axes_to_marginalize)) marginal_distribution *= np.prod(self.hyper_grid_constant) # convert to probability (from density) x, y = [self.flat_hyper_parameters[i] for i in param_indices] if np.any(np.abs(np.diff(np.diff(x))) > 10 ** -10): x2 = np.tile(np.arange(len(x)), (len(y), 1)).T else: x2 = np.tile(x, (len(y), 1)).T if np.any(np.abs(np.diff(np.diff(y))) > 10 ** -10): y2 = np.tile(np.arange(len(y)), (len(x), 1)) else: y2 = np.tile(y, (len(x), 1)) z = marginal_distribution if plot: # allow to add plot to predefined figure if figure is None: fig = plt.figure() else: fig = figure ax = fig.add_subplot(subplot, projection='3d') ax.bar3d(x2.flatten() - self.hyper_grid_constant[param_indices[0]]/2., y2.flatten() - self.hyper_grid_constant[param_indices[1]]/2., z.flatten()*0., self.hyper_grid_constant[param_indices[0]], self.hyper_grid_constant[param_indices[1]], z.flatten(), zsort='max', **kwargs ) # check for categorical hyper-parameter values if np.any(np.abs(np.diff(np.diff(x))) > 10 ** -10): plt.xticks(np.arange(len(x)), x) if np.any(np.abs(np.diff(np.diff(y))) > 10 ** -10): plt.yticks(np.arange(len(y)), y) ax.set_xlabel(self.flat_hyper_parameter_names[param_indices[0]]) ax.set_ylabel(self.flat_hyper_parameter_names[param_indices[1]]) ax.set_zlabel('probability') # restore original order of parameters before returning values if switch: x, y = y, x marginal_distribution = marginal_distribution.T return x, y, marginal_distribution
[docs] def plot(self, name, **kwargs): """ Convenience method to plot the temporal evolution of observation model parameters, the distribution of a parameter at a specific time step, or the distribution of a hyper-parameter. Args: name(str): name of the (hyper-)parameter to display color: color from which a light colormap is created (for parameter evolution only) gamma(float): exponent for gamma correction of the displayed marginal distribution; default: 0.5 (for parameter evolution only) t: Time step/stamp for which the parameter distribution is evaluated density(bool): If true, probability density is plotted; if false, probability values. Note: Only availble for parameters, not hyper-parameters. kwargs: all further keyword-arguments are passed to the axes object of the plot """ density = kwargs.pop('density', True) # plot parameter distribution at specific time step if 't' in kwargs.keys(): t = kwargs.pop('t') self.get_parameter_distribution(t, name, plot=True, density=density, **kwargs) else: # check is name belongs to hyper-parameter try: self._get_hyper_parameter_index(self.transition_model, name) hyper = True except PostProcessingError: hyper = False # for hyper-parameters, plot distribution if hyper: self.get_hyper_parameter_distribution(name, plot=True, **kwargs) # for parameters, plot temporal evolution else: # read additional kwargs (or set default values) color = kwargs.pop('color', 'b') gamma = kwargs.pop('gamma', 0.5) self.plot_parameter_evolution(name, color=color, gamma=gamma, **kwargs)
[docs] class ChangepointStudy(HyperStudy): """ Infers change-points and structural breaks. This class builds on the HyperStudy-class and the change-point transition model to perform a series of analyses with varying change point times. It subsequently computes the average model from all possible change points and creates a probability distribution of change point times. It supports any number of change-points and arbitarily combined models. """ def __init__(self, silent=False): super(ChangepointStudy, self).__init__(silent=silent) # store all possible combinations of change-points (even the ones that are assigned a probability of zero), # to reconstruct change-point distribution after analysis self.all_hyper_grid_values = [] self.all_hyper_prior_values = [] self.mask = [] # mask to select valid change-point combinations self.user_defined_grid = False # needed to ensure that user-defined hyper-grid is not overwritten by fit-method self.hyper_grid_backup = [] # needed to reconstruct hyper_grid attribute in the case of break-point model if not silent: print(' --> Change-point analysis')
[docs] def fit(self, forward_only=False, evidence_only=False, silent=False, n_jobs=1, cache_likelihoods='auto', max_cache_size=512): """ This method over-rides the corresponding method of the HyperStudy-class. It runs the algorithm for all possible combinations of change-points (and possible scans a range of values for other hyper-parameters). The posterior sequence represents the average model of all analyses. Posterior mean values are computed from this average model. Args: forward_only(bool): If set to True, the fitting process is terminated after the forward pass. The resulting posterior distributions are so-called "filtering distributions" which - at each time step - only incorporate the information of past data points. This option thus emulates an online analysis. evidence_only(bool): If set to True, only forward pass is run and evidence is calculated. In contrast to the forward_only option, no posterior mean values are computed and no posterior distributions are stored. silent(bool): If set to True, reduced output is generated by the fitting method. n_jobs(int): Number of processes to employ. Multiprocessing is based on the 'joblib' module. cache_likelihoods(bool or str): If set to True, observation likelihoods are cached for all time steps and reused across hyper-parameter values. If set to "auto", caching is enabled only when the estimated cache size fits into max_cache_size. If False, likelihoods are computed on demand. max_cache_size(float): Maximum likelihood cache size in MiB used by cache_likelihoods="auto". Set to None for no limit. """ # format data/timestamps once, so number of data segments is known self.formatted_data = moving_window(self.raw_data, self.observation_model.segment_length) self.formatted_timestamps = self.raw_timestamps[self.observation_model.segment_length - 1:] # nested serial transition models are not supported, as the correct order is not determined correctly if len(list(flatten(self._unpack_serial_transition_models(self.transition_model)))) > 1: raise NotImplementedError('Multiple instances of SerialTransition models are currently not supported by ' 'ChangepointStudy.') # determine names of change/break-points changepoints = list(flatten(self._unpack_changepoint_names(self.transition_model))) breakpoints = list(flatten(self._unpack_breakpoint_names(self.transition_model))) # both types are not allowed at the moment, as the correct order is not determined correctly if len(changepoints) > 0 and len(breakpoints) > 0: raise NotImplementedError('Detected both change-points (Changepoint transition model) and break-points ' '(SerialTransitionModel). Currently, only one type is supported in a single ' 'transition model.') # at least one change/break-point should be present if len(changepoints) == 0 and len(breakpoints) == 0: raise ConfigurationError('No change-points or break-points detected in transition model. Check transition ' 'model.') self.flat_hyper_parameters = self._unpack_all_hyper_parameters() self.flat_hyper_parameter_names = self._unpack_all_hyper_parameters(values=False) # create hyper_grid in the case of change-points if len(changepoints) > 0: print('+ Detected {} change-point(s) in transition model: {}'.format(len(changepoints), changepoints)) points = changepoints else: print('+ Detected {} break-point(s) in transition model: {}'.format(len(breakpoints), breakpoints)) points = breakpoints # first create standard hyper-grid self._create_hyper_grid() self.all_hyper_grid_values = self.hyper_grid_values[:] self.all_hyper_prior_values = self.flat_hyper_prior_values[:] # extract hyper-grid values that belong to changepoints point_mask = np.sum([np.array(self.flat_hyper_parameter_names) == p for p in points], axis=0).astype(bool) masked_hyper_grid_values = self.all_hyper_grid_values[:, point_mask] # only accept if change-point values are ordered (and not equal) self.mask = np.array( [all(x[i] < x[i + 1] for i in range(len(points) - 1)) for x in masked_hyper_grid_values], dtype=bool) self.hyper_grid_values = self.all_hyper_grid_values[self.mask] self.flat_hyper_prior_values = self.all_hyper_prior_values[self.mask] # correct prior values to ensure correct normalization after sorting out combinations self.flat_hyper_prior_values *= np.sum(self.all_hyper_prior_values)/np.sum(self.all_hyper_prior_values[self.mask]) # call fit method of hyper-study HyperStudy.fit(self, forward_only=forward_only, evidence_only=evidence_only, silent=silent, n_jobs=n_jobs, custom_hyper_grid=True, cache_likelihoods=cache_likelihoods, max_cache_size=max_cache_size) # for proper plotting, hyper_grid_values must include all possible combinations of hyper-parameter values. We # therefore have to include invalid combinations and assign the probability zero to them. temp = np.zeros(len(self.all_hyper_grid_values)) temp[self.mask] = self.hyper_parameter_distribution self.hyper_parameter_distribution = temp temp = np.zeros(len(self.all_hyper_prior_values)) temp[self.mask] = self.flat_hyper_prior_values self.flat_hyper_prior_values = temp
def _unpack_serial_transition_models(self, transition_model): """ Returns list of all occurrences of serial transition models in the transition model, nested like the transition model. Returns: list: all serial transition models """ model_list = [] # recursion step for sub-models if hasattr(transition_model, 'models'): for m in transition_model.models: model_list.append(self._unpack_serial_transition_models(m)) # extend hyper-parameter based on current (sub-)model if hasattr(transition_model, 'hyper_parameter_names'): if str(transition_model) == 'Serial transition model': model_list.append(transition_model) return model_list
[docs] def get_duration_distribution(self, names, plot=False, **kwargs): """ Computes the distribution of the number of time steps between two change/break-points. This distribution of duration is created from the joint distribution of the two specified change/break-points. Args: names(list): List of two parameter names of change/break-points to display (first and second model parameter) plot(bool): If True, a bar chart of the distribution is created **kwargs: All further keyword-arguments are passed to the bar-plot (see matplotlib documentation) Returns: ndarray, ndarray: The first array contains the number of time steps, the second one the corresponding probability values. """ # check if list with two elements is provided if not isinstance(names, Iterable): raise PostProcessingError('A list of exactly two hyper-parameters has to be provided.') elif not len(names) == 2: raise PostProcessingError('A list of exactly two hyper-parameters has to be provided.') param_indices = [self._get_hyper_parameter_index(self.transition_model, n) for n in names] # check if parameter indices are in ascending order (so axes are labeled correctly) if not param_indices[0] < param_indices[1]: print('! WARNING: Switching order of change-/breakpoints to obtain positive values for duration.') param_indices = param_indices[::-1] values = self.hyper_grid_values[:, param_indices].T duration = np.unique(values[1] - values[0]) # get all possible differences between time points duration_distribution = np.zeros(len(duration)) # initialize array for distribution # loop over all hyper-grid points and collect probabilities for different durations for i, values in enumerate(self.all_hyper_grid_values[:, param_indices]): if values[1] > values[0]: # get matching index in duration (rounding needed because of finite precision) idx = np.where(duration.round(10) == (values[1]-values[0]).round(10))[0][0] duration_distribution[idx] += self.hyper_parameter_distribution[i] # properly normalize duration distribution duration_distribution /= np.sum(duration_distribution) if plot: plt.bar(duration, duration_distribution, align='center', width=duration[0], **kwargs) plt.xlabel('duration between {} and {} (in time steps)' .format(self.flat_hyper_parameter_names[param_indices[0]], self.flat_hyper_parameter_names[param_indices[1]])) plt.ylabel('probability') return duration, duration_distribution
[docs] class OnlineStudy(HyperStudy): """ Enables model selection for online data streams. This class builds on the Study-class and features a step-method to include new data points in the study as they arrive from a data stream. This online-analysis is performed in an forward-only way, resulting in filtering-distributions only. In contrast to a normal study, however, one can add multiple transition models to account for different types of parameter dynamics (similar to a Hyper study). The Online study then computes the probability distribution over all transition models for each new data point (or all past data points), enabling real-time model selection. Args: store_history(bool): If true, posterior distributions and their mean values, as well as hyper-posterior distributions are stored for all time steps. """ def __init__(self, store_history=False, silent=False): super(OnlineStudy, self).__init__(silent=silent) self.first_step = True self.transition_models = [] self.transition_model_names = [] self.tm_count = None self.tm_counts = [] self.hyper_parameter_values = [] self.all_flat_hyper_parameter_values = [] self.hyper_parameter_names = [] self.hyper_grid_constants = [] self.log_evidence_list = None self.hyper_log_evidence_list = None self.hyper_prior = [] self.hyper_prior_values = [] self.transition_model_prior = None self.parameter_posterior = None self.transition_model_posterior = None self.marginalized_posterior = None self.hyper_parameter_distribution = None self.transition_model_distribution = None self.local_transition_model_distribution = None self.store_history = store_history self.posterior_mean_values = [] self.posterior_sequence = [] self.hyper_parameter_sequence = [] self.transition_model_sequence = [] self.local_transition_model_sequence = [] if not silent: print(' --> Online study')
[docs] def add_transition_model(self, name, transition_model): """ Adds a transition model to the list of transition models that are fitted in each time step. Note that a list of hyper-parameter values can be supplied. Args: name(str): a custom name for this transition model to identify it in post-processing methods transition_model: instance of a transition model class. Example:: S.set_observation_model(bl.om.Poisson('lambda', bl.oint(0, 6, 1000))) T = bl.tm.GaussianRandomWalk('sigma', [0, 0.1, 0.2, 0.3], target='lambda') S.add_transition_model('dynamic', T) """ # extract hyper-parameter values and names self.set_transition_model(transition_model, silent=True) self._create_hyper_grid(silent=True) self.transition_models.append(transition_model) self.transition_model_names.append(name) self.hyper_parameter_values.append(self.hyper_grid_values[:]) self.all_flat_hyper_parameter_values.append(self.flat_hyper_parameters) self.hyper_parameter_names.append(self.flat_hyper_parameter_names[:]) self.hyper_grid_constants.append(self.hyper_grid_constant[:]) self.hyper_prior.append(self.flat_hyper_priors[:]) self.hyper_prior_values.append(self.flat_hyper_prior_values[:]) # count individual transition models self.tm_counts = [] for hpv in self.hyper_parameter_values: if len(hpv) > 0: self.tm_counts.append(len(hpv)) else: self.tm_counts.append(1) self.tm_count = np.sum(self.tm_counts) if len(self.hyper_grid_values) > 0: print('+ Added transition model: {} ({} combination(s) of the following hyper-parameters: {})' .format(name, len(self.hyper_grid_values), self.hyper_parameter_names[-1])) else: print('+ Added transition model: {} (no hyper-parameters)'.format(name))
[docs] def set_transition_model_prior(self, transition_model_prior, silent=False): """ Sets prior probabilities for transition models added to the online study instance. Args: transition_model_prior: List/Array of probabilities, one for each transition model. If the list does not sum to one, it will be re-normalised. silent: If true, no output is generated by this method. """ if not (isinstance(transition_model_prior, Iterable) and len(transition_model_prior) == len(self.transition_models)): raise ConfigurationError('Length of transition model prior ({}) does not fit number of transition models ' '({})'.format(len(transition_model_prior), len(self.transition_models))) self.transition_model_prior = np.array(transition_model_prior) if not np.sum(transition_model_prior) == 1.: print('+ WARNING: Transition model prior does not sum up to one. Will re-normalize.') self.transition_model_prior /= np.sum(self.transition_model_prior) if not silent: print('+ Set custom transition model prior.')
[docs] def step(self, data_point): """ Update the current parameter distribution by adding a new data point to the data set. Args: data_point(float, int, ndarray): Float, int, or 1D-array of those (for multidimensional data). """ # at least one transition model has to be set or added if (self.tm_count is None) and (self.transition_model is None): raise ConfigurationError('No transition model set or added.') # if one only sets a transition model, but does not use add_transition_model, we add it here if (self.tm_count is None) and (self.transition_model is not None): self.add_transition_model('transition model', self.transition_model) if not isinstance(data_point, list): data_point = [data_point] if len(self.raw_data) == 0: print('+ Start model fit') # consistency check to detect duplicate hyper-parameter names in different transition models all_names = list(flatten(self.hyper_parameter_names)) if len(all_names) != len(np.unique(all_names)): raise ConfigurationError('Detected duplicate hyper-parameter names. Choose unique identifiers.') # check the model consistency the first time that 'step' is called self.raw_data = np.array(data_point) Study._check_consistency(self) self.raw_timestamps = np.array([0]) self.formatted_timestamps = [] else: self.raw_data = np.append(self.raw_data, np.array(data_point), axis=0) self.raw_timestamps = np.append(self.raw_timestamps, self.raw_timestamps[-1]+1) # only proceed if at least one data segment can be created if len(self.raw_data) < self.observation_model.segment_length: print('+ Not enough data points to start analysis. Will wait for more data.') return self.formatted_timestamps.append(self.raw_timestamps[-1]) if self.first_step: # initialize parameter prior distribution prior = self._compute_prior(silent=False) # initialize hyper-prior as flat if self.hyper_prior is None: self.hyper_prior = 'flat hyper-prior' self.hyper_prior_values = [np.ones(tmc) / (tmc * np.prod(hgc)) for tmc, hgc in zip(self.tm_counts, self.hyper_grid_constants)] print(' + Set flat hyper-prior.') # initialize transition model prior as flat if self.transition_model_prior is None: self.transition_model_prior = np.ones(len(self.transition_models))/len(self.transition_models) if len(self.transition_models) > 1: print(' + Set flat transition model prior.') # initialize log_evidence_list as an array of zeros and add log-lattice-constant for proper normalization self.log_evidence_list = [np.zeros(tmc)+np.log(np.prod(self.lattice_constant)) for tmc in self.tm_counts] self.hyper_log_evidence_list = np.array([0. for tmc in self.tm_counts]) # initialize parameter posterior self.parameter_posterior = [np.zeros([tmc] + self.grid_size) for tmc in self.tm_counts] # initialize hyper-parameter distribution self.hyper_parameter_distribution = [np.zeros(tmc) for tmc in self.tm_counts] # initialize (local) transition model distribution self.transition_model_distribution = np.zeros(len(self.transition_models)) self.local_transition_model_distribution = np.zeros(len(self.transition_models)) # initialize transition model posterior self.transition_model_posterior = np.zeros([len(self.transition_models)] + self.grid_size) # initialize marginalized posterior self.marginalized_posterior = np.zeros(self.grid_size) # select data segment data_segment = self.raw_data[-self.observation_model.segment_length:] # compute current likelihood only once likelihood = self.observation_model.processed_pdf(self.grid, data_segment) # loop over all hypotheses/transition models for i, (tm, hpv) in enumerate(zip(self.transition_models, self.hyper_parameter_values)): self.set_transition_model(tm, silent=True) # set current transition model # account for transition models with no hyper-parameters if len(hpv) == 0: hpv = [None] # loop over all hyper-parameter values to fit for j, x in enumerate(hpv): # set current hyper-parameter values if x is not None: self._set_all_hyper_parameters(x) # compute alpha_i if self.first_step: # first time step, so use predefined prior alphai = prior*likelihood else: # in all other time steps transform "old" alpha/posterior alphai = self.transition_model.compute_forward_prior(self.parameter_posterior[i][j], len(self.formatted_data)-1)*likelihood ni = np.sum(alphai) # update log-evidence list self.log_evidence_list[i][j] += np.log(ni) self.hyper_parameter_distribution[i][j] = self.log_evidence_list[i][j] + np.log(self.hyper_prior_values[i][j]) # store parameter posterior self.parameter_posterior[i][j] = alphai/ni # marginalize model evidence wrt hyper-parameters, for all past data points and only for the current one old_hyper_evidence = self.hyper_log_evidence_list[i] self.hyper_log_evidence_list[i] = logsumexp(self.log_evidence_list[i] + np.log(self.hyper_prior_values[i])) self.transition_model_distribution[i] = self.hyper_log_evidence_list[i] self.local_transition_model_distribution[i] = self.hyper_log_evidence_list[i] - old_hyper_evidence + \ np.log(self.transition_model_prior[i]) # normalize hyper-parameter distribution of current transition model self.hyper_parameter_distribution[i] -= np.amax(self.hyper_parameter_distribution[i]) self.hyper_parameter_distribution[i] = np.exp(self.hyper_parameter_distribution[i]) self.hyper_parameter_distribution[i] /= np.sum(self.hyper_parameter_distribution[i]) if len(self.hyper_grid_constants[i]) > 0: self.hyper_parameter_distribution[i] /= np.prod(self.hyper_grid_constants[i]) # compute parameter posterior, marginalized wrt the hyper-parameters of the current transition model hpd = deepcopy(self.hyper_parameter_distribution[i]) while len(self.parameter_posterior[i].shape) > len(hpd.shape): hpd = np.expand_dims(hpd, axis=-1) self.transition_model_posterior[i] = np.sum(self.parameter_posterior[i] * hpd * np.prod(self.hyper_grid_constants[i]), axis=0) # normalize (local) transition model distribution (and transform to linear space) self.transition_model_distribution -= np.amax(self.transition_model_distribution) self.transition_model_distribution = np.exp(self.transition_model_distribution) self.transition_model_distribution /= np.sum(self.transition_model_distribution) self.local_transition_model_distribution -= np.amax(self.local_transition_model_distribution) self.local_transition_model_distribution = np.exp(self.local_transition_model_distribution) self.local_transition_model_distribution /= np.sum(self.local_transition_model_distribution) # normalize marginalized parameter posterior tmd = deepcopy(self.transition_model_distribution) while len(self.transition_model_posterior.shape) > len(tmd.shape): tmd = np.expand_dims(tmd, axis=-1) self.marginalized_posterior = np.sum(self.transition_model_posterior * tmd, axis=0) # compute log-evidence value marginalized over different transition models self.log_evidence = logsumexp(self.hyper_log_evidence_list + np.log(self.transition_model_prior)) # store results for future plotting if self.store_history: self.posterior_mean_values.append(np.array([np.sum(self.marginalized_posterior*g) for g in self.grid])) self.posterior_sequence.append(self.marginalized_posterior.copy()) self.hyper_parameter_sequence.append(deepcopy(self.hyper_parameter_distribution)) self.transition_model_sequence.append(deepcopy(self.transition_model_distribution)) self.local_transition_model_sequence.append(deepcopy(self.local_transition_model_distribution)) if self.first_step: self.first_step = False
[docs] def fit(self, *args, **kwargs): raise NotImplementedError('OnlineStudy object has no "fit" method. Use "step" instead.')
[docs] def get_parameter_distribution(self, t, name, plot=False, density=True, **kwargs): """ Compute the marginal parameter distribution at a given time step. Only available if Online Study is created with flag 'store_history=True'. Args: t(int, float): Time step/stamp for which the parameter distribution is evaluated name(str): Name of the parameter to display plot(bool): If True, a plot of the distribution is created density(bool): If true, probability density is plotted; if false, probability values. **kwargs: All further keyword-arguments are passed to the plot (see matplotlib documentation) Returns: ndarray, ndarray: The first array contains the parameter values, the second one the corresponding probability values """ if not self.store_history: raise PostProcessingError('To get past parameter distributions, Online Study must be called with flag' '"store_history=True". Use "get_current_parameter_distribution" instead.') # plotting function of Study class can only handle arrays, not lists self.formatted_timestamps = np.array(self.formatted_timestamps) self.posterior_sequence = np.array(self.posterior_sequence) try: return Study.get_parameter_distribution(self, t, name, plot=plot, density=density, **kwargs) finally: # re-transform arrays to lists, so online study may continue to append values self.formatted_timestamps = list(self.formatted_timestamps) self.posterior_sequence = list(self.posterior_sequence)
[docs] def get_current_parameter_distribution(self, name, plot=False, density=True, **kwargs): """ Compute the current marginal parameter distribution. Args: name(str): Name of the parameter to display plot(bool): If True, a plot of the distribution is created density(bool): If true, probability density is plotted; if false, probability values. **kwargs: All further keyword-arguments are passed to the plot (see matplotlib documentation) Returns: ndarray, ndarray: The first array contains the parameter values, the second one the corresponding probability values """ # get parameter index param_index = -1 for i, n in enumerate(self.observation_model.parameter_names): if n == name: param_index = i # check if match was found if param_index == -1: raise PostProcessingError('Wrong parameter name. Available options: {0}' .format(self.observation_model.parameter_names)) axes_to_marginalize = list(range(len(self.observation_model.parameter_names))) try: axes_to_marginalize.remove(param_index) except ValueError: raise PostProcessingError('Wrong parameter index. Available indices: {}'.format(axes_to_marginalize)) x = self.marginal_grid[param_index] dx = self.lattice_constant[param_index] marginal_distribution = np.squeeze( np.apply_over_axes(np.sum, self.marginalized_posterior, axes_to_marginalize)).copy() if density: marginal_distribution /= dx if plot: plt.fill_between(x, 0, marginal_distribution, **kwargs) plt.xlabel(self.observation_model.parameter_names[param_index]) if density: plt.ylabel('probability density') else: plt.ylabel('probability') return x, marginal_distribution
[docs] def get_parameter_distributions(self, name, plot=False, density=True, **kwargs): """ Computes the time series of marginal posterior distributions with respect to a given model parameter. Only available if Online Study is created with flag 'store_history=True'. Args: name(str): Name of the parameter to display plot(bool): If True, a plot of the series of distributions is created (density map) density(bool): If true, probability density is plotted; if false, probability values. **kwargs: All further keyword-arguments are passed to the plot (see matplotlib documentation) Returns: ndarray, ndarray: The first array contains the parameter values, the second one the sequence of corresponding posterior distributions. """ if not self.store_history: raise PostProcessingError('To get past parameter distributions, Online Study must be called with flag' '"store_history=True". Use "get_current_parameter_distribution" instead.') # plotting function of Study class can only handle arrays, not lists self.formatted_timestamps = np.array(self.formatted_timestamps) self.posterior_sequence = np.array(self.posterior_sequence) try: return Study.get_parameter_distributions(self, name, plot=plot, density=density, **kwargs) finally: # re-transform arrays to lists, so online study may continue to append values self.formatted_timestamps = list(self.formatted_timestamps) self.posterior_sequence = list(self.posterior_sequence)
[docs] def plot_parameter_evolution(self, name, color='b', gamma=0.5, **kwargs): """ Plots a series of marginal posterior distributions corresponding to a single model parameter, together with the posterior mean values. Only available if Online Study is created with flag 'store_history=True'. Args: name(str): Name of the parameter to display color: color from which a light colormap is created gamma(float): exponent for gamma correction of the displayed marginal distribution; default: 0.5 kwargs: all further keyword-arguments are passed to the plot of the posterior mean values """ if not self.store_history: raise PostProcessingError('To plot past parameter distributions, Online Study must be called with flag' '"store_history=True". Use "get_current_parameter_distribution" instead.') # plotting function of Study class can only handle arrays, not lists self.formatted_timestamps = np.array(self.formatted_timestamps) self.posterior_mean_values = np.array(self.posterior_mean_values).T self.posterior_sequence = np.array(self.posterior_sequence) try: Study.plot_parameter_evolution(self, name, color=color, gamma=gamma, **kwargs) finally: # re-transform arrays to lists, so online study may continue to append values self.formatted_timestamps = list(self.formatted_timestamps) self.posterior_mean_values = list(self.posterior_mean_values.T) self.posterior_sequence = list(self.posterior_sequence)
[docs] def get_current_transition_model_distribution(self, local=False): """ Returns the current probabilities for each transition model defined in the Online Study. Args: local(bool): If true, transition model distribution taking into account only the last data point is returned. Returns: ndarray, ndarray: Arrays of transition model names and normalized probabilities. """ if local: return np.array(self.transition_model_names), self.local_transition_model_distribution else: return np.array(self.transition_model_names), self.transition_model_distribution
[docs] def get_current_transition_model_probability(self, transition_model, local=False): """ Returns the current posterior probability for a specified transition model. Args: transition_model(str): Name of the transition model local(bool): If true, transition model probability taking into account only the last data point is returned. Returns: float: Posterior probability value for the specified transition model """ transition_model_index = self.transition_model_names.index(transition_model) if local: return self.local_transition_model_distribution[transition_model_index] else: return self.transition_model_distribution[transition_model_index]
[docs] def get_transition_model_distributions(self, local=False): """ The transition model distribution contains posterior probability values for all transition models included in the online study. This distribution is available for all time steps analyzed. Only available if Online Study is created with flag 'store_history=True'. Args: local(bool): If true, transition model distributions taking into account only the data point at the corresponding time step is returned. Returns: ndarray, ndarray: Arrays containing the names and posterior probability values for all transition models included in the online study for all time steps analyzed """ if not self.store_history: raise PostProcessingError('To get past transition model distributions, Online Study must be called with ' 'flag "store_history=True". Use "get_current_transition_model_distribution" instead.') if local: return np.array(self.transition_model_names), np.array(self.local_transition_model_sequence) else: return np.array(self.transition_model_names), np.array(self.transition_model_sequence)
[docs] def get_transition_model_probabilities(self, transition_model, local=False): """ Returns posterior probability values for a specified transition model. This distribution is available for all time steps analyzed. Only available if Online Study is created with flag 'store_history=True'. Args: transition_model(str): Name of the transition model local(bool): If true, transition model probabilities taking into account only the data point at the corresponding time step is returned. Returns: ndarray: Array containing the posterior probability values for the specified transition model for all time steps analyzed """ if not self.store_history: raise PostProcessingError('To get past transition model distributions, Online Study must be called with ' 'flag "store_history=True". Use "get_current_transition_model_distribution" instead.') transition_model_index = self.transition_model_names.index(transition_model) if local: return np.array(self.local_transition_model_sequence)[:, transition_model_index] else: return np.array(self.transition_model_sequence)[:, transition_model_index]
[docs] def get_current_parameter_mean_value(self, name): """ Returns the posterior mean value for a given parameter of the observation model. Args: name(str): Name of the parameter Returns: float: posterior mean value """ # get parameter index param_index = -1 for i, n in enumerate(self.observation_model.parameter_names): if n == name: param_index = i # check if match was found if param_index == -1: raise PostProcessingError('Wrong parameter name. Available options: {0}' .format(self.observation_model.parameter_names)) mean = np.sum(self.marginalized_posterior*self.grid[param_index]) return mean
[docs] def get_parameter_mean_value(self, t, name): """ Returns the posterior mean value for a given parameter of the observation model at a specified time step. Only available if Online Study is created with flag 'store_history=True'. Args: t(int): Time step at which to compute parameter mean value name(str): Name of the parameter Returns: float: posterior mean value """ if not self.store_history: raise PostProcessingError('To get past parameter mean values, Online Study must be called with ' 'flag "store_history=True". Use "get_current_parameter_mean_value" instead.') # get parameter index param_index = -1 for i, n in enumerate(self.observation_model.parameter_names): if n == name: param_index = i # check if match was found if param_index == -1: raise PostProcessingError('Wrong parameter name. Available options: {0}' .format(self.observation_model.parameter_names)) # access parameter distribution if t not in self.formatted_timestamps: raise PostProcessingError('Supplied time ({}) does not exist in data or is out of range.'.format(t)) time_index = list(self.formatted_timestamps).index(t) # to select corresponding posterior distribution parameter_distribution = self.posterior_sequence[time_index] mean = np.sum(parameter_distribution * self.grid[param_index]) return mean
[docs] def get_parameter_mean_values(self, name): """ Returns the posterior mean value for a given parameter of the observation model for all time steps. Only available if Online Study is created with flag 'store_history=True'. Args: name(str): Name of the parameter Returns: ndarray: posterior mean values """ if not self.store_history: raise PostProcessingError('To get past parameter mean values, Online Study must be called with ' 'flag "store_history=True". Use "get_current_parameter_mean_value" instead.') # get parameter index param_index = -1 for i, n in enumerate(self.observation_model.parameter_names): if n == name: param_index = i # check if match was found if param_index == -1: raise PostProcessingError('Wrong parameter name. Available options: {0}' .format(self.observation_model.parameter_names)) mean = np.array(self.posterior_mean_values).T[param_index] return mean
[docs] def get_hyper_parameter_mean_value(self, t, name): """ Computes the mean value of the joint hyper-parameter distribution for a given hyper-parameter at a given time step. Only available if Online Study is created with flag 'store_history=True'. Args: t(int): Time step at which to compute distribution name(str): name of hyper-parameter Returns: ndarray: Array containing the mean values of all hyper-parameters of the given transition model """ if not self.store_history: raise PostProcessingError('To get past hyper-parameter mean values, Online Study must be called with ' 'flag "store_history=True". Use "getCurrentHyperParameterMeanValue" instead.') # determine indices of transition model and hyper-parameter hp_index = -1 for i, tm in enumerate(self.transition_models): try: hp_index = self._get_hyper_parameter_index(tm, name) tm_index = i except PostProcessingError: pass if hp_index == -1: raise PostProcessingError('No hyper-parameter "{}" found. Check hyper-parameter names.'.format(name)) # access hyper-parameter distribution if t not in self.formatted_timestamps: raise PostProcessingError('Supplied time ({}) does not exist in data or is out of range.'.format(t)) time_index = list(self.formatted_timestamps).index(t) # to select corresponding posterior distribution hyper_parameter_distribution = self.hyper_parameter_sequence[time_index] hyper_parameter_distribution = hyper_parameter_distribution[tm_index][:, None] hyper_parameter_values = self.hyper_parameter_values[tm_index] hyper_grid_constants = self.hyper_grid_constants[tm_index] # compute mean value mean = np.sum(hyper_parameter_values*hyper_parameter_distribution*np.prod(hyper_grid_constants), axis=0) return mean[hp_index]
[docs] def get_hyper_parameter_mean_values(self, name): """ Computes the sequence of mean value of the joint hyper-parameter distribution for a given hyper-parameter for all time steps. Only available if Online Study is created with flag 'store_history=True'. Args: name(str): name of hyper-parameter Returns: ndarray: Array containing the sequences of mean values of the given hyper-parameter """ if not self.store_history: raise PostProcessingError('To get past hyper-parameter mean values, Online Study must be called with ' 'flag "store_history=True". Use "getCurrentHyperParameterMeanValue" instead.') # determine indices of transition model and hyper-parameter hp_index = -1 for i, tm in enumerate(self.transition_models): try: hp_index = self._get_hyper_parameter_index(tm, name) tm_index = i except PostProcessingError: pass if hp_index == -1: raise PostProcessingError('No hyper-parameter "{}" found. Check hyper-parameter names.'.format(name)) hyper_parameter_sequence = np.array([hp[tm_index].tolist() for hp in self.hyper_parameter_sequence])[:, :, None] hyper_parameter_values = self.hyper_parameter_values[tm_index] hyper_grid_constants = self.hyper_grid_constants[tm_index] # compute mean value mean = np.sum(hyper_parameter_sequence * hyper_parameter_values * np.prod(hyper_grid_constants), axis=1).T return mean[hp_index]
[docs] def get_hyper_parameter_distribution(self, t, name, plot=False, **kwargs): """ Computes marginal hyper-parameter distribution of a single hyper-parameter at a specific time step in an OnlineStudy fit. Only available if Online Study is created with flag 'store_history=True'. Args: t(int): Time step at which to compute distribution, or 'avg' for time-averaged distribution name(str): hyper-parameter name plot(bool): If True, a bar chart of the distribution is created **kwargs: All further keyword-arguments are passed to the bar-plot (see matplotlib documentation) Returns: ndarray, ndarray: The first array contains the hyper-parameter values, the second one the corresponding probability values """ if not self.store_history: raise PostProcessingError('To get past hyper-parameter distributions, Online Study must be called with ' 'flag "store_history=True". Use "get_current_hyper_parameter_distribution" instead.') # determine indices of transition model and hyper-parameter hp_index = -1 for i, tm in enumerate(self.transition_models): try: hp_index = self._get_hyper_parameter_index(tm, name) tm_index = i except PostProcessingError: pass if hp_index == -1: raise PostProcessingError('No hyper-parameter "{}" found. Check hyper-parameter names.'.format(name)) # access hyper-parameter distribution if t == 'avg': # compute time-averaged distribution hyper_parameter_distribution = np.sum(self.hyper_parameter_sequence, axis=0)/len(self.hyper_parameter_sequence) else: # try to access distribution of specified time step if t not in self.formatted_timestamps: raise PostProcessingError('Supplied time ({}) does not exist in data or is out of range.'.format(t)) time_index = list(self.formatted_timestamps).index(t) # to select corresponding posterior distribution hyper_parameter_distribution = self.hyper_parameter_sequence[time_index] hyper_parameter_distribution = hyper_parameter_distribution[tm_index] axes_to_marginalize = list(range(len(self.hyper_parameter_names[tm_index]))) axes_to_marginalize.remove(hp_index) # reshape hyper-parameter grid for easy marginalization hyper_grid_steps = [len(x) for x in self.all_flat_hyper_parameter_values[tm_index]] distribution = hyper_parameter_distribution.reshape(hyper_grid_steps, order='C') marginal_distribution = np.squeeze(np.apply_over_axes(np.sum, distribution, axes_to_marginalize)) x = self.all_flat_hyper_parameter_values[tm_index][hp_index] if plot: # check if categorical if np.any(np.abs(np.diff(np.diff(x))) > 10 ** -10): plt.bar(np.arange(len(x)), marginal_distribution, align='center', width=1., **kwargs) plt.xticks(np.arange(len(x)), x) plt.ylabel('probability') # regular spacing else: plt.bar(x, marginal_distribution, align='center', width=self.hyper_grid_constants[tm_index][hp_index], **kwargs) plt.ylabel('probability') plt.xlabel(self.hyper_parameter_names[tm_index][hp_index]) return x, marginal_distribution
[docs] def get_current_hyper_parameter_distribution(self, name, plot=False, **kwargs): """ Computes marginal hyper-parameter distribution of a single hyper-parameter at the current time step in an OnlineStudy fit. Args: name(str): hyper-parameter name plot(bool): If True, a bar chart of the distribution is created **kwargs: All further keyword-arguments are passed to the bar-plot (see matplotlib documentation) Returns: ndarray, ndarray: The first array contains the hyper-parameter values, the second one the corresponding probability values """ # determine indices of transition model and hyper-parameter hp_index = -1 for i, tm in enumerate(self.transition_models): try: hp_index = self._get_hyper_parameter_index(tm, name) tm_index = i except PostProcessingError: pass if hp_index == -1: raise PostProcessingError('No hyper-parameter "{}" found. Check hyper-parameter names.'.format(name)) hyper_parameter_distribution = self.hyper_parameter_distribution[tm_index] axes_to_marginalize = list(range(len(self.hyper_parameter_names[tm_index]))) axes_to_marginalize.remove(hp_index) # reshape hyper-parameter grid for easy marginalization hyper_grid_steps = [len(x) for x in self.all_flat_hyper_parameter_values[tm_index]] distribution = hyper_parameter_distribution.reshape(hyper_grid_steps, order='C') marginal_distribution = np.squeeze(np.apply_over_axes(np.sum, distribution, axes_to_marginalize)) marginal_distribution *= np.prod(self.hyper_grid_constants[tm_index]) x = self.all_flat_hyper_parameter_values[tm_index][hp_index] if plot: # check if categorical if np.any(np.abs(np.diff(np.diff(x))) > 10 ** -10): plt.bar(np.arange(len(x)), marginal_distribution, align='center', width=1., **kwargs) plt.xticks(np.arange(len(x)), x) plt.ylabel('probability') # regular spacing else: plt.bar(x, marginal_distribution, align='center', width=self.hyper_grid_constants[tm_index][hp_index], **kwargs) plt.ylabel('probability') plt.xlabel(self.hyper_parameter_names[tm_index][hp_index]) return x, marginal_distribution
[docs] def get_hyper_parameter_distributions(self, name): """ Computes marginal hyper-parameter distributions of a single hyper-parameter for all time steps in an OnlineStudy fit. Only available if Online Study is created with flag 'store_history=True'. Args: name(str): hyper-parameter name Returns: ndarray, ndarray: The first array contains the hyper-parameter values, the second one the corresponding probability values (first axis is time). """ if not self.store_history: raise PostProcessingError('To get past hyper-parameter distributions, Online Study must be called with ' 'flag "store_history=True". Use "get_current_hyper_parameter_distribution" instead.') # determine indices of transition model and hyper-parameter hp_index = -1 for i, tm in enumerate(self.transition_models): try: hp_index = self._get_hyper_parameter_index(tm, name) tm_index = i except PostProcessingError: pass if hp_index == -1: raise PostProcessingError('No hyper-parameter "{}" found. Check hyper-parameter names.'.format(name)) hyper_parameter_sequence = np.array([x[tm_index] for x in self.hyper_parameter_sequence]) # marginalize the hyper-posterior probabilities hpv = np.array(self.hyper_parameter_values[tm_index]) param_hpv = hpv[:, hp_index] unique_values = np.sort(np.unique(param_hpv)) marginal_distribution = [] for value in unique_values: marginal_distribution.append([]) indices = np.where(param_hpv == value) for hp in hyper_parameter_sequence: probabilities = hp[indices] marginal_distribution[-1].append(np.sum(probabilities)) marginal_distribution = np.array(marginal_distribution).T # renormalize marginal probability distribution marginal_distribution /= np.sum(marginal_distribution, axis=1)[:, None] return unique_values, marginal_distribution
[docs] def plot_hyper_parameter_evolution(self, name, color='b', gamma=0.5, **kwargs): """ Plot method to display a series of marginal posterior distributions corresponding to a single model parameter. This method includes the removal of plotting artefacts, gamma correction as well as an overlay of the posterior mean values. Only available if Online Study is created with flag 'store_history=True'. Args: name(str): hyper-parameter name color: color from which a light colormap is created gamma(float): exponent for gamma correction of the displayed marginal distribution; default: 0.5 kwargs: all further keyword-arguments are passed to the plot of the posterior mean values """ if not self.store_history: raise PostProcessingError('To get past hyper-parameter distributions, Online Study must be called with ' 'flag "store_history=True". Use "get_current_hyper_parameter_distribution" instead.') # determine indices of transition model and hyper-parameter hp_index = -1 for i, tm in enumerate(self.transition_models): try: hp_index = self._get_hyper_parameter_index(tm, name) tm_index = i except PostProcessingError: pass if hp_index == -1: raise PostProcessingError('No hyper-parameter "{}" found. Check hyper-parameter names.'.format(name)) # get sequence of hyper-parameter distributions unique_values, marginal_distribution = self.get_hyper_parameter_distributions(name) # compute hyper-posterior mean values mean_values = self.get_hyper_parameter_mean_values(name) # clean up very small probability values, as they may create image artefacts pmax = np.amax(marginal_distribution) marginal_distribution[marginal_distribution < pmax * (10 ** -20)] = 0 plt.imshow(marginal_distribution.T ** gamma, origin='lower', cmap=create_colormap(color), extent=[self.formatted_timestamps[0], self.formatted_timestamps[-1]] + [unique_values[0], unique_values[-1]], aspect='auto') # set default color of plot to black if ('c' not in kwargs) and ('color' not in kwargs): kwargs['c'] = 'k' # set default linewidth to 1.5 if ('lw' not in kwargs) and ('linewidth' not in kwargs): kwargs['lw'] = 1.5 plt.plot(self.formatted_timestamps, mean_values, **kwargs) plt.ylim(unique_values[0], unique_values[-1]) plt.ylabel(self.hyper_parameter_names[tm_index][hp_index]) plt.xlabel('time step')
[docs] def get_joint_hyper_parameter_distribution(self, names, plot=False, figure=None, subplot=111, **kwargs): raise NotImplementedError('This method is not available in "OnlineStudy".')
[docs] def plot(self, name, **kwargs): """ Convenience method to plot the temporal evolution of (hyper-)parameters, the distribution of a (hyper-)parameter at a specific time step, or the temporal evolution of the probability of a transition model. Args: name(str): name of the (hyper-)parameter or transition model to display color: color from which a light colormap is created (for (hyper-)parameter evolution only) gamma(float): exponent for gamma correction of the displayed marginal distribution; default: 0.5 (for (hyper-)parameter evolution only) t: Time step/stamp for which the parameter distribution is evaluated density(bool): If true, probability density is plotted; if false, probability values. Note: Only availble for parameters, not hyper-parameters. local(bool): If true, transition model probabilities taking into account only the data point at the corresponding time step is returned kwargs: all further keyword-arguments are passed to the axes object of the plot """ density = kwargs.pop('density', True) # check if time-step/stamp is supplied t = kwargs.pop('t', None) # check if results were stored if t is not None and not self.store_history: raise PostProcessingError('Online study has only stored current parameter data ("store_history=False"), ' 'no time step can be specified, only current (hyper-)parameter distributions will' 'be plotted.') # check if name belongs to hyper-parameter hyper = False for tm in self.transition_models: try: self._get_hyper_parameter_index(tm, name) hyper = True except PostProcessingError: pass # check if name belongs to transition model try: transition_model_index = self.transition_model_names.index(name) model = True except ValueError: model = False if not hyper and not model: # parameter evolution plot if t is None and self.store_history: # read additional kwargs (or set default values) color = kwargs.pop('color', 'b') gamma = kwargs.pop('gamma', 0.5) self.plot_parameter_evolution(name, color=color, gamma=gamma, **kwargs) # parameter distribution plot elif t is not None and self.store_history: self.get_parameter_distribution(t, name, plot=True, density=density, **kwargs) else: self.get_current_parameter_distribution(name, plot=True, density=density, **kwargs) elif hyper and not model: # hyper-parameter evolution plot if t is None and self.store_history: # read additional kwargs (or set default values) color = kwargs.pop('color', 'b') gamma = kwargs.pop('gamma', 0.5) self.plot_hyper_parameter_evolution(name, color=color, gamma=gamma, **kwargs) # hyper-parameter distribution plot elif t is not None and self.store_history: self.get_hyper_parameter_distribution(t, name, plot=True, **kwargs) else: self.get_current_hyper_parameter_distribution(name, plot=True, **kwargs) elif model and not hyper: # read additional kwargs (or set default values) local = kwargs.pop('local', False) # plot local transition model probabilities if local: plt.plot(self.formatted_timestamps, np.array(self.local_transition_model_sequence)[:, transition_model_index], **kwargs) # plot transition model probabilities else: plt.plot(self.formatted_timestamps, np.array(self.transition_model_sequence)[:, transition_model_index], **kwargs) else: raise PostProcessingError('Duplicate names of hyper-/parameters/transition models, cannot use "plot" ' 'method.')