#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Filters.py
# Copyright (c) 2016-2017, Alexander Winterl
#
# This file is part of PenguTrack
#
# PenguTrack is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PenguTrack is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PenguTrack. If not, see <http://www.gnu.org/licenses/>.
"""
Module containing filter classes to be used with pengu-track detectors and models.
"""
from __future__ import print_function, division
import numpy as np
import scipy.stats as ss
import sys
import copy
from PenguTrack.Detectors import Measurement
from .Assignment import *
import scipy.integrate as integrate
# import scipy.optimize as opt
from .Detectors import array_to_measurement, array_to_pandasDF, pandasDF_to_array, pandasDF_to_measurement, measurements_to_array, measurements_to_pandasDF
import pandas


class Filter(object):
"""
This Class describes the abstract function of a filter in the pengu-track package.
It is only meant for subclassing.
Attributes
----------
Model: PenguTrack.model object
A physical model to gain predictions from data.
Measurement_Distribution: scipy.stats.distributions object
The distribution which describes measurement uncertainty.
State_Distribution: scipy.stats.distributions object
The distribution which describes state vector fluctuations.
X: dict
The time series of believes calculated for this filter. The keys equal the time stamp.
X_error: dict
The time series of errors on the corresponding believes. The keys equal the time stamp.
Predicted_X: dict
The time series of predictions made from the associated data. The keys equal the time stamp.
Predicted_X_error: dict
The time series of estimated prediction errors. The keys equal the time stamp.
Measurements: dict
The time series of measurements assigned to this filter. The keys equal the time stamp.
Controls: dict
The time series of control-vectors assigned to this filter. The keys equal the time stamp.
"""
def __init__(self, model, no_dist=False, meas_dist=ss.uniform(), state_dist=ss.uniform(), prob_update=True,
*args, **kwargs):
"""
This Class describes the abstract function of a filter in the pengu-track package.
It is only meant for subclassing.
Parameters
----------
model: PenguTrack.model object
A physical model to gain predictions from data.
meas_dist: scipy.stats.distributions object
The distribution which describes measurement uncertainty.
state_dist: scipy.stats.distributions object
The distribution which describes state vector fluctuations.
"""
self._meas_cov_ = None
self._meas_mu_ = None
self._meas_sig_1 = None
self._meas_norm_ = None
self._meas_is_gaussian_ = False
self._state_cov_ = None
self._state_mu_ = None
self._state_sig_1 = None
self._state_norm_ = None
self._state_is_gaussian_ = False
self.Model = model
self.Measurement_Distribution = meas_dist
self.State_Distribution = state_dist
self.X = {}
self.X_error = {}
self.Predicted_X = {}
self.Predicted_X_error = {}
self.Measurements = {}
self.Controls = {}
self.NoDist = bool(no_dist)
self.ProbUpdate = bool(prob_update)

    def predict(self, u=None, i=None):
"""
Function to get predictions from the corresponding model. Handles time-stamps and control-vectors.
Parameters
----------
u: array_like, optional
Recent control-vector.
i: int
Recent/corresponding time-stamp
Returns
----------
u: array_like
Recent control-vector.
i: int
Recent/corresponding time-stamp.
"""
assert not (u is None and i is None), "One of control vector or time stamp must be specified"
# Generate i
if i is None:
i = max(self.Predicted_X.keys())+1
# Generate u
if u is None:
try:
u = self.Controls[i-1]
except KeyError:
u = np.zeros((self.Model.Control_dim, 1))
else:
self.Controls.update({i-1: u})
# Try to get previous x from believe
try:
x = self.X[i-1]
except KeyError:
# Try to get previous prediction
try:
x = self.Predicted_X[i-1]
except KeyError:
# Try recursive prediction from previous timesteps
if np.any([k < i for k in self.Predicted_X.keys()]) or np.any([k < i for k in self.X.keys()]) :
print('Recursive Prediction, i = %s' % i)
u_, i_ = self.predict(u=u, i=i-1)
x = self.Predicted_X[i-1]
else:
raise KeyError("Nothing to predict from. Need initial value")
# Make simplest possible Prediction (can be overwritten)
self.Predicted_X.update({i: self.Model.predict(x, u)})
return u, i

    def update(self, z=None, i=None):
"""
Function to get updates to the corresponding model. Handles time-stamps and measurement-vectors.
Parameters
----------
z: PenguTrack.Measurement, optional
Recent measurement.
i: int
Recent/corresponding time-stamp
Returns
----------
z: PenguTrack.Measurement
Recent measurement.
i: int
Recent/corresponding time-stamp.
"""
assert not (z is None and i is None), "One of measurement vector or time stamp must be specified"
# Generate i
if i is None:
i = max(self.Measurements.keys())+1
# Generate z
if z is None:
try:
z = self.Measurements[i]
except KeyError:
raise KeyError("No measurement for timepoint %s." % i)
else:
if not isinstance(z, Measurement):
z = np.asarray(z).flatten()
assert self.Model.Meas_dim == len(z),\
"Measurement input shape %s is not equal to model measurement dimension %s"%(
len(z),self.Model.Meas_dim)
z = Measurement(1.0, position=z)
measurement = z.copy()
self.Measurements.update({i: measurement})
# measurement = copy(z)
# simplest possible update
# try:
# self.X.update({i: np.asarray([z.PositionX, z.PositionY, z.PositionZ])})
# except(ValueError, AttributeError):
# try:
# self.X.update({i: np.asarray([z.PositionX, z.PositionY])})
# except(ValueError, AttributeError):
# self.X.update({i: np.asarray([z.PositionX])})
#
z = np.asarray(self.Model.vec_from_meas(measurement))
# if len(self.Model.Extensions) > 0:
# z = np.asarray(np.hstack([np.asarray([measurement[v] for v in self.Model.Measured_Variables]),
# np.asarray([measurement.Data[var] for var in self.Model.Extensions])]))
# else:
# z = np.asarray([measurement[v] for v in self.Model.Measured_Variables])
self.X.update({i: z})
return measurement, i

    def filter(self, u=None, z=None, i=None):
"""
Function to get predictions from the model and update the same timestep i.
Parameters
----------
u: array_like, optional
Recent control-vector.
z: PenguTrack.Measurement, optional
Recent measurement.
i: int
Recent/corresponding time-stamp
Returns
----------
u: array_like, optional
Recent control-vector.
z: PenguTrack.Measurement
Recent measurement.
i: int
Recent/corresponding time-stamp.
"""
self.predict(u=u, i=i)
x = self.update(z=z, i=i)
return x
def log_prob(self, keys=None, measurements=None, update=False):
if keys is None:
keys = self.X.keys()
if measurements is None:
try:
measurements = dict([[k,self.Measurements[k]] for k in keys])
except KeyError:
return None
prob = 0
pending_downdates = []
pending_downpredicts =[]
for k in keys:
if k in self.X and k in self.Predicted_X:
if update:
prob += self._log_prob_(k)
else:
prob += self._meas_log_prob(k, measurements[k])
elif k in self.X:
self.predict(i=k)
if update:
prob += self._log_prob_(k)
else:
prob += self._meas_log_prob(k,measurements[k])
elif k in self.Predicted_X and k in measurements:
if update:
self.update(z=measurements[k],i=k)
pending_downdates.append(k)
prob += self._log_prob_(k)
else:
prob += self._meas_log_prob(k, measurements[k])
elif k in self.Predicted_X and k in self.Measurements:
if update:
self.update(z=self.Measurements[k],i=k)
pending_downdates.append(k)
prob += self._log_prob_(k)
else:
prob += self._meas_log_prob(k)
elif k in measurements:
self.predict(i=k)
pending_downpredicts.append(k)
if update:
self.update(z=measurements[k], i=k)
pending_downdates.append(k)
prob += self._log_prob_(k)
else:
prob += self._meas_log_prob(k, measurements[k])
elif k in self.Measurements:
self.predict(i=k)
pending_downpredicts.append(k)
if update:
self.update(z=self.Measurements[k], i=k)
pending_downdates.append(k)
prob += self._log_prob_(k)
else:
prob += self._meas_log_prob(k)
else:
raise ValueError("Probability for key %s could not be calculated! (missing states)"%k)
for k in pending_downdates:
self.downdate(k)
for k in pending_downpredicts:
self.unpredict(k)
return prob
def _log_prob_(self, key):
return self._state_log_prob_(key)
def _state_log_prob_(self, key):
if self.NoDist:
# return np.linalg.norm(self.X[key]-self.Predicted_X[key])
return np.linalg.norm(self.Model.pos_from_state(self.X[key])-self.Model.pos_from_state(self.Predicted_X[key]))
if self._state_is_gaussian_:
x = self.X[key]-self.Predicted_X[key] - self._state_mu_[:, None]
return float(-np.dot(x.T, np.dot(self._state_sig_1**2, x)))
# return float(self._state_norm_ - 0.5 * np.dot(x.T, np.dot(self._state_sig_1, x)))
# return float(self._state_norm_ - 0.5 * np.dot(x.T, np.dot(self._state_sig_1, x)))
return self.State_Distribution.logcdf((self.X[key]-self.Predicted_X[key]).T)
# return self.State_Distribution.logpdf((self.X[key]-self.Predicted_X[key]).T)
def _meas_log_prob(self, key, measurement=None):
if measurement is None:
measurement = self.Measurements[key]
if self.NoDist:
# return np.linalg.norm(self.Model.vec_from_meas(measurement)-self.Model.measure(self.Predicted_X[key]))
return np.linalg.norm(self.Model.pos_from_measurement(measurement)-self.Model.pos_from_state(self.Predicted_X[key]))
if self._meas_is_gaussian_:
x = self.Model.vec_from_meas(measurement)-self.Model.measure(self.Predicted_X[key]) - self._meas_mu_[:,None]
return float(- np.dot(x.T, np.dot(self._meas_sig_1**2, x)/self.Model.Meas_dim))
# return float(self._meas_norm_ -0.5*np.dot(x.T, np.dot(self._meas_sig_1, x)))
# return float(self._meas_norm_ -0.5*np.dot(x.T, np.dot(self._meas_sig_1, x)))
return self.Measurement_Distribution.logcdf((self.Model.vec_from_meas(measurement)-self.Model.measure(self.Predicted_X[key])).T)
# return self.Measurement_Distribution.logpdf((self.Model.vec_from_meas(measurement)-self.Model.measure(self.Predicted_X[key])).T)
def __setattr__(self, key, value):
if key == "Measurement_Distribution":
if isinstance(value, ss._multivariate.multivariate_normal_frozen):
self._meas_cov_ = value.cov
self._meas_mu_ = value.mean
self._meas_sig_1 = np.linalg.inv(value.cov)
self._meas_norm_ = -0.5*np.log(((2*np.pi)**max(value.cov.shape)*np.linalg.det(self._meas_cov_)))
self._meas_is_gaussian_ = True
if key == "State_Distribution":
if isinstance(value, ss._multivariate.multivariate_normal_frozen):
self._state_cov_ = value.cov
self._state_mu_ = value.mean
self._state_sig_1 = np.linalg.inv(value.cov)
self._state_norm_ = -0.5*np.log((2*np.pi)**max(value.cov.shape)*np.linalg.det(self._state_cov_))
self._state_is_gaussian_ = True
super(Filter, self).__setattr__(key, value)
# def _log_prob_(self, key):
# measurement = self.Measurements[key]
# try:
# position = np.asarray([measurement.PositionX,
# measurement.PositionY,
# measurement.PositionZ])
# except (ValueError, AttributeError):
# try:
# position = np.asarray([measurement.PositionX,
# measurement.PositionY])
# except (ValueError, AttributeError):
# position = np.asarray([measurement.PositionX])
# return self.Measurement_Distribution.logpdf(position-self.Model.measure(self.X[key]))

    def downfilter(self, t=None):
"""
        Function erases the time-step t from the class dictionaries (Measurements, X, Predictions).
Parameters
----------
t: int, optional
Time-steps for which filtering should be erased.
"""
# Generate t
if t is None:
            t = max([max(self.X.keys()),
                     max(self.Predicted_X.keys()),
                     max(self.Measurements.keys()),
                     max(self.Controls.keys())])
# Remove all entries for this timestep
self.Measurements.pop(t, None)
self.Controls.pop(t, None)
self.X.pop(t, None)
self.X_error.pop(t, None)
self.Predicted_X.pop(t, None)
self.Predicted_X_error.pop(t, None)

    def downdate(self, t=None):
"""
Function erases the time-step t from the Measurements and Believes.
Parameters
----------
t: int, optional
Time-steps for which filtering should be erased.
"""
# Generate t
if t is None:
t = max(self.X.keys())
# Remove believe and Measurement entries for this timestep
self.X.pop(t, None)
self.X_error.pop(t, None)
self.Measurements.pop(t, None)

    def unpredict(self, t=None):
"""
Function erases the time-step t from the Believes, Predictions and Controls.
Parameters
----------
t: int, optional
Time-steps for which filtering should be erased.
"""
# Generate t
if t is None:
t = max(self.Predicted_X.keys())
# Remove believe and Prediction entries for this timestep
self.Controls.pop(t, None)
self.X.pop(t, None)
self.X_error.pop(t, None)
self.Predicted_X.pop(t, None)
self.Predicted_X_error.pop(t, None)

    def fit(self, u, z):
"""
Function to auto-evaluate all measurements z with control-vectors u and starting probability p.
It returns the believed values x, the corresponding probabilities p and the predictions x_tilde.
Parameters
----------
u: array_like
List of control-vectors.
z: array_like
List of measurement-vectors.
"""
u = np.asarray(u)
z = np.asarray(z)
assert u.shape[0] == z.shape[0]
for i in range(z.shape[0]):
self.predict(u=u[i], i=i+1)
self.update(z=z[i], i=i+1)
print(self.log_prob())
        return np.asarray(list(self.X.values()), dtype=float),\
               np.asarray(list(self.X_error.values()), dtype=float),\
               np.asarray(list(self.Predicted_X.values()), dtype=float),\
               np.asarray(list(self.Predicted_X_error.values()), dtype=float)
def cost_from_logprob(self, log_prob, **kwargs):
return cost_from_logprob(log_prob, **kwargs)
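

# --- Usage sketch (illustrative only) -----------------------------------------
# The base Filter is driven one time stamp at a time: predict() propagates the
# last believe (or prediction) through the Model, update() stores the assigned
# measurement, exactly as fit() does. The "model" object, the initial state
# "x0" and the control/measurement sequences below are placeholders and have to
# come from a PenguTrack model set up elsewhere; this helper is documentation
# only and is not used by the package.
def _example_filter_loop(model, x0, controls, measurements):
    f = Filter(model)
    # an initial believe is required, otherwise predict() has nothing to start from
    f.X.update({0: x0})
    for i, (u, z) in enumerate(zip(controls, measurements)):
        f.predict(u=u, i=i + 1)
        f.update(z=z, i=i + 1)
    return f.X, f.Predicted_X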


class KalmanBaseFilter(Filter):
"""
This Class describes a kalman-filter in the pengu-track package. It calculates actual believed values from
predictions and measurements.
Attributes
----------
Model: PenguTrack.model object
A physical model to gain predictions from data.
Measurement_Distribution: scipy.stats.distributions object
The distribution which describes measurement uncertainty.
State_Distribution: scipy.stats.distributions object
The distribution which describes state vector fluctuations.
X: dict
The time series of believes calculated for this filter. The keys equal the time stamp.
X_error: dict
The time series of errors on the corresponding believes. The keys equal the time stamp.
Predicted_X: dict
The time series of predictions made from the associated data. The keys equal the time stamp.
Predicted_X_error: dict
The time series of estimated prediction errors. The keys equal the time stamp.
Measurements: dict
The time series of measurements assigned to this filter. The keys equal the time stamp.
Controls: dict
The time series of control-vectors assigned to this filter. The keys equal the time stamp.
A: np.asarray
State-Transition-Matrix, describing the evolution of the state without fluctuation.
Received from the physical Model.
B: np.asarray
State-Control-Matrix, describing the influence of external-control on the states.
Received from the physical Model.
C: np.asarray
Measurement-Matrix , describing the projection of the measurement-vector from the state-vector.
Received from the physical Model.
G: np.asarray
Evolution-Matrix, describing the evolution of state vectors by fluctuations from the state-distribution.
Received from the physical Model.
Q: np.asarray
Covariance matrix (time-evolving) for the state-distribution.
Q_0: np.asarray
Covariance matrix (initial state) for the state-distribution.
R: np.asarray
Covariance matrix (time-evolving) for the measurement-distribution.
R_0: np.asarray
Covariance matrix (initial state) for the measurement-distribution.
P_0: np.asarray
Covariance matrix (initial state) for the believe-distribution.
"""
def __init__(self, model, evolution_variance, measurement_variance, **kwargs):
"""
This Class describes a kalman-filter in the pengu-track package. It calculates actual believed values from
predictions and measurements.
Parameters
----------
model: PenguTrack.model object
A physical model to gain predictions from data.
evolution_variance: array_like
Vector containing the estimated variances of the state-vector-entries.
measurement_variance: array_like
Vector containing the estimated variances of the measurement-vector-entries.
"""
self.Model = model
evolution_variance = np.asarray(evolution_variance, dtype=float)
if evolution_variance.shape == (1,):
evolution_variance = np.diag(np.ones(int(self.Model.Evolution_dim))*evolution_variance)
# if evolution_variance.shape != (int(self.Model.Evolution_dim),):
# evolution_variance = np.ones(self.Model.Evolution_dim) * np.mean(evolution_variance)
self.Evolution_Variance = evolution_variance
self.Q = evolution_variance
self.Q_0 = evolution_variance
measurement_variance = np.asarray(measurement_variance, dtype=float)
if measurement_variance.shape == (1,):
measurement_variance = np.diag(np.ones(int(self.Model.Meas_dim))*measurement_variance)
# if measurement_variance.shape != (int(self.Model.Meas_dim),):
# measurement_variance = np.ones(self.Model.Meas_dim) * np.mean(measurement_variance)
self.Measurement_Variance = measurement_variance
self.R = measurement_variance
self.R_0 = measurement_variance
self.A = self.Model.State_Matrix
self.B = self.Model.Control_Matrix
self.C = self.Model.Measurement_Matrix
self.G = self.Model.Evolution_Matrix
self.K = None
p = np.dot(np.dot(self.C.T, self.R_0), self.C) + \
np.dot(self.A, np.dot(np.dot(np.dot(self.G, self.Q), self.G.T), self.A.T))#np.diag(np.ones(self.Model.State_dim) * max(measurement_variance))
self.P_0 = p
kwargs.update(dict(meas_dist=ss.multivariate_normal(cov=self.R),
state_dist=ss.multivariate_normal(cov=self.P_0)))
super(KalmanBaseFilter, self).__init__(model, **kwargs)
self.X_error.update({0: p})
self.Predicted_X_error.update({0: p})
def _handle_state_(self, i, allow_predict=False, return_prediction=False):
if i in self.X and not return_prediction:
return self.X[i]
elif i in self.Predicted_X:
return self.Predicted_X[i]
elif allow_predict:
u, i = self.predict(i=i)
return self.Predicted_X[i]
else:
if return_prediction:
raise KeyError("No entry in prediction memory for timestamp %s" % i)
else:
raise KeyError("No entry in prediction or state memory for timestamp %s" % i)
def _handle_error_(self, i, allow_predict=False, return_prediction=False):
if i in self.X_error and not return_prediction:
return self.X_error[i]
elif i in self.Predicted_X_error:
return self.Predicted_X_error[i]
elif allow_predict:
u, i = self.predict(i=i)
return self.Predicted_X_error[i]
else:
if return_prediction:
raise KeyError("No entry in prediction memory for timestamp %s" % i)
else:
return self.P_0
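

# Note on the covariance setup above (illustrative): scalar variances passed to
# KalmanBaseFilter are expanded to diagonal covariance matrices of the model's
# evolution/measurement dimension, e.g. evolution_variance=[2.] with a
# 3-dimensional evolution becomes Q = np.diag(np.ones(3) * 2.). The initial
# believe covariance, which is also used for the Gaussian State_Distribution, is
#   P_0 = C^T R_0 C + A (G Q G^T) A^T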


class KalmanFilter(KalmanBaseFilter):
# def __init__(self, model, evolution_variance, measurement_variance, **kwargs):
# """
# This Class describes a kalman-filter in the pengu-track package. It calculates actual believed values from
# predictions and measurements.
#
# Parameters
# ----------
# model: PenguTrack.model object
# A physical model to gain predictions from data.
# evolution_variance: array_like
# Vector containing the estimated variances of the state-vector-entries.
# measurement_variance: array_like
# Vector containing the estimated variances of the measurement-vector-entries.
# """
# super(KalmanFilter, self).__init__(model, evolution_variance, measurement_variance, **kwargs)

    def predict(self, u=None, i=None):
"""
Function to get predictions from the corresponding model. Handles time-stamps and control-vectors.
Parameters
----------
u: array_like, optional
Recent control-vector.
i: int
Recent/corresponding time-stamp
Returns
----------
u: array_like
Recent control-vector.
i: int
Recent/corresponding time-stamp.
"""
u, i = super(KalmanFilter, self).predict(u=u, i=i)
x = self._handle_state_(i-1)
p = self._handle_error_(i-1)
p_ = np.dot(np.dot(self.A, p), self.A.T) + np.dot(np.dot(self.G, self.Q), self.G.T)
x_ = np.dot(self.A, x) + np.dot(self.B, u)
self.Predicted_X.update({i: x_})
self.Predicted_X_error.update({i: p_})
return u, i

    def update(self, z=None, i=None):
"""
Function to get updates to the corresponding model. Handles time-stamps and measurement-vectors.
Parameters
----------
z: PenguTrack.Measurement, optional
Recent measurement.
i: int
Recent/corresponding time-stamp
Returns
----------
z: PenguTrack.Measurement
Recent measurement.
i: int
Recent/corresponding time-stamp.
"""
z, i = super(KalmanFilter, self).update(z=z, i=i)
        measurement = copy.copy(z)
z = np.asarray(self.Model.vec_from_meas(measurement))
x = self._handle_state_(i, allow_predict=True, return_prediction=True)
p = self._handle_error_(i, allow_predict=True, return_prediction=True)
try:
k = np.dot(np.dot(p, self.C.transpose()),
np.linalg.inv(np.dot(np.dot(self.C, p), self.C.transpose()) + self.R))
self.K = k
except np.linalg.LinAlgError:
e = sys.exc_info()[1]
print(p)
raise np.linalg.LinAlgError(e)
y = z - np.dot(self.C, x)
x_ = x + np.dot(k, y)
p_ = p - np.dot(np.dot(k, self.C), p)
self.X.update({i: x_})
self.X_error.update({i: p_})
return measurement, i
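

# --- Reference sketch of the prediction/correction step above (illustrative) --
# KalmanFilter.predict()/update() implement the textbook recursion
#   x~ = A x + B u,            P~ = A P A^T + G Q G^T
#   K  = P~ C^T (C P~ C^T + R)^-1
#   x' = x~ + K (z - C x~),    P' = (I - K C) P~
# restated below with plain numpy arrays. The helper is documentation only and
# is not used by the class.
def _kalman_step_sketch(x, p, u, z, A, B, C, G, Q, R):
    # prediction
    x_pred = np.dot(A, x) + np.dot(B, u)
    p_pred = np.dot(np.dot(A, p), A.T) + np.dot(np.dot(G, Q), G.T)
    # correction
    k = np.dot(np.dot(p_pred, C.T), np.linalg.inv(np.dot(np.dot(C, p_pred), C.T) + R))
    x_new = x_pred + np.dot(k, z - np.dot(C, x_pred))
    p_new = p_pred - np.dot(np.dot(k, C), p_pred)
    return x_new, p_new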


class AdaptedKalmanFilter(KalmanBaseFilter):
def __init__(self, model, evolution_variance, measurement_variance, **kwargs):
"""
This Class describes a kalman-filter in the pengu-track package. It calculates actual believed values from
predictions and measurements.
Parameters
----------
model: PenguTrack.model object
A physical model to gain predictions from data.
"""
super(AdaptedKalmanFilter, self).__init__(model, evolution_variance, measurement_variance, **kwargs)
self.Y = {}

    def predict(self, u=None, i=None):
"""
Function to get predictions from the corresponding model. Handles time-stamps and control-vectors.
Parameters
----------
u: array_like, optional
Recent control-vector.
i: int
Recent/corresponding time-stamp
Returns
----------
u: array_like
Recent control-vector.
i: int
Recent/corresponding time-stamp.
"""
u, i = super(AdaptedKalmanFilter, self).predict(u=u, i=i)
x = self._handle_state_(i-1)
p = self._handle_error_(i-1)
p_ = np.dot(np.dot(self.A, p), self.A.T) + np.dot(np.dot(self.G, self.Q), self.G.T)
x_ = np.dot(self.A, x) + np.dot(self.B, u)
self.Predicted_X.update({i: x_})
self.Predicted_X_error.update({i: p_})
return u, i

    def update(self, z=None, i=None):
"""
Function to get updates to the corresponding model. Handles time-stamps and measurement-vectors.
Parameters
----------
z: PenguTrack.Measurement, optional
Recent measurement.
i: int
Recent/corresponding time-stamp
Returns
----------
z: PenguTrack.Measurement
Recent measurement.
i: int
Recent/corresponding time-stamp.
"""
z, i = super(AdaptedKalmanFilter, self).update(z=z, i=i)
        measurement = copy.copy(z)
z = np.asarray(self.Model.vec_from_meas(measurement))
x = self._handle_state_(i, allow_predict=True, return_prediction=True)
p = self._handle_error_(i, allow_predict=True, return_prediction=True)
y = z - np.dot(self.C, x)
self.Y.update({i: y})
sig = y[:, None] * y[None, :]
alpha = np.trace(sig)/np.trace(np.dot(np.dot(self.C, p), self.C.T) + self.R)
if alpha > 1:
alpha = 1
# elif alpha<0:
# alpha = 0
print(alpha)
try:
k = (1./alpha)*np.dot(np.dot(p, self.C.transpose()),
np.linalg.inv(np.dot(np.dot(self.C * (1./alpha), p), self.C.transpose()) + self.R))
self.K = k
except np.linalg.LinAlgError:
e = sys.exc_info()[1]
print(p)
raise np.linalg.LinAlgError(e)
x_ = x + np.dot(k, y)
p_ = (p - np.dot(np.dot(k, self.C), p))*(1./alpha)
self.X.update({i: x_})
self.X_error.update({i: p_})
return measurement, i


class AdvancedKalmanFilter(KalmanFilter):
"""
    This Class describes an advanced, self-tuning version of the Kalman filter in the pengu-track package.
It calculates actual believed values from predictions and measurements.
Attributes
----------
Model: PenguTrack.model object
A physical model to gain predictions from data.
Lag: int
        The number of state-vectors which are taken into account for the self-tuning algorithm.
Measurement_Distribution: scipy.stats.distributions object
The distribution which describes measurement uncertainty.
State_Distribution: scipy.stats.distributions object
The distribution which describes state vector fluctuations.
X: dict
The time series of believes calculated for this filter. The keys equal the time stamp.
X_error: dict
The time series of errors on the corresponding believes. The keys equal the time stamp.
Predicted_X: dict
The time series of predictions made from the associated data. The keys equal the time stamp.
Predicted_X_error: dict
The time series of estimated prediction errors. The keys equal the time stamp.
Measurements: dict
The time series of measurements assigned to this filter. The keys equal the time stamp.
Controls: dict
The time series of control-vectors assigned to this filter. The keys equal the time stamp.
A: np.asarray
State-Transition-Matrix, describing the evolution of the state without fluctuation.
Received from the physical Model.
B: np.asarray
State-Control-Matrix, describing the influence of external-control on the states.
Received from the physical Model.
C: np.asarray
Measurement-Matrix , describing the projection of the measurement-vector from the state-vector.
Received from the physical Model.
G: np.asarray
Evolution-Matrix, describing the evolution of state vectors by fluctuations from the state-distribution.
Received from the physical Model.
Q: np.asarray
Covariance matrix (time-evolving) for the state-distribution.
Q_0: np.asarray
Covariance matrix (initial state) for the state-distribution.
R: np.asarray
Covariance matrix (time-evolving) for the measurement-distribution.
R_0: np.asarray
Covariance matrix (initial state) for the measurement-distribution.
P_0: np.asarray
Covariance matrix (initial state) for the believe-distribution.
"""
def __init__(self, *args, **kwargs):
"""
        This Class describes an advanced, self-tuning version of the Kalman filter in the pengu-track package.
It calculates actual believed values from predictions and measurements.
Parameters
----------
model: PenguTrack.model object
A physical model to gain predictions from data.
evolution_variance: array_like
Vector containing the estimated variances of the state-vector-entries.
measurement_variance: array_like
Vector containing the estimated variances of the measurement-vector-entries.
"""
super(AdvancedKalmanFilter, self).__init__(*args, **kwargs)
self.Lag = -1 * int(kwargs.pop('lag', -1))
self._is_obs_ = self.is_observable()
# def predict(self, *args, **kwargs):
# """
# Function to get predictions from the corresponding model. Handles time-stamps and control-vectors.
#
# Parameters
# ----------
# u: array_like, optional
# Recent control-vector.
# i: int
# Recent/corresponding time-stamp
#
# Returns
# ----------
# u: array_like
# Recent control-vector.
# i: int
# Recent/corresponding time-stamp.
# """
# if self.Lag == 1 or (-1 * self.Lag > len(self.Predicted_X.keys())):
# lag = 0
# else:
# lag = self.Lag
# self.Q = np.dot(np.dot(self.G.T, np.cov(np.asarray(self.Predicted_X.values()[lag:]).T)), self.G)
# print("Q at %s"%self.Q)
# if np.any(np.isnan(self.Q)):# or np.any(np.linalg.eigvals(self.Q) < np.diag(self.Q_0)):
# self.Q = self.Q_0
# # print("Q at %s"%self.Q)
# return super(AdvancedKalmanFilter, self).predict(*args, **kwargs)

    def update(self, *args, **kwargs):
"""
Function to get updates to the corresponding model. Handles time-stamps and measurement-vectors.
Parameters
----------
z: PenguTrack.Measurement, optional
Recent measurement.
i: int
Recent/corresponding time-stamp
Returns
----------
z: PenguTrack.Measurement
Recent measurement.
i: int
Recent/corresponding time-stamp.
"""
z, i = super(AdvancedKalmanFilter, self).update(*args, **kwargs)
frames = set(self.Measurements.keys()).intersection(self.Predicted_X.keys())
if self._is_obs_ and len(frames) >= self.Model.State_dim:
frames = sorted(frames)[-self.Model.State_dim:]
print("prev", np.diag(self.Q), np.diag(self.R))
cor_vals = np.asarray([self.Model.vec_from_meas(self.Measurements[f])-self.Model.measure(self.Predicted_X[f])
for f in frames])
cor = np.mean([cor_vals[-i, None, :] * cor_vals[-i, :, None] for i in range(self.Model.State_dim)], axis=0)[:,:,0]
# cor = self.COR(cor_vals, max_size=self.Model.State_dim)
self.R = cor - np.dot(np.dot(self.C, self.X_error[i]), self.C.T)
if np.any(np.diag(self.R)<0):
if np.any(np.diag(self.R)>0):
v, w = np.linalg.eig(self.R)
if np.any(v<0):
self.R = np.dot(np.dot(w,np.abs(np.diag(v))), np.linalg.inv(w))
else:
self.R *= -1.
# self.R = np.abs(self.R)
self.Q = np.dot(np.dot(np.dot(np.dot(self.G.T, self.K), cor), self.K.T), self.G)
print("post", np.diag(self.Q), np.diag(self.R))
# dif = np.asarray([np.dot(self.C, np.asarray(self.X.get(k, None)).T).T
# - np.asarray([self.Measurements[k].PositionX,
# self.Measurements[k].PositionY]) for k in self.Measurements.keys()])
# self.R = np.cov(dif.T)
# if np.any(np.isnan(self.R)) or np.any(np.linalg.eigvals(self.R) < np.diag(self.R_0)):
# self.R = self.R_0
# print("R at %s"%self.R)
return z, i
# def optimize(self):
# n = len(self.A)
# M_0 =
# K0 = np.dot(np.dot(M_0, self.C), np.linalg.inv(np.dot(self.C, np.dot(M_0, self.C.T)) - self.R))
# K = np.dot(np.dot(p, self.C.transpose()),
# np.linalg.inv(np.dot(np.dot(self.C, p), self.C.transpose()) + self.R))
# AIKC = np.dot(self.A, np.diag(np.ones(n)) - np.dot(K, self.C))
# aikc = {0: np.diag(np.ones(n))}
# for i in range(2, n):
# aikc.update({i: np.dot(AIKC, aikc[i-1])})
# A = np.vstack([np.dot(self.C, np.dot(aikc[i], self.A)) for i in range(n)])
# A_cross = np.dot(np.linalg.inv(np.dot(A.T, A)), A.T)
#
# frames = set(self.X.keys()).intersection(self.Predicted_X.keys())
# deltas = np.asarray([self.X[f]-self.Predicted_X[f] for f in frames])
# C = np.vstack([c for c in self.COR(deltas, max_size=n)])
#
# MH_T = np.dot(A_cross, C)
# # B = np.dot(np.vstack([np.dot(self.C, A[i]) for i in range(n)]), self.A)
# # B_cross = np.dot(np.linalg.inv(np.dot(B.T, B)), B.T)
def is_observable(self):
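        # The loop below builds the powers A^0 ... A^(n-1) and from them the
        # observability stack [C; C A; ...; C A^(n-1)] and the controllability
        # stack [G, A G, ..., A^(n-1) G] (with respect to the evolution matrix G).
        # The system counts as observable here only if both stacks have full
        # rank n, i.e. the state can be reconstructed from measurements and is
        # excited by the process noise.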
n = len(self.A)
A = {0: np.diag(np.ones(n)), 1: self.A}
for i in range(2, n):
A.update({i: np.dot(self.A, A[i-1])})
one = np.hstack([np.dot(self.C, A[i]).T for i in range(n)])
two = np.hstack([np.dot(A[i], self.G) for i in range(n)])
return (np.linalg.matrix_rank(one) == n) & (np.linalg.matrix_rank(two) == n)
def is_optimal(self):
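        # Consistency check (descriptive comment): the correction terms
        # X - Predicted_X of all common frames are autocorrelated (ACOR) and,
        # per state component, at most 5% of the non-zero-lag autocorrelation
        # values may exceed the confidence bound for the filter to be
        # considered optimally tuned.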
frames = set(self.X.keys()).intersection(self.Predicted_X.keys())
deltas = np.asarray([self.X[f]-self.Predicted_X[f] for f in frames])
P = self.ACOR(deltas)
N = len(frames)
return np.all(np.sum([np.abs(np.diag(PP[:,:,0])) > (1.96/N**2) for PP in P[1:]], axis=0) < (0.05*N))
def COR(self, vals, max_size=-1):
vals = np.asarray(vals)
if max_size < 0:
return np.asarray([np.sum(vals[f:, None, :]*vals[f:, :, None], axis=0) for f in range(len(vals))])
else:
return np.asarray([np.sum(vals[f:, None, :]*vals[f:, :, None], axis=0) for f in range(max_size)])
def ACOR(self, vals):
c = self.COR(vals)
cc = (np.diag(c[0,:,:,0])[None,:]*np.diag(c[0,:,:,0])[:,None])**0.5
return c/cc
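

# --- Sketch of the noise adaptation used above (illustrative only) ------------
# When the system is observable and enough frames are available,
# AdvancedKalmanFilter.update() re-estimates the noise covariances from the
# innovation sequence y_f = z_f - C x~_f of the last State_dim frames:
#   cor = mean_f(y_f y_f^T),   R = cor - C P C^T,   Q = G^T K cor K^T G
# A plain-numpy restatement for 1-D innovation vectors (documentation only,
# not used by the class):
def _adaptive_noise_sketch(innovations, P, K, C, G):
    innovations = np.asarray(innovations)
    cor = np.mean([y[:, None] * y[None, :] for y in innovations], axis=0)
    R = cor - np.dot(np.dot(C, P), C.T)
    Q = np.dot(np.dot(np.dot(np.dot(G.T, K), cor), K.T), G)
    return Q, R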


class ParticleFilter(Filter):
"""
This Class describes a particle-filter in the pengu-track package.
It calculates actual believed values from predictions and measurements.
Attributes
----------
Model: PenguTrack.model object
A physical model to gain predictions from data.
N: int
The number of particles.
Particles: dict
The current particle state-vectors.
Weights: dict
Weights for every particle. Calculated from probability.
Measurement_Distribution: scipy.stats.distributions object
The distribution which describes measurement uncertainty.
State_Distribution: scipy.stats.distributions object
The distribution which describes state vector fluctuations.
X: dict
The time series of believes calculated for this filter. The keys equal the time stamp.
X_error: dict
The time series of errors on the corresponding believes. The keys equal the time stamp.
Predicted_X: dict
The time series of predictions made from the associated data. The keys equal the time stamp.
Predicted_X_error: dict
The time series of estimated prediction errors. The keys equal the time stamp.
Measurements: dict
The time series of measurements assigned to this filter. The keys equal the time stamp.
Controls: dict
The time series of control-vectors assigned to this filter. The keys equal the time stamp.
"""
def __init__(self, model, n=100, meas_dist=ss.uniform(), state_dist=ss.uniform()):
"""
This Class describes a particle-filter in the pengu-track package.
It calculates actual believed values from predictions and measurements.
Parameters
----------
model: PenguTrack.model object
A physical model to gain predictions from data.
n: int, optional
The number of particles.
meas_dist: scipy.stats.distributions object
The distribution which describes measurement uncertainty.
state_dist: scipy.stats.distributions object
The distribution which describes state vector fluctuations.
"""
super(ParticleFilter, self).__init__(model, state_dist=state_dist, meas_dist=meas_dist)
self.N = n
self.Particles = {}
self.Weights = {}

    def predict(self, u=None, i=None):
"""
Function to get predictions from the corresponding model. Handles time-stamps and control-vectors.
Parameters
----------
u: array_like, optional
Recent control-vector.
i: int
Recent/corresponding time-stamp
Returns
----------
u: array_like
Recent control-vector.
i: int
Recent/corresponding time-stamp.
"""
u, i = super(ParticleFilter, self).predict(u=u, i=i)
for j in self.Particles.keys():
mean = self.Model.predict(self.Particles[j], u)
self.Particles.update({j: self.Model.evolute(self.State_Distribution.rvs()) + mean})
        self.Predicted_X.update({i: np.mean(list(self.Particles.values()), axis=0)})
        self.Predicted_X_error.update({i: np.std(list(self.Particles.values()), axis=0)})
return u, i

    def update(self, z=None, i=None):
"""
Function to get updates to the corresponding model. Handles time-stamps and measurement-vectors.
Parameters
----------
z: PenguTrack.Measurement, optional
Recent measurement.
i: int
Recent/corresponding time-stamp
Returns
----------
z: PenguTrack.Measurement
Recent measurement.
i: int
Recent/corresponding time-stamp.
"""
z, i = super(ParticleFilter, self).update(z=z, i=i)
measurement = copy.copy(z)
z = np.asarray([z.PositionX, z.PositionY])
for j in range(self.N):
self.Weights.update({j: self.Measurement_Distribution.logpdf(z-self.Model.measure(self.Particles[j]))})
        weights = np.asarray(list(self.Weights.values()))
w_min = np.amin(weights)
w_max = np.amax(weights)
if w_max > w_min:
weights = (weights - w_min)
weights = np.cumsum(np.exp(weights))
weights = weights/weights[-1]
print("Standard")
else:
weights = np.cumsum(weights)
weights = weights/weights[-1]
print("Workaround")
idx = np.sum(np.asarray(np.tile(weights, self.N).reshape((self.N, -1)) <
np.tile(np.random.rand(self.N), self.N).reshape((self.N, -1)).T, dtype=int), axis=1)
print(len(set(idx)))
        values = list(self.Particles.values())
for k, j in enumerate(idx):
try:
self.Particles.update({k: values[j]})
except IndexError:
self.Particles.update({k: values[j-1]})
        self.X.update({i: np.mean(list(self.Particles.values()), axis=0)})
        self.X_error.update({i: np.std(list(self.Particles.values()), axis=0)})
return measurement, i
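

# --- Sketch of the resampling step above (illustrative only) ------------------
# ParticleFilter.update() performs multinomial resampling: the particle
# log-weights are shifted and exponentiated, turned into a normalised cumulative
# distribution, and every new particle copies the old particle whose cumulative
# weight interval contains a uniform random draw. A compact restatement
# (documentation only, not used by the class):
def _resample_indices_sketch(log_weights):
    w = np.exp(np.asarray(log_weights) - np.amax(log_weights))
    cdf = np.cumsum(w)
    cdf /= cdf[-1]
    # one uniform draw per particle, mapped to a particle index via the CDF
    return np.searchsorted(cdf, np.random.rand(len(cdf)))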


class MultiFilter(Filter):
"""
    This Class describes a filter which is capable of assigning measurements to tracks, which in turn are represented
    by sub-filters. The type of the sub-filters can be specified, as well as a physical model for predictions. With
    these objects it is possible to assign probabilities to combinations of measurement and prediction.
Attributes
----------
Model: PenguTrack.model object
A physical model to gain predictions from data.
Filter_Class: PenguTrack.Filter object
A Type of Filter from which all subfilters should be built.
Filters: dict
Dictionary containing all sub-filters as PenguTrack.Filter objects.
    ActiveFilters: dict
Dictionary containing all sub-filters, which are currently updated.
FilterThreshold: int
Number of time steps, before a filter is set inactive.
LogProbabilityThreshold: float
        Threshold under which log-probabilities are considered negligible.
filter_args: list
Filter-Type specific arguments from the Multi-Filter initialisation can be stored here.
filter_kwargs: dict
Filter-Type specific keyword-arguments from the Multi-Filter initialisation can be stored here.
Measurement_Distribution: scipy.stats.distributions object
The distribution which describes measurement uncertainty.
State_Distribution: scipy.stats.distributions object
The distribution which describes state vector fluctuations.
X: dict
The time series of believes calculated for this filter. The keys equal the time stamp.
X_error: dict
The time series of errors on the corresponding believes. The keys equal the time stamp.
Predicted_X: dict
The time series of predictions made from the associated data. The keys equal the time stamp.
Predicted_X_error: dict
The time series of estimated prediction errors. The keys equal the time stamp.
Measurements: dict
The time series of measurements assigned to this filter. The keys equal the time stamp.
Controls: dict
The time series of control-vectors assigned to this filter. The keys equal the time stamp.
"""
def __init__(self, _filter, model, *args, **kwargs):
"""
        This Class describes a filter which is capable of assigning measurements to tracks, which in turn are
        represented by sub-filters. The type of the sub-filters can be specified, as well as a physical model for
        predictions. With these objects it is possible to assign probabilities to combinations of measurement and
        prediction.
        Sub-filter specific arguments are handled by *args and **kwargs.
Parameters
----------
model: PenguTrack.model object
A physical model to gain predictions from data.
meas_dist: scipy.stats.distributions object
The distribution which describes measurement uncertainty.
state_dist: scipy.stats.distributions object
The distribution which describes state vector fluctuations.
"""
super(MultiFilter, self).__init__(model)
self.Filter_Class = _filter
self.Filters = {}
self.ActiveFilters = {}
self.FilterThreshold = 3
# TODO: rename to CostThreshold
self.LogProbabilityThreshold = -np.inf#0.99#-200#-18.
self.MeasurementProbabilityThreshold = 0.05#0.99#-200#-18.
self.AssignmentProbabilityThreshold = 0.02#0.99#-200#-18.
self.filter_args = args
self.filter_kwargs = kwargs
self.CriticalIndex = None
self.Probability_Gain = {}
self.Probability_Gain_Dicts = {}
self.Probability_Assignment_Dicts = {}
self.ProbUpdate = kwargs.get("prob_update", False)

    def predict(self, u=None, i=None):
"""
Function to get predictions from the corresponding sub-filter models. Handles time-stamps and control-vectors.
Parameters
----------
u: array_like, optional
Recent control-vector.
i: int
Recent/corresponding time-stamp
Returns
----------
u: array_like
Recent control-vector.
i: int
Recent/corresponding time-stamp.
"""
assert not (u is None and i is None), "One of control vector or time stamp must be specified"
# for j in list(self.ActiveFilters.keys()):
# _filter = self.ActiveFilters[j]
# if np.amax(list(_filter.Predicted_X.keys()))-np.amax(list(_filter.X.keys())) >= self.FilterThreshold:
# self.ActiveFilters.pop(j)
# print("Stopped track %s in frame %s with no updates for %s frames"%(j, i, self.FilterThreshold))
predicted_x = {}
predicted_x_error = {}
for j in self.ActiveFilters.keys():
self.ActiveFilters[j].predict(u=u, i=i)
if i in self.ActiveFilters[j].Predicted_X.keys():
predicted_x.update({j: self.ActiveFilters[j].Predicted_X[i]})
predicted_x_error.update({j: self.ActiveFilters[j].Predicted_X_error[i]})
self.Predicted_X.update({i: predicted_x})
self.Predicted_X_error.update({i: predicted_x_error})
return u, i

    def initial_update(self, z, i):
"""
Function to get updates to the corresponding model in the first time step.
Handles time-stamps and measurement-vectors.
This function also handles the assignment of all incoming measurements to the active sub-filters.
Parameters
----------
z: list of PenguTrack.Measurement objects
Recent measurements.
i: int
Recent/corresponding time-stamp.
Returns
----------
z: list of PenguTrack.Measurement objects
Recent measurements.
i: int
Recent/corresponding time-stamp.
"""
print("Initial Filter Update")
if len(z)<1:
raise ValueError("No Measurements found!")
measurements = list(z)
z = np.asarray([self.Model.vec_from_meas(m) for m in measurements], dtype=float)
M = z.shape[0]
for j in range(M):
_filter = self.Filter_Class(self.Model, *self.filter_args, **self.filter_kwargs)
inferred_state = _filter.Model.infer_state(z[j])
_filter.Predicted_X.update({i: inferred_state})
_filter.X.update({i: inferred_state})
_filter.Measurements.update({i: measurements[j]})
try:
J = max(self.Filters.keys()) + 1
except ValueError:
J = 0
self.ActiveFilters.update({J: _filter})
self.Filters.update({J: _filter})

    def update(self, z=None, i=None, big_jumps=False, verbose=False):
"""
Function to get updates to the corresponding model. Handles time-stamps and measurement-vectors.
This function also handles the assignment of all incoming measurements to the active sub-filters.
Parameters
----------
z: list of PenguTrack.Measurement objects
Recent measurements.
i: int
Recent/corresponding time-stamp.
Returns
----------
z: list of PenguTrack.Measurement objects
Recent measurements.
i: int
Recent/corresponding time-stamp.
"""
assert not (z is None and i is None), "One of measurement vector or time stamp must be specified"
# convert from pandas to measurement objects if necessary
if not isinstance(z, np.ndarray) and isinstance(z, pandas.DataFrame):
measurements = pandasDF_to_measurement(z)
# self.Measurements.update({i: measurements})
elif not isinstance(z, np.ndarray) and isinstance(z[0], Measurement):
measurements = list(z)
# self.Measurements.update({i:measurements})
# meas_logp = np.asarray([m.Log_Probability for m in z])
# if isinstance(z, tuple) and len(z)==2:
# z = z[0]
# z = np.asarray([self.Model.vec_from_meas(m) for m in measurements], ndmin=2)
elif isinstance(z, np.ndarray):
measurements = array_to_measurement(z, dim=self.Model.Meas_dim)
# if len(z.shape) == 2:
# z = z[:, :, None]
# meas_logp = np.ones(z.shape[0])
# measurements = [Measurement(1, pos) for pos in z]
# self.Measurements.update({i: measurements})
else:
raise ValueError("Input Positions are not of type array or pengutrack measurement!")
# TODO: zjzibki7u
self.Measurements.update({i: measurements})
# meas_logp = measurements.Log_Probability
meas_logp = np.asarray([m.Log_Probability for m in measurements])
if verbose:
print("Total number of detections: %d, valid detections: %d"%(len(meas_logp),(~np.isneginf(meas_logp)).sum()))
# z = self.Model.vec_from_pandas(measurements)
z = np.asarray([self.Model.vec_from_meas(m) for m in measurements])
mask = ~np.isneginf(meas_logp)
if not np.all(~mask):
# meas_logp[~mask] = np.nanmin(meas_logp[mask])
# mask &= (meas_logp - np.nanmin(meas_logp) >=
# (self.MeasurementProbabilityThreshold * (np.nanmax(meas_logp) - np.nanmin(meas_logp)))).astype(bool)
z = z[mask]
# measurements = list(np.asarray(measurements)[mask])
measurements = np.asarray(measurements)[mask]
else:
            # TODO: do I really want to do this?
self.Measurements.pop(i, None)
return measurements, i
M = z.shape[0]
N = len(dict(self.ActiveFilters))
# First frame or no active tracks: initialize tracker
if N == 0 and M > 0:
self.initial_update(measurements, i)
return measurements, i
# Dictionary to keep correspondence between track number and matrix entry
gain_dict = dict(enumerate(self.ActiveFilters.keys()))
# Start with neginf as default for prob gain (cost) matrix
# probability_gain = np.ones((max(M, N), M)) * -np.inf
probability_gain = np.ones((N, M)) * -np.inf
        # iterate over tracks and corresponding matrix indices
for j, k in gain_dict.items():
# iter over measurements and corresponding indices
for m, meas in enumerate(measurements):
                # build the (cost) matrix entry for every track/measurement pair
probability_gain[j, m] = self.ActiveFilters[k].log_prob(keys=[i],
measurements={i: meas},
update=self.ProbUpdate)
# If LogProbThreshold at neg inf set it to one under matrix minimum
if self.LogProbabilityThreshold == -np.inf:
LogProbabilityThreshold = np.nextafter(np.nanmin(probability_gain), -np.inf)
else:
LogProbabilityThreshold = self.LogProbabilityThreshold
if not np.all(np.isneginf(probability_gain)):
# set positive inf to max of array
probability_gain[np.isposinf(probability_gain)] = np.amax(probability_gain[~np.isposinf(probability_gain)])
# set negative inf to min of array
probability_gain[np.isneginf(probability_gain)] = np.amin(probability_gain[~np.isneginf(probability_gain)])
# now set nan results to negative inf
probability_gain[np.isnan(probability_gain)] = -np.inf
# matrix now contains no pos inf or nans.
# neg inf masks the unusable values and threshold is over neg inf, but below lowest valid value
# TODO: change name of logp to cost
if self.filter_kwargs.get("no_dist"):
cost_matrix = -np.exp(-probability_gain)
threshold = -np.exp(-self.LogProbabilityThreshold)
else:
cost_matrix = self.cost_from_logprob(probability_gain)
threshold = self.cost_from_logprob(probability_gain, value=self.LogProbabilityThreshold)
rows, cols = self.assign(cost_matrix, threshold=threshold)
else:
rows, cols = [], []
self.Probability_Gain.update({i: np.asarray(probability_gain)})
track_length = dict(zip(gain_dict.values(), np.zeros(len(gain_dict), dtype=int)))
track_length.update(dict([[k, len(self.ActiveFilters[k].X)] for k in self.ActiveFilters]))
track_inactivity_time = dict(zip(gain_dict.values(), np.zeros(len(gain_dict), dtype=int)))
track_inactivity_time.update(dict([[k, i-max(self.ActiveFilters[k].X)-1] for k in self.ActiveFilters]))
# assignments = dict([(gain_dict[rows[i]], cols[i]) for i in range(len(rows))
# if (probability_gain[rows[i], cols[i]] > threshold) | (track_length[gain_dict[rows[i]]] < 2)])
assignments = dict([(gain_dict[rows[i]], cols[i]) for i in range(len(rows))
if (cost_matrix[rows[i], cols[i]] < threshold)])
not_updated_tracks = set(self.ActiveFilters.keys()).difference(assignments.keys())
stopped_tracks = set([k for k in not_updated_tracks if track_inactivity_time[k] > self.FilterThreshold])
not_updated_tracks = not_updated_tracks.difference(stopped_tracks)
spawned_tracks = dict(zip(max(self.Filters.keys()) + 1 + np.arange(len(measurements)-len(assignments)),
set(range(M)).difference(assignments.values())))
print("Assigned %d, not updated %d, stopped %d and spawned %d Tracks."%(len(assignments),len(not_updated_tracks), len(stopped_tracks), len(spawned_tracks)))
for k in assignments:
m = assignments[k]
self.ActiveFilters[k].update(z=measurements[m], i=i)
if verbose:
print("Updated track %s with prob %s in frame %s" % (k, probability_gain[reverse_dict(gain_dict)[k], m], i))
for k in not_updated_tracks:
if verbose:
print("No update for track %s with best prob %s in frame %s"%(reverse_dict(gain_dict)[k],
np.amax(probability_gain[reverse_dict(gain_dict)[k]]), i))
for k in stopped_tracks:
self.ActiveFilters.pop(k)
if verbose:
print("Stopped track %s with probability %s in frame %s" % (k, probability_gain[reverse_dict(gain_dict)[k], m], i))
for k in spawned_tracks:
m = spawned_tracks[k]
_filter = self.Filter_Class(self.Model, *self.filter_args, **self.filter_kwargs)
_filter.Predicted_X.update({i: self.Model.infer_state(z[m])})
_filter.X.update({i: self.Model.infer_state(z[m])})
_filter.Measurements.update({i: measurements[m]})
self.ActiveFilters.update({k: _filter})
self.Filters.update({k: _filter})
self.Probability_Gain_Dicts.update({i: gain_dict})
self.Probability_Assignment_Dicts.update({i: dict([[k, c] for k,c in assignments.items()])})
return measurements, i

    def fit(self, u, z):
"""Function to auto-evaluate all measurements z with control-vectors u and starting probability p.
It returns the believed values x, the corresponding probabilities p and the predictions x_tilde.
Parameters
----------
u: array_like
List of control-vectors.
z: list of PenguTrack.Measurement objects
Recent measurements.
"""
u = np.asarray(u)
z = np.asarray(z)
assert u.shape[0] == z.shape[0]
for i in range(z.shape[0]):
self.predict(u=u[i], i=i+1)
self.update(z=z[i], i=i+1)
return self.X, self.X_error, self.Predicted_X, self.Predicted_X_error

    def downfilter(self, t=None):
"""
        Function erases the time-step t from the class dictionaries (Measurements, X, Predictions).
Parameters
----------
t: int, optional
Time-steps for which filtering should be erased.
"""
for k in self.Filters.keys():
self.Filters[k].downfilter(t=t)

    def downdate(self, t=None):
"""
Function erases the time-step t from the Measurements and Believes.
Parameters
----------
t: int, optional
Time-steps for which filtering should be erased.
"""
for k in self.Filters.keys():
self.Filters[k].downdate(t=t)

    def unpredict(self, t=None):
"""
Function erases the time-step t from the Believes, Predictions and Controls.
Parameters
----------
t: int, optional
Time-steps for which filtering should be erased.
"""
for k in self.Filters.keys():
            self.Filters[k].unpredict(t=t)

    def log_prob(self, **kwargs):
"""
Function to calculate the probability measure by predictions, measurements and corresponding distributions.
Parameters
----------
keys: list of int, optional
Time-steps for which probability should be calculated.
Returns
----------
prob : float
Probability of measurements at the given time-keys.
"""
prob = 0
for j in self.Filters.keys():
prob += self.Filters[j].log_prob(**kwargs)
return prob
def assign(self, cost_matrix, **kwargs):
return greedy_assignment(cost_matrix)
def name(self):
return str(self.__class__).split("'")[1].split(".")[-1] + "_" + str(self.Filter_Class).split("'")[1].split(".")[-1]
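

# --- Tracking-loop sketch (illustrative only) ----------------------------------
# A MultiFilter (or one of the Tracker subclasses below) is driven frame by
# frame: predict() advances all active sub-filters, update() assigns the
# detections of the frame to them and spawns/stops tracks. The model object,
# the detection source and the variance values below are placeholders that have
# to be provided by the application; this helper is documentation only.
def _example_tracking_loop(model, detections_per_frame):
    tracker = MultiFilter(KalmanFilter, model,
                          np.array([10.]),    # evolution variance (arbitrary value)
                          np.array([100.]))   # measurement variance (arbitrary value)
    tracker.FilterThreshold = 3  # frames without update before a track is stopped
    for i, detections in enumerate(detections_per_frame):
        tracker.predict(i=i)
        tracker.update(z=detections, i=i)
    return tracker.Filters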


class Tracker(MultiFilter):
    pass


class HungarianTracker(Tracker):
    def assign(self, cost_matrix, **kwargs):
        return hungarian_assignment(cost_matrix)


class NetworkTracker(Tracker):
"""
    This Class describes a filter which is capable of assigning measurements to tracks, which in turn are represented
    by sub-filters. The type of the sub-filters can be specified, as well as a physical model for predictions. With
    these objects it is possible to assign probabilities to combinations of measurement and prediction.
Attributes
----------
Model: PenguTrack.model object
A physical model to gain predictions from data.
Filter_Class: PenguTrack.Filter object
A Type of Filter from which all subfilters should be built.
Filters: dict
Dictionary containing all sub-filters as PenguTrack.Filter objects.
    ActiveFilters: dict
Dictionary containing all sub-filters, which are currently updated.
FilterThreshold: int
Number of time steps, before a filter is set inactive.
LogProbabilityThreshold: float
        Threshold under which log-probabilities are considered negligible.
filter_args: list
Filter-Type specific arguments from the Multi-Filter initialisation can be stored here.
filter_kwargs: dict
Filter-Type specific keyword-arguments from the Multi-Filter initialisation can be stored here.
Measurement_Distribution: scipy.stats.distributions object
The distribution which describes measurement uncertainty.
State_Distribution: scipy.stats.distributions object
The distribution which describes state vector fluctuations.
X: dict
The time series of believes calculated for this filter. The keys equal the time stamp.
X_error: dict
The time series of errors on the corresponding believes. The keys equal the time stamp.
Predicted_X: dict
The time series of predictions made from the associated data. The keys equal the time stamp.
Predicted_X_error: dict
The time series of estimated prediction errors. The keys equal the time stamp.
Measurements: dict
The time series of measurements assigned to this filter. The keys equal the time stamp.
Controls: dict
The time series of control-vectors assigned to this filter. The keys equal the time stamp.
Order: int
        The order of the network assignment. The higher the order, the more costly and precise the assignment will be.
"""
def __init__(self, *args, **kwargs):
"""
        This Class describes a filter which is capable of assigning measurements to tracks, which in turn are
        represented by sub-filters. The type of the sub-filters can be specified, as well as a physical model for
        predictions. With these objects it is possible to assign probabilities to combinations of measurement and
        prediction.
        The network tracker uses a network representation of the assignment problem and solves it for a number of
        neighbours of each node, called the order.
        Sub-filter specific arguments are handled by *args and **kwargs.
Parameters
----------
model: PenguTrack.model object
A physical model to gain predictions from data.
meas_dist: scipy.stats.distributions object
The distribution which describes measurement uncertainty.
state_dist: scipy.stats.distributions object
The distribution which describes state vector fluctuations.
        order: int, optional
            Order parameter, the number of next-neighbour layers taken into account for the assignment.
"""
super(NetworkTracker, self).__init__(*args, **kwargs)
self.Order = kwargs.get("order", 2)
    def assign(self, cost_matrix, threshold=None, method="linear", **kwargs):
        return network_assignment(cost_matrix, threshold=threshold,
                                  method=method, order=self.Order)


class HybridSolver(Tracker):
"""
    This Class describes a filter which is capable of assigning measurements to tracks, which in turn are represented
    by sub-filters. The type of the sub-filters can be specified, as well as a physical model for predictions. With
    these objects it is possible to assign probabilities to combinations of measurement and prediction.
Attributes
----------
Model: PenguTrack.model object
A physical model to gain predictions from data.
Filter_Class: PenguTrack.Filter object
A Type of Filter from which all subfilters should be built.
Filters: dict
Dictionary containing all sub-filters as PenguTrack.Filter objects.
    ActiveFilters: dict
Dictionary containing all sub-filters, which are currently updated.
FilterThreshold: int
Number of time steps, before a filter is set inactive.
LogProbabilityThreshold: float
        Threshold under which log-probabilities are considered negligible.
filter_args: list
Filter-Type specific arguments from the Multi-Filter initialisation can be stored here.
filter_kwargs: dict
Filter-Type specific keyword-arguments from the Multi-Filter initialisation can be stored here.
Measurement_Distribution: scipy.stats.distributions object
The distribution which describes measurement uncertainty.
State_Distribution: scipy.stats.distributions object
The distribution which describes state vector fluctuations.
X: dict
The time series of believes calculated for this filter. The keys equal the time stamp.
X_error: dict
The time series of errors on the corresponding believes. The keys equal the time stamp.
Predicted_X: dict
The time series of predictions made from the associated data. The keys equal the time stamp.
Predicted_X_error: dict
The time series of estimated prediction errors. The keys equal the time stamp.
Measurements: dict
The time series of measurements assigned to this filter. The keys equal the time stamp.
Controls: dict
The time series of control-vectors assigned to this filter. The keys equal the time stamp.
"""
def __init__(self, *args, **kwargs):
"""
        This Class describes a filter which is capable of assigning measurements to tracks, which in turn are
        represented by sub-filters. The type of the sub-filters can be specified, as well as a physical model for
        predictions. With these objects it is possible to assign probabilities to combinations of measurement and
        prediction.
        Sub-filter specific arguments are handled by *args and **kwargs.
Parameters
----------
model: PenguTrack.model object
A physical model to gain predictions from data.
meas_dist: scipy.stats.distributions object
The distribution which describes measurement uncertainty.
state_dist: scipy.stats.distributions object
The distribution which describes state vector fluctuations.
        dominance_threshold: float, optional
            Threshold between 0 and 1, defining which part of the assignments is done by the Hungarian solver and which by the greedy solver.
"""
super(HybridSolver, self).__init__(*args, **kwargs)
self.DominanceThreshold = kwargs.get("dominance_threshold")
# import multiprocessing
# class ThreadedMultiFilter(MultiFilter):
# def __init__(self, *args, **kwargs):
# super(ThreadedMultiFilter, self).__init__(*args, **kwargs)
# self.Pool = multiprocessing.Pool(multiprocessing.cpu_count())
# self.gain_dict = None
# self.probability_gain = None
#
# def update(self, z=None, i=None):
# """
# Function to get updates to the corresponding model. Handles time-stamps and measurement-vectors.
# This function also handles the assignment of all incoming measurements to the active sub-filters.
#
# Parameters
# ----------
# z: list of PenguTrack.Measurement objects
# Recent measurements.
# i: int
# Recent/corresponding time-stamp.
#
# Returns
# ----------
# z: list of PenguTrack.Measurement objects
# Recent measurements.
# i: int
# Recent/corresponding time-stamp.
# """
# measurements = list(z)
#
# meas_logp = np.asarray([m.Log_Probability for m in z])
# try:
# z = np.asarray([np.asarray([m.PositionX, m.PositionY, m.PositionZ]) for m in z], ndmin=2)
# except (ValueError, AttributeError):
# try:
# z = np.asarray([np.asarray([m.PositionX, m.PositionY]) for m in z], ndmin=2)
# except (ValueError, AttributeError):
# z = np.asarray([np.asarray([m.PositionX]) for m in z], ndmin=2)
# # print(np.mean(meas_logp), np.amin(meas_logp), np.amax(meas_logp))
# print(len(z))
# z = z[meas_logp - np.amin(meas_logp) >= (self.MeasurementProbabilityThreshold * (np.amax(meas_logp)-np.amin(meas_logp)))]
# measurements = list(np.asarray(measurements)[meas_logp - np.amin(meas_logp) >= (self.MeasurementProbabilityThreshold *
# (np.amax(meas_logp)-np.amin(meas_logp)))])
# print(len(z))
# M = z.shape[0]
# N = len(self.ActiveFilters.keys())
#
# if N == 0 and M > 0:
# self.initial_update(measurements, i)
# return measurements, i
#
# self.gain_dict = {}
# self.probability_gain = np.ones((max(M, N), M))/0.#*self.LogProbabilityThreshold
# gain_dict = self.gain_dict
# probability_gain = self.probability_gain
#
# pool_jobs=[]
# for j, k in enumerate(self.ActiveFilters.keys()):
# self.gain_dict.update({j: k})
# # for m, meas in enumerate(measurements):
# # probability_gain[j, m] = self.ActiveFilters[k].log_prob(keys=[i], measurements={i: meas})
# for m in range(len(measurements)):
# pool_jobs.append([i, self.ActiveFilters[k], measurements[m], j, m])
#
# for j, m, value in self.Pool.map(ThreadedUpdate, pool_jobs):
# self.probability_gain[j,m] = value
#
# probability_gain[np.isinf(probability_gain)] = np.nan
# probability_gain[np.isnan(probability_gain)] = np.nanmin(probability_gain)
# if self.LogProbabilityThreshold == -np.inf:
# LogProbabilityThreshold = np.nanmin(probability_gain)
# else:
# LogProbabilityThreshold = self.LogProbabilityThreshold
#
# # print(np.mean(probability_gain), np.amin(probability_gain), np.amax(probability_gain))
# # print(probability_gain)
# # print(np.amin(probability_gain),np.nanmax(probability_gain), np.mean(probability_gain))
# # norm = np.linalg.det(np.exp(probability_gain))#np.sum(np.exp(probability_gain), axis=1)
# # print(norm)
# # probability_gain = probability_gain-norm #(probability_gain.T - np.log(norm)).T
# # print(probability_gain)
# self.Probability_Gain.update({i: np.asarray(probability_gain)})
# # self.CriticalIndex = gain_dict[np.nanargmax([np.sort(a)[-2]/np.sort(a)[-1] for a in probability_gain[:N]])]
# x = {}
# x_err = {}
# from scipy.optimize import linear_sum_assignment
# rows, cols = linear_sum_assignment(-1*probability_gain)
#
# for j, k in enumerate(rows):
# # if not np.all(np.isnan(probability_gain)+np.isinf(probability_gain)):
# # k, m = np.unravel_index(np.nanargmax(probability_gain), probability_gain.shape)
# # else:
# # k, m = np.unravel_index(np.nanargmin(probability_gain), probability_gain.shape)
# k = rows[j]
# m = cols[j]
#
# if N > M and m >= M:
# continue
#
# # print(np.amin(probability_gain))
# # if (probability_gain[k, m] - np.amin(probability_gain) >=
# # (self.LogProbabilityThreshold *(np.amax(probability_gain)-np.amin(probability_gain)))
# # and gain_dict.has_key(k)):
# if probability_gain[k,m] > LogProbabilityThreshold and k in gain_dict:
# # if probability_gain[k, m] - MIN > LIMIT and gain_dict.has_key(k):
# self.ActiveFilters[gain_dict[k]].update(z=measurements[m], i=i)
# x.update({gain_dict[k]: self.ActiveFilters[gain_dict[k]].X[i]})
# x_err.update({gain_dict[k]: self.ActiveFilters[gain_dict[k]].X_error[i]})
#
# else:
# print("DEPRECATED TRACK WITH PROB %s IN FRAME %s" % (probability_gain[k, m], i))
# try:
# n = len(self.ActiveFilters[gain_dict[k]].X.keys())
# except KeyError:
# n = np.inf
#
# l = max(self.Filters.keys()) + 1
# _filter = self.Filter_Class(self.Model, *self.filter_args, **self.filter_kwargs)
# _filter.Predicted_X.update({i: self.Model.infer_state(z[m])})
# _filter.X.update({i: self.Model.infer_state(z[m])})
# _filter.Measurements.update({i: measurements[m]})
#
# self.ActiveFilters.update({l: _filter})
# self.Filters.update({l: _filter})
#
# probability_gain[k, :] = np.nan
# probability_gain[:, m] = np.nan
#
# # if len(self.ActiveFilters.keys()) < M:
# # raise RuntimeError('Lost Filters on the way. This should never happen')
# return measurements, i
def ThreadedUpdate(arg):
i, filter, measurement, j, m = arg
return j, m, filter.log_prob(keys=[i], measurements={i: measurement})
# class AdvancedMultiFilter(MultiFilter):
# def __init__(self, *args, **kwargs):
# super(AdvancedMultiFilter, self).__init__(*args,**kwargs)
# def _critical_(self, ProbMat):
# critical_i =[]
# for i,n in enumerate(ProbMat):