From 0656eca9a68ad273af042074a4b8354a46f27738 Mon Sep 17 00:00:00 2001 From: Syed Shabbir Ahmed Date: Fri, 31 May 2024 15:24:28 -0400 Subject: [PATCH] utils folder restructured --- examples/ex_imm_se3.py | 8 +- examples/ex_imm_vector.py | 3 +- navlie/__init__.py | 44 +- navlie/filters.py | 239 +++ navlie/imm.py | 487 ----- navlie/lib/__init__.py | 1 + navlie/lib/states.py | 24 +- navlie/utils/__init__.py | 28 + navlie/{utils.py => utils/common.py} | 1572 +++++++++-------- navlie/utils/mixture.py | 147 ++ navlie/utils/plot.py | 482 +++++ .../test_interacting_multiple_models.py | 5 +- 12 files changed, 1795 insertions(+), 1245 deletions(-) delete mode 100644 navlie/imm.py create mode 100644 navlie/utils/__init__.py rename navlie/{utils.py => utils/common.py} (61%) create mode 100644 navlie/utils/mixture.py create mode 100644 navlie/utils/plot.py diff --git a/examples/ex_imm_se3.py b/examples/ex_imm_se3.py index 6b0dbb48..dc3ac81a 100644 --- a/examples/ex_imm_se3.py +++ b/examples/ex_imm_se3.py @@ -80,7 +80,7 @@ def covariance(self, x, u, dt) -> np.ndarray: off_diag_p = 0.02 Pi = np.ones((n_models, n_models)) * off_diag_p Pi = Pi + (1 - off_diag_p * (n_models)) * np.diag(np.ones(n_models)) -imm = nav.imm.InteractingModelFilter(kf_list, Pi) +imm = nav.InteractingModelFilter(kf_list, Pi) dg = nav.DataGenerator( @@ -102,16 +102,16 @@ def imm_trial(trial_number: int) -> List[nav.GaussianResult]: x0_check = x0.plus(nav.randvec(P0)) - estimate_list = nav.imm.run_imm_filter( + estimate_list = nav.run_imm_filter( imm, x0_check, P0, input_list, meas_list ) results = [ - nav.imm.IMMResult(estimate_list[i], state_true[i]) + nav.IMMResult(estimate_list[i], state_true[i]) for i in range(len(estimate_list)) ] - return nav.imm.IMMResultList(results) + return nav.IMMResultList(results) def ekf_trial(trial_number: int) -> List[nav.GaussianResult]: diff --git a/examples/ex_imm_vector.py b/examples/ex_imm_vector.py index 0d09c42c..51054c7f 100644 --- a/examples/ex_imm_vector.py +++ b/examples/ex_imm_vector.py @@ -6,7 +6,8 @@ import navlie as nav from navlie.lib.models import DoubleIntegrator, RangePointToAnchor, VectorState -from navlie.imm import InteractingModelFilter, run_imm_filter, IMMResultList +from navlie.filters import InteractingModelFilter, run_imm_filter +from navlie.utils import IMMResultList import numpy as np from typing import List from matplotlib import pyplot as plt diff --git a/navlie/__init__.py b/navlie/__init__.py index 45e8085d..8faeec6b 100644 --- a/navlie/__init__.py +++ b/navlie/__init__.py @@ -14,37 +14,51 @@ UnscentedKalmanFilter, CubatureKalmanFilter, GaussHermiteKalmanFilter, + InteractingModelFilter, run_filter, + run_imm_filter, ) from . import batch -from . import imm from . import lib +from . import utils from .batch import BatchEstimator from .datagen import DataGenerator, generate_measurement -from .utils import ( + +from .composite import ( + CompositeState, + CompositeProcessModel, + CompositeMeasurementModel, + CompositeInput, +) + +from .lib.states import StampedValue # for backwards compatibility + +from .utils.common import ( + state_interp, GaussianResult, GaussianResultList, MonteCarloResult, - plot_error, - plot_meas, - plot_poses, - plot_nees, + IMMResult, + IMMResultList, monte_carlo, + randvec, van_loans, - state_interp, + schedule_sequential_measurements, associate_stamps, - set_axes_equal, find_nearest_stamp_idx, - randvec, jacobian, ) -from .composite import ( - CompositeState, - CompositeProcessModel, - CompositeMeasurementModel, - CompositeInput, +from .utils.plot import ( + plot_error, + plot_nees, + plot_meas, + plot_meas_by_model, + plot_poses, + set_axes_equal ) -from .lib.states import StampedValue # for backwards compatibility +from .utils.mixture import ( + gaussian_mixing, +) diff --git a/navlie/filters.py b/navlie/filters.py index 756ebb94..37d0244c 100644 --- a/navlie/filters.py +++ b/navlie/filters.py @@ -10,8 +10,11 @@ Measurement, StateWithCovariance, ) +from navlie.lib.states import IMMState +from navlie.utils.mixture import gaussian_mixing import numpy as np from scipy.stats.distributions import chi2 +from scipy.stats import multivariate_normal from numpy.polynomial.hermite_e import hermeroots from math import factorial from scipy.special import eval_hermitenorm @@ -817,6 +820,166 @@ def __init__( ) +class InteractingModelFilter: + """ + On-manifold Interacting Multiple Model Filter (IMM). + + References for the IMM: + + H. A. P. Blom and Y. Bar-Shalom, "The interacting + multiple model algorithm for systems with Markovian switching coefficients," + in IEEE Transactions on Automatic Control, vol. 33, no. 8, pp. 780-783, Aug. + 1988, doi: 10.1109/9.1299. + + + The IMM involves Gaussian mixtures. Reference for mixing Gaussians on + manifolds: + + J. Ćesić, I. Marković and I. Petrović, "Mixture Reduction on Matrix Lie + Groups," in IEEE Signal Processing Letters, vol. 24, no. 11, pp. + 1719-1723, Nov. 2017, doi: 10.1109/LSP.2017.2723765. + """ + + def __init__( + self, kf_list: List[ExtendedKalmanFilter], transition_matrix: np.ndarray + ): + """ + Initialize InteractingModelFilter. + + Parameters + ---------- + kf_list : List[ExtendedKalmanFilter] + A list of filter instances which correspond to + each model of the IMM. + transition_matrix : np.ndarray + Probability transition matrix corresponding to the IMM models. + """ + self.kf_list = kf_list + self.transition_matrix = transition_matrix + + def interaction( + self, + x: IMMState, + ): + """The interaction (mixing) step of the IMM. + + Parameters + ---------- + x : IMMState + + Returns + ------- + IMMState + """ + + x_km_models = x.model_states.copy() + mu_models = np.array(x.model_probabilities) + + n_modes = self.transition_matrix.shape[0] + c = self.transition_matrix.T @ mu_models.reshape(-1, 1) + + mu_mix = np.zeros((n_modes, n_modes)) + for i in range(n_modes): + for j in range(n_modes): + mu_mix[i, j] = ( + 1.0 / c[j] * self.transition_matrix[i, j] * mu_models[i] + ) + x_mix = [] + + for j in range(n_modes): + weights = list(mu_mix[:, j]) + x_mix.append(gaussian_mixing(weights, x_km_models)) + + return IMMState(x_mix, mu_models) + + def predict(self, x_km: IMMState, u: Input, dt: float): + """ + Carries out prediction step for each model of the IMM. + + Parameters + ---------- + x_km : IMMState + Model estimates from previous timestep, after mixing. + u : Input + Input + dt : Float + Timestep + + Returns + ------- + IMMState + """ + + x_km_models = x_km.model_states.copy() + x_check = [] + for lv1, kf in enumerate(self.kf_list): + x_check.append(kf.predict(x_km_models[lv1], u, dt)) + return IMMState(x_check, x_km.model_probabilities) + + def correct( + self, + x_check: IMMState, + y: Measurement, + u: Input, + ): + """ + Carry out the correction step for each model and update model + probabilities. + + Parameters + ---------- + x_check: IMMState mu_km_models : List[Float] + Probabilities for each model from previous timestep. + y : Measurement + Measurement to be fused into the current state estimate. + u: Input + Most recent input, to be used to predict the state forward if the + measurement stamp is larger than the state stamp. + + + Returns + ------- + IMMState + Corrected state estimates and probabilities + """ + x_models_check = x_check.model_states.copy() + mu_km_models = x_check.model_probabilities.copy() + n_modes = len(x_models_check) + mu_k = np.zeros(n_modes) + + # Compute each model's normalization constant + c_bar = np.zeros(n_modes) + for i in range(n_modes): + for j in range(n_modes): + c_bar[j] = ( + c_bar[j] + self.transition_matrix[i, j] * mu_km_models[i] + ) + + # Correct and update model probabilities + x_hat = [] + for lv1, kf in enumerate(self.kf_list): + x, details_dict = kf.correct( + x_models_check[lv1], y, u, output_details=True + ) + x_hat.append(x) + z = details_dict["z"] + S = details_dict["S"] + z = z.ravel() + model_likelihood = multivariate_normal.pdf( + z, mean=np.zeros(z.shape), cov=S + ) + mu_k[lv1] = model_likelihood * c_bar[lv1] + + # If all model likelihoods are zero to machine tolerance, np.sum(mu_k)=0 and it fails + # Add this fudge factor to get through those cases. + if np.allclose(mu_k, np.zeros(mu_k.shape)): + mu_k = 1e-10 * np.ones(mu_k.shape) + + mu_k = mu_k / np.sum(mu_k) + + return IMMState(x_hat, mu_k) + + def run_filter( filter: ExtendedKalmanFilter, x0: State, @@ -883,3 +1046,79 @@ def run_filter( x = filter.predict(x, u, dt) return results_list + + +def run_imm_filter( + filter: InteractingModelFilter, + x0: State, + P0: np.ndarray, + input_data: List[Input], + meas_data: List[Measurement], +) -> List[StateWithCovariance]: + """ + Executes an InteractingMultipleModel filter + + Parameters + ---------- + filter: An InteractingModelFilter instance: + _description_ + ProcessModel: Callable, must return a process model: + _description_ + Q_profile: Callable, must return a square np.array compatible with process model: + _description_ + x0 : State + _description_ + P0 : np.ndarray + _description_ + input_data : List[Input] + _description_ + meas_data : List[Measurement] + _description_ + """ + x = StateWithCovariance(x0, P0) + if x.state.stamp is None: + raise ValueError("x0 must have a valid timestamp.") + + # Sort the data by time + input_data.sort(key=lambda x: x.stamp) + meas_data.sort(key=lambda x: x.stamp) + + # Remove all that are before the current time + for idx, u in enumerate(input_data): + if u.stamp >= x.state.stamp: + input_data = input_data[idx:] + break + + for idx, y in enumerate(meas_data): + if y.stamp >= x.state.stamp: + meas_data = meas_data[idx:] + break + + meas_idx = 0 + if len(meas_data) > 0: + y = meas_data[meas_idx] + + results_list = [] + n_models = filter.transition_matrix.shape[0] + + x = IMMState( + [StateWithCovariance(x0, P0)] * n_models, + 1.0 / n_models * np.array(np.ones(n_models)), + ) + for k in tqdm(range(len(input_data) - 1)): + results_list.append(x) + u = input_data[k] + # Fuse any measurements that have occurred. + if len(meas_data) > 0: + while y.stamp < input_data[k + 1].stamp and meas_idx < len( + meas_data + ): + x = filter.interaction(x) + x = filter.correct(x, y, u) + meas_idx += 1 + if meas_idx < len(meas_data): + y = meas_data[meas_idx] + dt = input_data[k + 1].stamp - x.model_states[0].stamp + x = filter.predict(x, u, dt) + + return results_list diff --git a/navlie/imm.py b/navlie/imm.py deleted file mode 100644 index 9bb07ea3..00000000 --- a/navlie/imm.py +++ /dev/null @@ -1,487 +0,0 @@ -""" -Module for the Interacting Multiple Model (IMM) filter. -""" - -from typing import List - -from navlie.types import ( - Input, - State, - Measurement, - StateWithCovariance, -) -import numpy as np -from scipy.stats import multivariate_normal -from navlie.utils import GaussianResultList, GaussianResult, state_interp -from navlie.filters import ExtendedKalmanFilter -from tqdm import tqdm - -# TODO. The IMM seems to have an issue when the user accidently modifies the -# provided state in the process model. - - -def gaussian_mixing_vectorspace( - weights: List[float], means: List[np.ndarray], covariances: List[np.ndarray] -): - """ - Calculate the mean and covariance of a Gaussian mixture on a vectorspace. - - Parameters - ---------- - weights : List[Float] - Weights corresponding to each Gaussian. - means: List[np.ndarray] - List containing the means of each Gaussian mixture - covariances - List containing covariances of each Gaussian mixture - - Returns - ------- - np.ndarray - Mean of Gaussian mixture - np.ndarray - Covariance of Gaussian mixture - """ - - x_bar = np.zeros(means[0].shape) - P_bar = np.zeros(covariances[0].shape) - - for weight, x in zip(weights, means): - x_bar = x_bar + weight * x - - for weight, x, P in zip(weights, means, covariances): - dx = (x - x_bar).reshape(-1, 1) - P_bar = P_bar + weight * P + weight * dx @ dx.T - - return x_bar, P_bar - - -def reparametrize_gaussians_about_X_par( - X_par: State, X_list: List[StateWithCovariance] -): - """ - Reparametrize each Lie group Gaussian in X_list about X_par. - A Lie group Gaussian is only relevant in the tangent space of its own mean. - To mix Lie group Gaussians, a specific X, X_par, must be chosen to expand - a tangent space around. Once expanded in this common tangent space, - the Gaussians may be mixed in a vector space fashion. - - Parameters - ---------- - X_par : State - Each member of X_list will be reparametrized as a Gaussian - in the tangent space of X_par. - X_list : List[StateWithCovariance] - List of Lie group Gaussians to be reparametrized. - - Returns - ------- - List[np.ndarray] - Tangent space of X_par mean of each element of X_list. - List[np.ndarray] - Tangent space of X_par covariance of each element of X_list - """ - means_reparametrized = [] - covariances_reparametrized = [] - - for X in X_list: - mu = X.state.minus(X_par) - Jinv = X.state.minus_jacobian(X_par) - Sigma = Jinv @ X.covariance @ Jinv.T - means_reparametrized.append(mu) - covariances_reparametrized.append(Sigma) - - return means_reparametrized, covariances_reparametrized - - -def update_X(X: State, mu: np.ndarray, P: np.ndarray): - """ - Given a Lie group Gaussian with mean mu and covariance P, expressed in the - tangent space of X, compute Lie group StateAndCovariance X_hat such that the - Lie algebra Gaussian around X_hat has zero mean. - - Parameters - ---------- - X : State - A Lie group element. - mu : np.ndarray - Mean of Gaussian in tangent space of X - P: np.ndarray - Covariance of Gaussian in tangent space of X - - Returns - ------- - StateWithCovariance - StateWithCovariance whose state is a Lie group element. - """ - X_hat = StateWithCovariance(X, np.zeros((X.dof, X.dof))) - X_hat.state = X_hat.state.plus(mu) - - J = X.plus_jacobian(mu) - - X_hat.covariance = J @ P @ J.T - - return X_hat - - -def gaussian_mixing(weights: List[float], x_list: List[StateWithCovariance]): - """A Gaussian mixing method that handles both vectorspace Gaussians - and Gaussians on Lie groups. - - Parameters - ---------- - weights : List[Float] - Weights of Gaussians to be mixed. - x_list : List[StateWithCovariance] - List of Gaussians to be mixed. - Returns - ------- - StateWithCovariance - The mixed Gaussian - """ - max_idx = np.argmax(np.array(weights)) - X_par = x_list[max_idx].state - mu_repar, P_repar = reparametrize_gaussians_about_X_par(X_par, x_list) - x_bar, P_bar = gaussian_mixing_vectorspace(weights, mu_repar, P_repar) - X_mix = update_X(X_par, x_bar, P_bar) - return X_mix - - -class IMMState: - __slots__ = ["model_states", "model_probabilities"] - - def __init__( - self, - model_states: List[StateWithCovariance], - model_probabilities: List[float], - ): - self.model_states = model_states - self.model_probabilities = model_probabilities - - @property - def stamp(self): - return self.model_states[0].state.stamp - - def copy(self) -> "IMMState": - x_copy = [x.copy() for x in self.model_states] - return IMMState(x_copy, self.model_probabilities.copy()) - - -class IMMResult(GaussianResult): - __slots__ = [ - "stamp", - "state", - "state_true", - "covariance", - "error", - "ees", - "nees", - "md", - "three_sigma", - "model_probabilities", - ] - - def __init__(self, imm_estimate: IMMState, state_true: State): - super().__init__( - gaussian_mixing( - imm_estimate.model_probabilities, imm_estimate.model_states - ), - state_true, - ) - - self.model_probabilities = imm_estimate.model_probabilities - - -class IMMResultList(GaussianResultList): - __slots__ = [ - "stamp", - "state", - "state_true", - "covariance", - "error", - "ees", - "nees", - "md", - "three_sigma", - "value", - "value_true", - "dof", - "model_probabilities", - ] - - def __init__(self, result_list: List[IMMResult]): - super().__init__(result_list) - self.model_probabilities = np.array( - [r.model_probabilities for r in result_list] - ) - - @staticmethod - def from_estimates( - estimate_list: List[IMMState], - state_true_list: List[State], - method="nearest", - ): - """ - A convenience function that creates a IMMResultList from a list of - IMMState and a list of true State objects - - Parameters - ---------- - estimate_list : List[IMMState] - A list of IMMState objects - state_true_list : List[State] - A list of true State objects - method : "nearest" or "linear", optional - The method used to interpolate the true state when the timestamps - do not line up exactly, by default "nearest". - - Returns - ------- - IMMResultList - A IMMResultList object - """ - stamps = [r.model_states[0].stamp for r in estimate_list] - - state_true_list = state_interp(stamps, state_true_list, method=method) - return IMMResultList( - [ - IMMResult(estimate, state_true) - for estimate, state_true in zip(estimate_list, state_true_list) - ] - ) - - -class InteractingModelFilter: - """ - On-manifold Interacting Multiple Model Filter (IMM). - - References for the IMM: - - H. A. P. Blom and Y. Bar-Shalom, "The interacting - multiple model algorithm for systems with Markovian switching coefficients," - in IEEE Transactions on Automatic Control, vol. 33, no. 8, pp. 780-783, Aug. - 1988, doi: 10.1109/9.1299. - - - The IMM involves Gaussian mixtures. Reference for mixing Gaussians on - manifolds: - - J. Ćesić, I. Marković and I. Petrović, "Mixture Reduction on Matrix Lie - Groups," in IEEE Signal Processing Letters, vol. 24, no. 11, pp. - 1719-1723, Nov. 2017, doi: 10.1109/LSP.2017.2723765. - """ - - def __init__( - self, kf_list: List[ExtendedKalmanFilter], transition_matrix: np.ndarray - ): - """ - Initialize InteractingModelFilter. - - Parameters - ---------- - kf_list : List[ExtendedKalmanFilter] - A list of filter instances which correspond to - each model of the IMM. - transition_matrix : np.ndarray - Probability transition matrix corresponding to the IMM models. - """ - self.kf_list = kf_list - self.transition_matrix = transition_matrix - - def interaction( - self, - x: IMMState, - ): - """The interaction (mixing) step of the IMM. - - Parameters - ---------- - x : IMMState - - Returns - ------- - IMMState - """ - - x_km_models = x.model_states.copy() - mu_models = np.array(x.model_probabilities) - - n_modes = self.transition_matrix.shape[0] - c = self.transition_matrix.T @ mu_models.reshape(-1, 1) - - mu_mix = np.zeros((n_modes, n_modes)) - for i in range(n_modes): - for j in range(n_modes): - mu_mix[i, j] = ( - 1.0 / c[j] * self.transition_matrix[i, j] * mu_models[i] - ) - x_mix = [] - - for j in range(n_modes): - weights = list(mu_mix[:, j]) - x_mix.append(gaussian_mixing(weights, x_km_models)) - - return IMMState(x_mix, mu_models) - - def predict(self, x_km: IMMState, u: Input, dt: float): - """ - Carries out prediction step for each model of the IMM. - - Parameters - ---------- - x_km : IMMState - Model estimates from previous timestep, after mixing. - u : Input - Input - dt : Float - Timestep - - Returns - ------- - IMMState - """ - - x_km_models = x_km.model_states.copy() - x_check = [] - for lv1, kf in enumerate(self.kf_list): - x_check.append(kf.predict(x_km_models[lv1], u, dt)) - return IMMState(x_check, x_km.model_probabilities) - - def correct( - self, - x_check: IMMState, - y: Measurement, - u: Input, - ): - """ - Carry out the correction step for each model and update model - probabilities. - - Parameters - ---------- - x_check: IMMState mu_km_models : List[Float] - Probabilities for each model from previous timestep. - y : Measurement - Measurement to be fused into the current state estimate. - u: Input - Most recent input, to be used to predict the state forward if the - measurement stamp is larger than the state stamp. - - - Returns - ------- - IMMState - Corrected state estimates and probabilities - """ - x_models_check = x_check.model_states.copy() - mu_km_models = x_check.model_probabilities.copy() - n_modes = len(x_models_check) - mu_k = np.zeros(n_modes) - - # Compute each model's normalization constant - c_bar = np.zeros(n_modes) - for i in range(n_modes): - for j in range(n_modes): - c_bar[j] = ( - c_bar[j] + self.transition_matrix[i, j] * mu_km_models[i] - ) - - # Correct and update model probabilities - x_hat = [] - for lv1, kf in enumerate(self.kf_list): - x, details_dict = kf.correct( - x_models_check[lv1], y, u, output_details=True - ) - x_hat.append(x) - z = details_dict["z"] - S = details_dict["S"] - z = z.ravel() - model_likelihood = multivariate_normal.pdf( - z, mean=np.zeros(z.shape), cov=S - ) - mu_k[lv1] = model_likelihood * c_bar[lv1] - - # If all model likelihoods are zero to machine tolerance, np.sum(mu_k)=0 and it fails - # Add this fudge factor to get through those cases. - if np.allclose(mu_k, np.zeros(mu_k.shape)): - mu_k = 1e-10 * np.ones(mu_k.shape) - - mu_k = mu_k / np.sum(mu_k) - - return IMMState(x_hat, mu_k) - - -def run_imm_filter( - filter: InteractingModelFilter, - x0: State, - P0: np.ndarray, - input_data: List[Input], - meas_data: List[Measurement], -) -> List[StateWithCovariance]: - """ - Executes an InteractingMultipleModel filter - - Parameters - ---------- - filter: An InteractingModelFilter instance: - _description_ - ProcessModel: Callable, must return a process model: - _description_ - Q_profile: Callable, must return a square np.array compatible with process model: - _description_ - x0 : State - _description_ - P0 : np.ndarray - _description_ - input_data : List[Input] - _description_ - meas_data : List[Measurement] - _description_ - """ - x = StateWithCovariance(x0, P0) - if x.state.stamp is None: - raise ValueError("x0 must have a valid timestamp.") - - # Sort the data by time - input_data.sort(key=lambda x: x.stamp) - meas_data.sort(key=lambda x: x.stamp) - - # Remove all that are before the current time - for idx, u in enumerate(input_data): - if u.stamp >= x.state.stamp: - input_data = input_data[idx:] - break - - for idx, y in enumerate(meas_data): - if y.stamp >= x.state.stamp: - meas_data = meas_data[idx:] - break - - meas_idx = 0 - if len(meas_data) > 0: - y = meas_data[meas_idx] - - results_list = [] - n_models = filter.transition_matrix.shape[0] - - x = IMMState( - [StateWithCovariance(x0, P0)] * n_models, - 1.0 / n_models * np.array(np.ones(n_models)), - ) - for k in tqdm(range(len(input_data) - 1)): - results_list.append(x) - u = input_data[k] - # Fuse any measurements that have occurred. - if len(meas_data) > 0: - while y.stamp < input_data[k + 1].stamp and meas_idx < len( - meas_data - ): - x = filter.interaction(x) - x = filter.correct(x, y, u) - meas_idx += 1 - if meas_idx < len(meas_data): - y = meas_data[meas_idx] - dt = input_data[k + 1].stamp - x.model_states[0].stamp - x = filter.predict(x, u, dt) - - return results_list diff --git a/navlie/lib/__init__.py b/navlie/lib/__init__.py index 5208f092..51d37087 100644 --- a/navlie/lib/__init__.py +++ b/navlie/lib/__init__.py @@ -10,6 +10,7 @@ SO3State, SE23State, SL3State, + IMMState, CompositeState, MatrixLieGroupState, VectorInput, diff --git a/navlie/lib/states.py b/navlie/lib/states.py index 96e38b87..8bbb08c2 100644 --- a/navlie/lib/states.py +++ b/navlie/lib/states.py @@ -1,8 +1,8 @@ from pymlg import SO2, SO3, SE2, SE3, SE23, SL3 from pymlg.numpy.base import MatrixLieGroup import numpy as np -from navlie.types import State, Input -from typing import Any +from navlie.types import State, Input, StateWithCovariance +from typing import Any, List try: # We do not want to make ROS a hard dependency, so we import it only if @@ -653,6 +653,26 @@ def __init__( super().__init__(value, self.group, stamp, state_id, direction) +class IMMState: + __slots__ = ["model_states", "model_probabilities"] + + def __init__( + self, + model_states: List[StateWithCovariance], + model_probabilities: List[float], + ): + self.model_states = model_states + self.model_probabilities = model_probabilities + + @property + def stamp(self): + return self.model_states[0].state.stamp + + def copy(self) -> "IMMState": + x_copy = [x.copy() for x in self.model_states] + return IMMState(x_copy, self.model_probabilities.copy()) + + class VectorInput(Input): """ A standard vector-based input, with value represented by a 1D numpy array. diff --git a/navlie/utils/__init__.py b/navlie/utils/__init__.py new file mode 100644 index 00000000..81941e7f --- /dev/null +++ b/navlie/utils/__init__.py @@ -0,0 +1,28 @@ +from .common import ( + state_interp, + GaussianResult, + GaussianResultList, + MonteCarloResult, + IMMResult, + IMMResultList, + monte_carlo, + randvec, + van_loans, + schedule_sequential_measurements, + associate_stamps, + find_nearest_stamp_idx, + jacobian, +) + +from .plot import ( + plot_error, + plot_nees, + plot_meas, + plot_meas_by_model, + plot_poses, + set_axes_equal +) + +from .mixture import ( + gaussian_mixing, +) diff --git a/navlie/utils.py b/navlie/utils/common.py similarity index 61% rename from navlie/utils.py rename to navlie/utils/common.py index 150144ec..b86e38fd 100644 --- a/navlie/utils.py +++ b/navlie/utils/common.py @@ -2,14 +2,146 @@ Collection of miscellaneous utility functions and classes. """ -from typing import Callable, List, Tuple, Union, Any, Dict +from typing import Callable, List, Tuple, Union, Any from joblib import Parallel, delayed -from navlie.types import State, Measurement, StateWithCovariance +from navlie.types import State, StateWithCovariance +from navlie.lib.states import IMMState +from navlie.utils.mixture import gaussian_mixing import numpy as np -import matplotlib.pyplot as plt -from scipy.stats.distributions import chi2 from scipy.interpolate import interp1d from scipy.linalg import block_diag, expm +from scipy.stats.distributions import chi2 + +def state_interp( + query_stamps: Union[float, List[float], Any], + state_list: List[State], + method="linear", +) -> Union[State, List[State]]: + """ + Performs "linear" (geodesic) interpolation between ``State`` objects. Multiple + interpolations can be performed at once in a vectorized fashion. If the + query point is out of bounds, the end points are returned. + + ..code-block:: python + + x_data = [SE3State.random(stamp=i) for i in range(10)] + x_query = [0.2, 0.5, 10] + x_interp = state_interp(x_query, x_data) + + Parameters + ---------- + query_stamps : float or object with ``.stamp`` attribute (or Lists thereof) + Query stamps. Can either be a float, or an object containing a ``stamp`` + attribute. If a list is provided, it will be treated as multiple query + points and the return value will be a list of ``State`` objects. + state_list : List[State] + List of ``State`` objects to interpolate between. + + Returns + ------- + ``State`` or List[``State``] + The interpolated state(s). + + Raises + ------ + TypeError + If query point is not a float or object with a ``stamp`` attribute. + """ + + # Handle input + if isinstance(query_stamps, list): + single_query = False + elif isinstance(query_stamps, np.ndarray): + single_query = False + elif isinstance(query_stamps, float): + query_stamps = [query_stamps] + single_query = True + else: + pass + + # if not isinstance(query_stamps, list): + # query_stamps = [query_stamps] + # single_query = True + # else: + # single_query = False + + query_stamps = query_stamps.copy() + for i, stamp in enumerate(query_stamps): + if not isinstance(stamp, (float, int)): + if hasattr(stamp, "stamp"): + stamp = stamp.stamp + query_stamps[i] = stamp + else: + raise TypeError( + "Stamps must be of type float or have a stamp attribute" + ) + + # Get the indices of the states just before and just after. + query_stamps = np.array(query_stamps) + state_list = np.array(state_list) + stamp_list = [state.stamp for state in state_list] + stamp_list.sort() + stamp_list = np.array(stamp_list) + if method == "linear": + idx_middle = np.interp( + query_stamps, stamp_list, np.array(range(len(stamp_list))) + ) + idx_lower = np.floor(idx_middle).astype(int) + idx_upper = idx_lower + 1 + + before_start = query_stamps < stamp_list[0] + after_end = idx_upper >= len(state_list) + inside = np.logical_not(np.logical_or(before_start, after_end)) + + # Return endpoint if out of bounds + idx_upper[idx_upper == len(state_list)] = len(state_list) - 1 + + # ############ Do the interpolation ################# + stamp_lower = stamp_list[idx_lower] + stamp_upper = stamp_list[idx_upper] + + # "Fraction" of the way between the two states + alpha = np.zeros(len(query_stamps)) + alpha[inside] = np.array( + (query_stamps[inside] - stamp_lower[inside]) + / (stamp_upper[inside] - stamp_lower[inside]) + ).ravel() + + # The two neighboring states around the query point + state_lower: List[State] = np.array(state_list[idx_lower]).ravel() + state_upper: List[State] = np.array(state_list[idx_upper]).ravel() + + # Interpolate between the two states + dx = np.array( + [s.minus(state_lower[i]).ravel() for i, s in enumerate(state_upper)] + ) + + out = [] + for i, state in enumerate(state_lower): + if np.isnan(alpha[i]) or np.isinf(alpha[i]) or alpha[i] < 0.0: + raise RuntimeError("wtf") + + state_interp = state.plus(dx[i] * alpha[i]) + + state_interp.stamp = query_stamps[i] + out.append(state_interp) + + elif method == "nearest": + indexes = np.array(range(len(stamp_list))) + nearest_state = interp1d( + stamp_list, + indexes, + "nearest", + bounds_error=False, + fill_value="extrapolate", + ) + state_idx = nearest_state(query_stamps).astype(int) + out = state_list[state_idx].tolist() + + if single_query: + out = out[0] + + return out class GaussianResult: @@ -422,6 +554,90 @@ def nees_upper_bound(self, confidence_interval: float, double_sided=True): ) +class IMMResult(GaussianResult): + __slots__ = [ + "stamp", + "state", + "state_true", + "covariance", + "error", + "ees", + "nees", + "md", + "three_sigma", + "model_probabilities", + ] + + def __init__(self, imm_estimate: IMMState, state_true: State): + super().__init__( + gaussian_mixing( + imm_estimate.model_probabilities, imm_estimate.model_states + ), + state_true, + ) + + self.model_probabilities = imm_estimate.model_probabilities + + +class IMMResultList(GaussianResultList): + __slots__ = [ + "stamp", + "state", + "state_true", + "covariance", + "error", + "ees", + "nees", + "md", + "three_sigma", + "value", + "value_true", + "dof", + "model_probabilities", + ] + + def __init__(self, result_list: List[IMMResult]): + super().__init__(result_list) + self.model_probabilities = np.array( + [r.model_probabilities for r in result_list] + ) + + @staticmethod + def from_estimates( + estimate_list: List[IMMState], + state_true_list: List[State], + method="nearest", + ): + """ + A convenience function that creates a IMMResultList from a list of + IMMState and a list of true State objects + + Parameters + ---------- + estimate_list : List[IMMState] + A list of IMMState objects + state_true_list : List[State] + A list of true State objects + method : "nearest" or "linear", optional + The method used to interpolate the true state when the timestamps + do not line up exactly, by default "nearest". + + Returns + ------- + IMMResultList + A IMMResultList object + """ + stamps = [r.model_states[0].stamp for r in estimate_list] + + state_true_list = state_interp(stamps, state_true_list, method=method) + return IMMResultList( + [ + IMMResult(estimate, state_true) + for estimate, state_true in zip(estimate_list, state_true_list) + ] + ) + + def monte_carlo( trial: Callable[[int], GaussianResultList], num_trials: int, @@ -550,863 +766,753 @@ def van_loans( return A_d, Q_d -def plot_error( - results: GaussianResultList, - axs: List[plt.Axes] = None, - label: str = None, - sharey: bool = False, - color=None, - bounds=True, -) -> Tuple[plt.Figure, List[plt.Axes]]: - """ - A generic three-sigma bound plotter. +def schedule_sequential_measurements(model_list, freq): + """Schedules sequential measurements from a list of MeasurementModels + that cannot generate measurements at the same time. This allows + looping through the measurement model list one at a time. Parameters ---------- - results : GaussianResultList - Contains the data to plot - axs : List[plt.Axes], optional - Axes to draw on, by default None. If None, new axes will be created. - label : str, optional - Text label to add, by default None - sharey : bool, optional - Whether to have a common y axis or not, by default False - color : color, optional - specify the color of the error/bounds. + model_list: List[MeasurementModel] + The list of sequential MeasurementModels. + freq: float + The overall frequency in which all the measurements are generated. Returns ------- - plt.Figure - Handle to figure. - List[plt.Axes] - Handle to axes that were drawn on. + List[float] + The list of initial offsets associated with each MeasurementModel. + float + The reduced frequency at which each individual MeasurementModel + generates measurements. """ + n_models = len(model_list) + offset_list = [None] * n_models + offset_step = 1 / freq + new_freq = freq / n_models - dim = results.error.shape[1] - - if dim < 3: - n_rows = dim - else: - n_rows = 3 + for i in range(n_models): + offset_list[i] = i * offset_step - n_cols = int(np.ceil(dim / 3)) + return offset_list, new_freq - if axs is None: - fig, axs = plt.subplots(n_rows, n_cols, sharex=True, sharey=sharey) - else: - fig: plt.Figure = axs.ravel("F")[0].get_figure() - - axs_og = axs - kwargs = {} - if color is not None: - kwargs["color"] = color - - axs: List[plt.Axes] = axs.ravel("F") - for i in range(results.three_sigma.shape[1]): - if bounds: - axs[i].fill_between( - results.stamp, - results.three_sigma[:, i], - -results.three_sigma[:, i], - alpha=0.5, - **kwargs, - ) - axs[i].plot(results.stamp, results.error[:, i], label=label, **kwargs) - fig: plt.Figure = fig # For type hinting - return fig, axs_og +def associate_stamps( + first_stamps: List[float], + second_stamps: List[float], + offset: float = 0.0, + max_difference: float = 0.02, +) -> List[Tuple[int, int]]: + """ + Associate timestamps. + Returns a sorted list of matches, of length of the smallest of + first_stamps and second_stamps. -def plot_nees( - results: GaussianResultList, - ax: plt.Axes = None, - label: str = None, - color=None, - confidence_interval: float = 0.95, - normalize: bool = False, - expected_nees_color="r", -) -> Tuple[plt.Figure, plt.Axes]: - """ - Makes a plot of the NEES, showing the actual NEES values, the expected NEES, - and the bounds of the specified confidence interval. + Function taken from rpg_trajectory_evaluation toolbox. Parameters ---------- - results : GaussianResultList or MonteCarloResult - Results to plot - ax : plt.Axes, optional - Axes on which to draw, by default None. If None, new axes will be - created. - label : str, optional - Label to assign to the NEES line, by default None - color : optional - Fed directly to the ``plot(..., color=color)`` function, by default None - confidence_interval : float or None, optional - Desired probability confidence region, by default 0.95. Must lie between - 0 and 1. If None, no confidence interval will be plotted. - normalize : bool, optional - Whether to normalize the NEES by the degrees of freedom, by default False + first_stamps : List[float] + List of first timestamps + second_stamps : List[float] + List of second timestamps + offset : float, optional + Offset between the two lists, by default 0.0. + max_difference : float, optional + Maximum difference between stamps in the two list + to be considered a match, by default 0.02. Returns ------- - plt.Figure - Figure on which the plot was drawn - plt.Axes - Axes on which the plot was drawn + List[Tuple[int, int]] + Sorted list of matches in the form (match_first_idx, match_second_idx). """ + potential_matches = [ + (abs(a - (b + offset)), idx_a, idx_b) + for idx_a, a in enumerate(first_stamps) + for idx_b, b in enumerate(second_stamps) + if abs(a - (b + offset)) < max_difference + ] + potential_matches.sort() # prefer the closest + matches = [] + first_idxes = list(range(len(first_stamps))) + second_idxes = list(range(len(second_stamps))) + for diff, idx_a, idx_b in potential_matches: + if idx_a in first_idxes and idx_b in second_idxes: + first_idxes.remove(idx_a) + second_idxes.remove(idx_b) + matches.append((int(idx_a), int(idx_b))) - if ax is None: - fig, ax = plt.subplots( - 1, - 1, - sharex=True, - ) - else: - fig = ax.get_figure() - - axs_og = ax - kwargs = {} - if color is not None: - kwargs["color"] = color - - if normalize: - s = results.dof - else: - s = 1 - - expected_nees_label = "Expected NEES" - _, exisiting_labels = ax.get_legend_handles_labels() - - if expected_nees_label in exisiting_labels: - expected_nees_label = None - - # fmt:off - ax.plot(results.stamp, results.nees/s, label=label, **kwargs) - if confidence_interval: - ci_label = f"${int(confidence_interval*100)}\%$ conf. bounds" - if ci_label in exisiting_labels: - ci_label = None - ax.plot(results.stamp, results.dof/s, label=expected_nees_label, color=expected_nees_color) - ax.plot(results.stamp, results.nees_upper_bound(confidence_interval)/s, "--", color="k", label=ci_label) - ax.plot(results.stamp, results.nees_lower_bound(confidence_interval)/s, "--", color="k") - # fmt:on - - ax.legend() - - return fig, axs_og + matches.sort() + return matches -def plot_meas( - meas_list: List[Measurement], - state_list: List[State], - axs: List[plt.Axes] = None, - sharey=False, -) -> Tuple[plt.Figure, List[plt.Axes]]: +def find_nearest_stamp_idx( + stamps_list: List[float], stamp: Union[float, List[float]] +) -> int: """ - Given measurement data, make time-domain plots of the measurement values - and their ground-truth model-based values. + Find the index of the nearest stamp in ``stamps_list`` to ``stamp``. If + ``stamp`` is a list or array, then the output is a list of indices. Parameters ---------- - meas_list : List[Measurement] - Measurement data to be plotted. - state_list : List[State] - A list of true State objects with similar timestamp domain. Will be - interpolated if timestamps do not line up perfectly. - axs : List[plt.Axes], optional - Axes to draw on, by default None. If None, new axes will be created. - sharey : bool, optional - Whether to have a common y axis or not, by default False + stamps_list : List[float] + List of timestamps + stamp : float or List[float] or numpy.ndarray + Query stamp(s). Returns ------- - plt.Figure - Handle to figure. - List[plt.Axes] - Handle to axes that were drawn on. + int or List[int] + Index of nearest stamp. """ - # Convert everything to numpy arrays for plotting, and compute the - # ground-truth model-based measurement value. - - meas_list.sort(key=lambda y: y.stamp) + if isinstance(stamp, float): + single_query = True + query_stamps = np.array([stamp]) + else: + single_query = False + query_stamps = np.array(stamp).ravel() - # Find the state of the nearest timestamp to the measurement - y_stamps = np.array([y.stamp for y in meas_list]) - x_stamps = np.array([x.stamp for x in state_list]) - indexes = np.array(range(len(state_list))) - nearest_state = interp1d( - x_stamps, - indexes, + nearest_stamp = interp1d( + stamps_list, + np.array(range(len(stamps_list))), "nearest", bounds_error=False, fill_value="extrapolate", ) - state_idx = nearest_state(y_stamps) - y_meas = [] - y_true = [] - three_sigma = [] - for i in range(len(meas_list)): - data = np.ravel(meas_list[i].value) - y_meas.append(data) - x = state_list[int(state_idx[i])] - y = meas_list[i].model.evaluate(x) - if y is None: - y_true.append(np.zeros_like(data) * np.nan) - three_sigma.append(np.zeros_like(data) * np.nan) - else: - y_true.append(np.ravel(y)) - R = np.atleast_2d(meas_list[i].model.covariance(x)) - three_sigma.append(3 * np.sqrt(np.diag(R))) - - y_meas = np.atleast_2d(np.array(y_meas)) - y_true = np.atleast_2d(np.array(y_true)) - three_sigma = np.atleast_2d(np.array(three_sigma)) - y_stamps = np.array(y_stamps) # why is this necessary? - x_stamps = np.array(x_stamps) - - # Plot - size_y = np.size(meas_list[0].value) - if axs is None: - fig, axs = plt.subplots(size_y, 1, sharex=True, sharey=sharey) - else: - if isinstance(axs, plt.Axes): - axs = np.array([axs]) - - fig = axs.ravel("F")[0].get_figure() - axs = np.atleast_1d(axs) - for i in range(size_y): - axs[i].scatter( - y_stamps, y_meas[:, i], color="b", alpha=0.7, s=2, label="Measured" - ) - axs[i].plot( - y_stamps, y_true[:, i], color="r", alpha=1, label="Modelled" - ) - axs[i].fill_between( - y_stamps, - y_true[:, i] + three_sigma[:, i], - y_true[:, i] - three_sigma[:, i], - alpha=0.5, - color="r", - ) - return fig, axs - -def plot_meas_by_model( - meas_list: List[Measurement], - state_list: List[State], - axs: List[plt.Axes] = None, - sharey=False, -) -> Tuple[plt.Figure, List[plt.Axes]]: - """ - Given measurement data, make time-domain plots of the measurement values - and their ground-truth model-based values. - - Parameters - ---------- - meas_list : List[Measurement] - Measurement data to be plotted. - state_list : List[State] - A list of true State objects with similar timestamp domain. Will be - interpolated if timestamps do not line up perfectly. - axs : List[plt.Axes], optional - Axes to draw on, by default None. If None, new axes will be created. - sharey : bool, optional - Whether to have a common y axis or not, by default False + out = nearest_stamp(query_stamps).astype(int).tolist() - Returns - ------- - plt.Figure - Handle to figure. - List[plt.Axes] - Handle to axes that were drawn on. - """ + if single_query: + out = out[0] - # Create sub-lists for every model ID - meas_by_model: Dict[int, List[Measurement]] = {} - for meas in meas_list: - model_id = id(meas.model) - if model_id not in meas_by_model: - meas_by_model[model_id] = [] - meas_by_model[model_id].append(meas) - - if axs is None: - axs = np.array([None] * len(meas_by_model)) - - axs = axs.ravel("F") - - figs = [None] * len(meas_by_model) - for i, temp in enumerate(meas_by_model.items()): - model_id, meas_list = temp - fig, ax = plot_meas(meas_list, state_list, axs[i], sharey=sharey) - figs[i] = fig - axs[i] = ax - ax[0].set_title(f"{meas_list[0].model} {hex(model_id)}", fontsize=12) - ax[0].tick_params(axis="both", which="major", labelsize=10) - ax[0].tick_params(axis="both", which="minor", labelsize=8) - - return fig, axs - - -def plot_poses( - poses, - ax: plt.Axes = None, - line_color: str = None, - triad_color: str = None, - arrow_length: float = 1, - step: int = 5, - label: str = None, - linewidth=None, - plot_2d: bool =False, -): - """ - Plots a pose trajectory, representing the attitudes by triads - plotted along the trajectory. + return out - The poses may be either elements of SE(2), - representing planar 2D poses, or elements of SE(3), representing 3D poses. - Parameters - ---------- - poses : List[Union[SE2State, SE3State]] - A list objects containing a ``position`` property and an attitude - property, representing the rotation matrix :math:``\mathbf{C}_{ab}``. - Can either be 2D or 3D poses. - ax : plt.Axes, optional - Axes to plot on, if none, 3D axes are created. - line_color : str, optional - Color of the position trajectory. - triad_color : str, optional - Triad color. If none are specified, defaults to RGB. - arrow_length : int, optional - Triad arrow length, by default 1. - step : int or None, optional - Step size in list of poses, by default 5. If None, no triads are plotted. - label : str, optional - Optional label for the triad - plot_2d: bool, optional - Flag to plot a 3D pose trajectory in 2D bird's eye view. +def jacobian( + fun: Callable, + x: Union[np.ndarray, State], + step_size=None, + method="forward", + *args, + **kwargs, +) -> np.ndarray: """ - if isinstance(poses, GaussianResultList): - poses = poses.state - - if isinstance(poses, StateWithCovariance): - poses = [poses.state] - - if isinstance(poses, np.ndarray): - poses = poses.tolist() + Compute the Jacobian of a function. Example use: - if not isinstance(poses, list): - poses = [poses] + .. code-block:: python - # Check if poses are in 2D or 3D - if poses[0].position.size == 2: - plot_2d = True + x = np.array([1, 2]).reshape((-1,1)) - # Check if provided axes are in 3D - if ax is not None: - if ax.name == "3d": - plot_2d = False + A = np.array([[1, 2], [3, 4]]) + def fun(x): + return 1/np.sqrt(x.T @ A.T @ A @ x) - if ax is None: - fig = plt.figure() - if plot_2d: - ax = plt.axes() - else: - ax = plt.axes(projection="3d") - else: - fig = ax.get_figure() + jac_test = jacobian(fun, x, method=method) + jac_true = (- x.T @ A.T @ A)/((x.T @ A.T @ A @ x)**(3/2)) - if triad_color is None: - colors = ["tab:red", "tab:green", "tab:blue"] # Default to RGB - else: - colors = [triad_color] * 3 + assert np.allclose(jac_test, jac_true, atol=1e-6) - # Plot a line for the positions - r = np.array([pose.position for pose in poses]) - if plot_2d: - ax.plot(r[:, 0], r[:, 1], color=line_color, label=label) - else: - ax.plot3D(r[:, 0], r[:, 1], r[:, 2], color=line_color, label=label) - - # Plot triads using quiver - if step is not None: - C = np.array([poses[i].attitude.T for i in range(0, len(poses), step)]) - r = np.array([poses[i].position for i in range(0, len(poses), step)]) - if plot_2d: - x, y = r[:, 0], r[:, 1] - ax.quiver( - x, y, - C[:, 0, 0], - C[:, 0, 1], - color=colors[0], - scale=20.0, - headwidth=2, - ) + This function is also compatible with ``State`` objects, and hence + can compute on-manifold derivatives. Example use: - ax.quiver( - x, y, - C[:, 1, 0], - C[:, 1, 1], - color=colors[1], - scale=20.0, - headwidth=2, - ) - else: - x, y, z = r[:, 0], r[:, 1], r[:, 2] - ax.quiver( - x, - y, - z, - C[:, 0, 0], - C[:, 0, 1], - C[:, 0, 2], - color=colors[0], - length=arrow_length, - arrow_length_ratio=0.1, - linewidths=linewidth, - ) - ax.quiver( - x, - y, - z, - C[:, 1, 0], - C[:, 1, 1], - C[:, 1, 2], - color=colors[1], - length=arrow_length, - arrow_length_ratio=0.1, - linewidths=linewidth, - ) - ax.quiver( - x, - y, - z, - C[:, 2, 0], - C[:, 2, 1], - C[:, 2, 2], - color=colors[2], - length=arrow_length, - arrow_length_ratio=0.1, - linewidths=linewidth, - ) + .. code-block:: python - if plot_2d: - ax.axis("equal") - else: - set_axes_equal(ax) - return fig, ax + T = SE23State([0.1,0.2,0.3,4,5,6,7,8,9], direction="right") + def fun(T: SE23State): + # Returns the normalized body-frame velocity vector + C_ab = T.attitude + v_zw_a = T.velocity + v_zw_b = C_ab.T @ v_zw_a + return v_zw_b/np.linalg.norm(v_zw_b) -def set_axes_equal(ax: plt.Axes): - """ - Sets the axes of a 3D plot to have equal scale. + jac_fd = jacobian(fun, T) Parameters ---------- - ax : plt.Axes - Matplotlib axes. - """ - x_limits = ax.get_xlim3d() - y_limits = ax.get_ylim3d() - z_limits = ax.get_zlim3d() - x_range = abs(x_limits[1] - x_limits[0]) - x_middle = np.mean(x_limits) - y_range = abs(y_limits[1] - y_limits[0]) - y_middle = np.mean(y_limits) - z_range = abs(z_limits[1] - z_limits[0]) - z_middle = np.mean(z_limits) - length = 0.5 * max([x_range, y_range, z_range]) - ax.set_xlim3d([x_middle - length, x_middle + length]) - ax.set_ylim3d([y_middle - length, y_middle + length]) - ax.set_zlim3d([z_middle - length, z_middle + length]) - + fun : Callable + function to compute the Jacobian of + x : Union[np.ndarray, State] + input to the function + step_size : float, optional + finite difference step size, by default 1e-6 + method : str, optional + "forward", "central" or "cs", by default "forward". "forward" calculates + using a forward finite difference procedure. "central" calculates using + a central finite difference procedure. "cs" calculates using the + complex-step procedure. If using "cs", you must be careful to ensure + that the function can handle and propagate through complex + components. -def state_interp( - query_stamps: Union[float, List[float], Any], - state_list: List[State], - method="linear", -) -> Union[State, List[State]]: + Returns + ------- + np.ndarray with shape (M, N) + Jacobian of the function, where ``M`` is the DOF of the output and + ``N`` is the DOF of the input. """ - Performs "linear" (geodesic) interpolation between ``State`` objects. Multiple - interpolations can be performed at once in a vectorized fashion. If the - query point is out of bounds, the end points are returned. + x = x.copy() - ..code-block:: python + if step_size is None: + if method == "cs": + step_size = 1e-16 + else: + step_size = 1e-6 - x_data = [SE3State.random(stamp=i) for i in range(10)] - x_query = [0.2, 0.5, 10] - x_interp = state_interp(x_query, x_data) + # Check if input has a plus method. otherwise, assume it will behave + # like a numpy array + if hasattr(x, "plus"): + input_plus = lambda x, dx: x.plus(dx) + else: + input_plus = lambda x, dx: x + dx.reshape(x.shape) - Parameters - ---------- - query_stamps : float or object with ``.stamp`` attribute (or Lists thereof) - Query stamps. Can either be a float, or an object containing a ``stamp`` - attribute. If a list is provided, it will be treated as multiple query - points and the return value will be a list of ``State`` objects. - state_list : List[State] - List of ``State`` objects to interpolate between. + Y_bar: State = fun(x.copy(), *args, **kwargs) - Returns - ------- - ``State`` or List[``State``] - The interpolated state(s). + # Check if output has a minus method. otherwise, assume it will behave + # like a numpy array + if hasattr(Y_bar, "minus"): + output_diff = lambda Y, Y_bar: Y.minus(Y_bar) + else: + output_diff = lambda Y, Y_bar: Y - Y_bar - Raises - ------ - TypeError - If query point is not a float or object with a ``stamp`` attribute. - """ + func_to_diff = lambda dx: output_diff( + fun(input_plus(x, dx), *args, **kwargs), Y_bar + ) - # Handle input - if isinstance(query_stamps, list): - single_query = False - elif isinstance(query_stamps, np.ndarray): - single_query = False - elif isinstance(query_stamps, float): - query_stamps = [query_stamps] - single_query = True + # Check if input/output has a dof attribute. otherwise, assume it will + # behave like a numpy array and use the ``.size`` attribute to get + # the DOF of the input/output + if hasattr(x, "dof"): + N = x.dof else: - pass + N = x.size - # if not isinstance(query_stamps, list): - # query_stamps = [query_stamps] - # single_query = True - # else: - # single_query = False + if hasattr(Y_bar, "dof"): + M = Y_bar.dof + else: + M = Y_bar.size - query_stamps = query_stamps.copy() - for i, stamp in enumerate(query_stamps): - if not isinstance(stamp, (float, int)): - if hasattr(stamp, "stamp"): - stamp = stamp.stamp - query_stamps[i] = stamp - else: - raise TypeError( - "Stamps must be of type float or have a stamp attribute" - ) + Y_bar_diff = func_to_diff(np.zeros((N,))) + jac_fd = np.zeros((M, N)) - # Get the indices of the states just before and just after. - query_stamps = np.array(query_stamps) - state_list = np.array(state_list) - stamp_list = [state.stamp for state in state_list] - stamp_list.sort() - stamp_list = np.array(stamp_list) - if method == "linear": - idx_middle = np.interp( - query_stamps, stamp_list, np.array(range(len(stamp_list))) - ) - idx_lower = np.floor(idx_middle).astype(int) - idx_upper = idx_lower + 1 + # Main loop to calculate jacobian + for i in range(N): + dx = np.zeros((N)) + dx[i] = step_size - before_start = query_stamps < stamp_list[0] - after_end = idx_upper >= len(state_list) - inside = np.logical_not(np.logical_or(before_start, after_end)) + if method == "forward": + Y_plus: State = func_to_diff(dx.copy()) + jac_fd[:, i] = (Y_plus - Y_bar_diff).ravel() / step_size - # Return endpoint if out of bounds - idx_upper[idx_upper == len(state_list)] = len(state_list) - 1 + elif method == "central": + Y_plus = func_to_diff(dx.copy()) + Y_minus = func_to_diff(-dx.copy()) + jac_fd[:, i] = (Y_plus - Y_minus).ravel() / (2 * step_size) - # ############ Do the interpolation ################# - stamp_lower = stamp_list[idx_lower] - stamp_upper = stamp_list[idx_upper] + elif method == "cs": + Y_imag: State = func_to_diff(1j * dx.copy()) + jac_fd[:, i] = np.imag(Y_imag).ravel() / step_size - # "Fraction" of the way between the two states - alpha = np.zeros(len(query_stamps)) - alpha[inside] = np.array( - (query_stamps[inside] - stamp_lower[inside]) - / (stamp_upper[inside] - stamp_lower[inside]) - ).ravel() + else: + raise ValueError( + f"Unknown method '{method}'. " + "Must be 'forward', 'central' or 'cs" + ) - # The two neighboring states around the query point - state_lower: List[State] = np.array(state_list[idx_lower]).ravel() - state_upper: List[State] = np.array(state_list[idx_upper]).ravel() + return jac_fd - # Interpolate between the two states - dx = np.array( - [s.minus(state_lower[i]).ravel() for i, s in enumerate(state_upper)] - ) - - out = [] - for i, state in enumerate(state_lower): - if np.isnan(alpha[i]) or np.isinf(alpha[i]) or alpha[i] < 0.0: - raise RuntimeError("wtf") - state_interp = state.plus(dx[i] * alpha[i]) - - state_interp.stamp = query_stamps[i] - out.append(state_interp) +class GaussianResult: + """ + A data container that simultaneously computes various interesting metrics + about a Gaussian filter's state estimate, given the ground-truth value of + the state. + """ - elif method == "nearest": - indexes = np.array(range(len(stamp_list))) - nearest_state = interp1d( - stamp_list, - indexes, - "nearest", - bounds_error=False, - fill_value="extrapolate", - ) - state_idx = nearest_state(query_stamps).astype(int) - out = state_list[state_idx].tolist() + __slots__ = [ + "stamp", + "state", + "state_true", + "covariance", + "error", + "ees", + "nees", + "md", + "three_sigma", + "rmse", + ] - if single_query: - out = out[0] + def __init__( + self, + estimate: StateWithCovariance, + state_true: State, + ): + """ + Parameters + ---------- + estimate : StateWithCovariance + Estimated state and corresponding covariance. + state_true : State + The true state, which will be used to compute various error metrics. + """ - return out + state = estimate.state + covariance = estimate.covariance + #:float: timestamp + self.stamp = state.stamp + #:State: estimated state + self.state = state + #:State: true state + self.state_true = state_true + #:numpy.ndarray: covariance associated with estimated state + self.covariance = covariance -def schedule_sequential_measurements(model_list, freq): - """Schedules sequential measurements from a list of MeasurementModels - that cannot generate measurements at the same time. This allows - looping through the measurement model list one at a time. + e = state_true.minus(state).reshape((-1, 1)) + #:numpy.ndarray: error vector between estimated and true state + self.error = e.ravel() + #:float: sum of estimation error squared (EES) + self.ees = np.ndarray.item(e.T @ e) + #:float: normalized estimation error squared (NEES) + self.nees = np.ndarray.item(e.T @ np.linalg.solve(covariance, e)) + #:float: root mean squared error (RMSE) + self.rmse = np.sqrt(self.ees / state.dof) + #:float: Mahalanobis distance + self.md = np.sqrt(self.nees) + #:numpy.ndarray: three-sigma bounds on each error component + self.three_sigma = 3 * np.sqrt(np.diag(covariance)) - Parameters - ---------- - model_list: List[MeasurementModel] - The list of sequential MeasurementModels. - freq: float - The overall frequency in which all the measurements are generated. - Returns - ------- - List[float] - The list of initial offsets associated with each MeasurementModel. - float - The reduced frequency at which each individual MeasurementModel - generates measurements. +class GaussianResultList: """ - n_models = len(model_list) - offset_list = [None] * n_models - offset_step = 1 / freq - new_freq = freq / n_models + A data container that accepts a list of ``GaussianResult`` objects and + stacks the attributes in numpy arrays. Convenient for plotting. This object + does nothing more than array-ifying the attributes of ``GaussianResult``. - for i in range(n_models): - offset_list[i] = i * offset_step + This object also supports slicing, which will return a new ``GaussianResultList`` + object with the sliced attributes either through time or through the degrees + of freedom themselves. For example, - return offset_list, new_freq + .. code-block:: python + results = GaussianResultList.from_estimates(estimates, state_true_list) -def associate_stamps( - first_stamps: List[float], - second_stamps: List[float], - offset: float = 0.0, - max_difference: float = 0.02, -) -> List[Tuple[int, int]]: - """ - Associate timestamps. + results[0:10] # returns the first 10 time steps + results[:, 0] # returns the first degree of freedom + results[0:10, 0] # returns the first degree of freedom for the first 10 time steps + results[0:10, [0, 1]] # returns the first two degrees of freedom for the first 10 time steps + results[:, 3:] # returns all degrees of freedom except the first three - Returns a sorted list of matches, of length of the smallest of - first_stamps and second_stamps. + This can be very useful if you want to examine the nees or rmse of just some + states, or if you want to plot the error of just some states. For example, + if you have an SE3State, where the first 3 degrees of freedom are attitude, + and the last 3 are position, you can plot only the attitude error with - Function taken from rpg_trajectory_evaluation toolbox. + .. code-block:: python - Parameters - ---------- - first_stamps : List[float] - List of first timestamps - second_stamps : List[float] - List of second timestamps - offset : float, optional - Offset between the two lists, by default 0.0. - max_difference : float, optional - Maximum difference between stamps in the two list - to be considered a match, by default 0.02. + nav.plot_error(results[:, 0:3]) + + and likewise get only the attitude NEES with ``results[:, 0:3].nees``. - Returns - ------- - List[Tuple[int, int]] - Sorted list of matches in the form (match_first_idx, match_second_idx). """ - potential_matches = [ - (abs(a - (b + offset)), idx_a, idx_b) - for idx_a, a in enumerate(first_stamps) - for idx_b, b in enumerate(second_stamps) - if abs(a - (b + offset)) < max_difference + + __slots__ = [ + "stamp", + "state", + "state_true", + "covariance", + "error", + "ees", + "nees", + "md", + "three_sigma", + "value", + "value_true", + "dof", + "rmse", ] - potential_matches.sort() # prefer the closest - matches = [] - first_idxes = list(range(len(first_stamps))) - second_idxes = list(range(len(second_stamps))) - for diff, idx_a, idx_b in potential_matches: - if idx_a in first_idxes and idx_b in second_idxes: - first_idxes.remove(idx_a) - second_idxes.remove(idx_b) - matches.append((int(idx_a), int(idx_b))) - matches.sort() - return matches + def __init__(self, result_list: List[GaussianResult]): + """ + Parameters + ---------- + result_list : List[GaussianResult] + A list of GaussianResult, intended such that each element corresponds + to a different time point -def find_nearest_stamp_idx( - stamps_list: List[float], stamp: Union[float, List[float]] -) -> int: - """ - Find the index of the nearest stamp in ``stamps_list`` to ``stamp``. If - ``stamp`` is a list or array, then the output is a list of indices. + Let ``N = len(result_list)`` + """ + #:numpy.ndarray with shape (N,): timestamp + self.stamp = np.array([r.stamp for r in result_list]) + #:numpy.ndarray with shape (N,): numpy array of State objects + self.state: List[State] = np.array([r.state for r in result_list]) + #:numpy.ndarray with shape (N,): numpy array of true State objects + self.state_true: List[State] = np.array( + [r.state_true for r in result_list] + ) + #:numpy.ndarray with shape (N,dof,dof): covariance + self.covariance: np.ndarray = np.array( + [r.covariance for r in result_list] + ) + #:numpy.ndarray with shape (N, dof): error throughout trajectory + self.error = np.array([r.error for r in result_list]) + #:numpy.ndarray with shape (N,): EES throughout trajectory + self.ees = np.array([r.ees for r in result_list]) + #:numpy.ndarray with shape (N,): EES throughout trajectory + self.rmse = np.array([r.rmse for r in result_list]) + #:numpy.ndarray with shape (N,): NEES throughout trajectory + self.nees = np.array([r.nees for r in result_list]) + #:numpy.ndarray with shape (N,): Mahalanobis distance throughout trajectory + self.md = np.array([r.md for r in result_list]) + #:numpy.ndarray with shape (N, dof): three-sigma bounds + self.three_sigma = np.array([r.three_sigma for r in result_list]) + #:numpy.ndarray with shape (N,): state value. type depends on implementation + self.value = np.array([r.state.value for r in result_list]) + #:numpy.ndarray with shape (N,): dof throughout trajectory + self.dof = np.array([r.state.dof for r in result_list]) + #:numpy.ndarray with shape (N,): true state value. type depends on implementation + self.value_true = np.array([r.state_true.value for r in result_list]) - Parameters - ---------- - stamps_list : List[float] - List of timestamps - stamp : float or List[float] or numpy.ndarray - Query stamp(s). + def __getitem__(self, key): + if isinstance(key, tuple): + if not len(key) == 2: + raise IndexError("Only two dimensional indexing is supported") + else: + key = (key, slice(None, None, None)) - Returns - ------- - int or List[int] - Index of nearest stamp. - """ + key_lists = list(key) # make mutable + for i, k in enumerate(key): + if isinstance(k, int): + key_lists[i] = [k] + elif isinstance(k, slice): + start = k.start if k.start is not None else 0 + stop = k.stop if k.stop is not None else self.error.shape[i] + step = k.step if k.step is not None else 1 + key_lists[i] = list(range(start, stop, step)) + elif isinstance(k, list): + pass + else: + raise TypeError("keys must be int, slice, or list of indices") - if isinstance(stamp, float): - single_query = True - query_stamps = np.array([stamp]) - else: - single_query = False - query_stamps = np.array(stamp).ravel() + key1, key2 = key + out = GaussianResultList([]) + out.stamp = self.stamp[key1] + out.state = self.state[key1] + out.state_true = self.state_true[key1] + out.covariance = self.covariance[ + np.ix_(key_lists[0], key_lists[1], key_lists[1]) + ] # (N, key_size, key_size) + out.error = self.error[key1, key2] # (N, key_size) + out.ees = np.sum(np.atleast_2d(out.error**2), axis=1) - nearest_stamp = interp1d( - stamps_list, - np.array(range(len(stamps_list))), - "nearest", - bounds_error=False, - fill_value="extrapolate", - ) + if len(out.error.shape) == 1: + out.nees = out.error**2 / out.covariance.flatten() + out.dof = np.ones_like(out.stamp) + else: + out.nees = np.sum( + out.error * np.linalg.solve(out.covariance, out.error), axis=1 + ) + out.dof = out.error.shape[1] * np.ones_like(out.stamp) - out = nearest_stamp(query_stamps).astype(int).tolist() + out.md = np.sqrt(out.nees) + out.three_sigma = 3 * np.sqrt( + np.diagonal(out.covariance, axis1=1, axis2=2) + ) + out.rmse = np.sqrt(out.ees / out.dof) + out.value = self.value[key1] + out.value_true = self.value_true[key1] + out.covariance = out.covariance.squeeze() + out.error = out.error.squeeze() + + return out + + def nees_lower_bound(self, confidence_interval: float): + """ + Calculates the NEES lower bound throughout the trajectory. + + Parameters + ---------- + confidence_interval : float + Single-sided cumulative probability threshold that defines the bound. + Must be between 0 and 1 + + Returns + ------- + numpy.ndarray with shape (N,) + NEES value corresponding to confidence interval + + + An example of how to make a NEES plot with both upper and lower bounds: + + .. code-block:: python + + ax.plot(results.stamp, results.nees) + ax.plot(results.stamp, results.nees_lower_bound(0.99)) + ax.plot(results.stamp, results.nees_upper_bound(0.99)) + """ + if confidence_interval >= 1 or confidence_interval <= 0: + raise ValueError("Confidence interval must lie in (0, 1)") + + lower_bound_threshold = (1 - confidence_interval) / 2 + return chi2.ppf(lower_bound_threshold, df=self.dof) + + def nees_upper_bound(self, confidence_interval: float, double_sided=True): + """ + Calculates the NEES upper bound throughout the trajectory + + Parameters + ---------- + confidence_interval : float + Cumulative probability threshold that defines the bound. Must be + between 0 and 1. + double_sided : bool, optional + Whether the provided threshold is single-sided or double sided, + by default True + + Returns + ------- + numpy.ndarray with shape (N,) + NEES value corresponding to confidence interval + + An example of how to make a NEES plot with only upper bounds: + + .. code-block:: python + + ax.plot(results.stamp, results.nees) + ax.plot(results.stamp, results.nees_upper_bound(0.99, double_sided=False)) + + """ + if confidence_interval >= 1 or confidence_interval <= 0: + raise ValueError("Confidence interval must lie in (0, 1)") + + upper_bound_threshold = confidence_interval + if double_sided: + upper_bound_threshold += (1 - confidence_interval) / 2 + + return chi2.ppf(upper_bound_threshold, df=self.dof) + + @staticmethod + def from_estimates( + estimate_list: List[StateWithCovariance], + state_true_list: List[State], + method="nearest", + ): + """ + A convenience function that creates a GaussianResultList from a list of + StateWithCovariance and a list of true State objects + + Parameters + ---------- + estimate_list : List[StateWithCovariance] + A list of StateWithCovariance objects + state_true_list : List[State] + A list of true State objects + method : "nearest" or "linear", optional + The method used to interpolate the true state when the timestamps + do not line up exactly, by default "nearest". - if single_query: - out = out[0] + Returns + ------- + GaussianResultList + A GaussianResultList object + """ + stamps = [r.stamp for r in estimate_list] - return out + state_true_list = state_interp(stamps, state_true_list, method=method) + return GaussianResultList( + [ + GaussianResult(estimate, state_true) + for estimate, state_true in zip(estimate_list, state_true_list) + ] + ) -def jacobian( - fun: Callable, - x: Union[np.ndarray, State], - step_size=None, - method="forward", - *args, - **kwargs, -) -> np.ndarray: +class MonteCarloResult: + """ + A data container which computes various interesting metrics associated with + Monte Carlo experiments, such as the average estimation error squared (EES) + and the average normalized EES. """ - Compute the Jacobian of a function. Example use: - - .. code-block:: python - x = np.array([1, 2]).reshape((-1,1)) + def __init__(self, trial_results: List[GaussianResultList]): + """ + Parameters + ---------- + trial_results : List[GaussianResultList] + Each GaussianResultList corresponds to a trial. This object assumes + that the timestamps in each trial are identical. - A = np.array([[1, 2], [3, 4]]) - def fun(x): - return 1/np.sqrt(x.T @ A.T @ A @ x) - jac_test = jacobian(fun, x, method=method) - jac_true = (- x.T @ A.T @ A)/((x.T @ A.T @ A @ x)**(3/2)) + Let ``N`` denote the number of time steps in a trial. + """ - assert np.allclose(jac_test, jac_true, atol=1e-6) + #:List[GaussianResultList]: raw trial results + self.trial_results = trial_results + #:int: number of trials + self.num_trials = len(trial_results) + #:numpy.ndarray with shape (N,): timestamps throughout trajectory + self.stamp = trial_results[0].stamp + #:numpy.ndarray with shape (N,): average NEES throughout trajectory + self.average_nees: np.ndarray = np.average( + np.array([t.nees for t in trial_results]), axis=0 + ) + self.nees = self.average_nees + #:numpy.ndarray with shape (N,): average EES throughout trajectory + self.average_ees: np.ndarray = np.average( + np.array([t.ees for t in trial_results]), axis=0 + ) + self.ees = self.average_ees + #:numpy.ndarray with shape (N,dof): root-mean-squared error of each component + self.rmse: np.ndarray = np.sqrt( + np.average( + np.power(np.array([t.error for t in trial_results]), 2), axis=0 + ) + ) + #:numpy.ndarray with shape (N,): Total RMSE, this can be meaningless if units differ in a state + self.total_rmse: np.ndarray = np.sqrt(self.average_ees) + #:numpy.ndarray with shape (N,1): expected NEES value throughout trajectory + self.expected_nees: np.ndarray = np.array(trial_results[0].dof) + #:numpy.ndarray with shape (N): dof throughout trajectory + self.dof: np.ndarray = trial_results[0].dof - This function is also compatible with ``State`` objects, and hence - can compute on-manifold derivatives. Example use: + def nees_lower_bound(self, confidence_interval: float): + """ + Calculates the NEES lower bound throughout the trajectory. - .. code-block:: python + Parameters + ---------- + confidence_interval : float + Single-sided cumulative probability threshold that defines the bound. + Must be between 0 and 1 - T = SE23State([0.1,0.2,0.3,4,5,6,7,8,9], direction="right") + Returns + ------- + numpy.ndarray with shape (N,) + NEES value corresponding to confidence interval + """ + if confidence_interval >= 1 or confidence_interval <= 0: + raise ValueError("Confidence interval must lie in (0, 1)") - def fun(T: SE23State): - # Returns the normalized body-frame velocity vector - C_ab = T.attitude - v_zw_a = T.velocity - v_zw_b = C_ab.T @ v_zw_a - return v_zw_b/np.linalg.norm(v_zw_b) + lower_bound_threshold = (1 - confidence_interval) / 2 + return ( + chi2.ppf(lower_bound_threshold, df=self.num_trials * self.dof) + / self.num_trials + ) - jac_fd = jacobian(fun, T) + def nees_upper_bound(self, confidence_interval: float, double_sided=True): + """ + Calculates the NEES upper bound throughout the trajectory - Parameters - ---------- - fun : Callable - function to compute the Jacobian of - x : Union[np.ndarray, State] - input to the function - step_size : float, optional - finite difference step size, by default 1e-6 - method : str, optional - "forward", "central" or "cs", by default "forward". "forward" calculates - using a forward finite difference procedure. "central" calculates using - a central finite difference procedure. "cs" calculates using the - complex-step procedure. If using "cs", you must be careful to ensure - that the function can handle and propagate through complex - components. + Parameters + ---------- + confidence_interval : float + Cumulative probability threshold that defines the bound. Must be + between 0 and 1. + double_sided : bool, optional + Whether the provided threshold is single-sided or double sided, + by default True - Returns - ------- - np.ndarray with shape (M, N) - Jacobian of the function, where ``M`` is the DOF of the output and - ``N`` is the DOF of the input. - """ - x = x.copy() + Returns + ------- + numpy.ndarray with shape (N,) + NEES value corresponding to confidence interval - if step_size is None: - if method == "cs": - step_size = 1e-16 - else: - step_size = 1e-6 + """ + if confidence_interval >= 1 or confidence_interval <= 0: + raise ValueError("Confidence interval must lie in (0, 1)") - # Check if input has a plus method. otherwise, assume it will behave - # like a numpy array - if hasattr(x, "plus"): - input_plus = lambda x, dx: x.plus(dx) - else: - input_plus = lambda x, dx: x + dx.reshape(x.shape) + upper_bound_threshold = confidence_interval + if double_sided: + upper_bound_threshold += (1 - confidence_interval) / 2 - Y_bar: State = fun(x.copy(), *args, **kwargs) + return ( + chi2.ppf(upper_bound_threshold, df=self.num_trials * self.dof) + / self.num_trials + ) - # Check if output has a minus method. otherwise, assume it will behave - # like a numpy array - if hasattr(Y_bar, "minus"): - output_diff = lambda Y, Y_bar: Y.minus(Y_bar) - else: - output_diff = lambda Y, Y_bar: Y - Y_bar - func_to_diff = lambda dx: output_diff( - fun(input_plus(x, dx), *args, **kwargs), Y_bar - ) +class IMMResult(GaussianResult): + __slots__ = [ + "stamp", + "state", + "state_true", + "covariance", + "error", + "ees", + "nees", + "md", + "three_sigma", + "model_probabilities", + ] - # Check if input/output has a dof attribute. otherwise, assume it will - # behave like a numpy array and use the ``.size`` attribute to get - # the DOF of the input/output - if hasattr(x, "dof"): - N = x.dof - else: - N = x.size + def __init__(self, imm_estimate: IMMState, state_true: State): + super().__init__( + gaussian_mixing( + imm_estimate.model_probabilities, imm_estimate.model_states + ), + state_true, + ) - if hasattr(Y_bar, "dof"): - M = Y_bar.dof - else: - M = Y_bar.size + self.model_probabilities = imm_estimate.model_probabilities - Y_bar_diff = func_to_diff(np.zeros((N,))) - jac_fd = np.zeros((M, N)) - # Main loop to calculate jacobian - for i in range(N): - dx = np.zeros((N)) - dx[i] = step_size +class IMMResultList(GaussianResultList): + __slots__ = [ + "stamp", + "state", + "state_true", + "covariance", + "error", + "ees", + "nees", + "md", + "three_sigma", + "value", + "value_true", + "dof", + "model_probabilities", + ] - if method == "forward": - Y_plus: State = func_to_diff(dx.copy()) - jac_fd[:, i] = (Y_plus - Y_bar_diff).ravel() / step_size + def __init__(self, result_list: List[IMMResult]): + super().__init__(result_list) + self.model_probabilities = np.array( + [r.model_probabilities for r in result_list] + ) - elif method == "central": - Y_plus = func_to_diff(dx.copy()) - Y_minus = func_to_diff(-dx.copy()) - jac_fd[:, i] = (Y_plus - Y_minus).ravel() / (2 * step_size) + @staticmethod + def from_estimates( + estimate_list: List[IMMState], + state_true_list: List[State], + method="nearest", + ): + """ + A convenience function that creates a IMMResultList from a list of + IMMState and a list of true State objects - elif method == "cs": - Y_imag: State = func_to_diff(1j * dx.copy()) - jac_fd[:, i] = np.imag(Y_imag).ravel() / step_size + Parameters + ---------- + estimate_list : List[IMMState] + A list of IMMState objects + state_true_list : List[State] + A list of true State objects + method : "nearest" or "linear", optional + The method used to interpolate the true state when the timestamps + do not line up exactly, by default "nearest". - else: - raise ValueError( - f"Unknown method '{method}'. " - "Must be 'forward', 'central' or 'cs" - ) + Returns + ------- + IMMResultList + A IMMResultList object + """ + stamps = [r.model_states[0].stamp for r in estimate_list] - return jac_fd + state_true_list = state_interp(stamps, state_true_list, method=method) + return IMMResultList( + [ + IMMResult(estimate, state_true) + for estimate, state_true in zip(estimate_list, state_true_list) + ] + ) diff --git a/navlie/utils/mixture.py b/navlie/utils/mixture.py new file mode 100644 index 00000000..f1ebe27a --- /dev/null +++ b/navlie/utils/mixture.py @@ -0,0 +1,147 @@ +""" +Collection of function required for mixing Gaussian distributions. +""" + +from typing import List + +from navlie.types import ( + State, + Input, + Measurement, + StateWithCovariance, +) +import numpy as np +from navlie.lib import IMMState +from tqdm import tqdm + + +# TODO. The IMM seems to have an issue when the user accidently modifies the +# provided state in the process model. + + + +def gaussian_mixing_vectorspace( + weights: List[float], means: List[np.ndarray], covariances: List[np.ndarray] +): + """ + Calculate the mean and covariance of a Gaussian mixture on a vectorspace. + + Parameters + ---------- + weights : List[Float] + Weights corresponding to each Gaussian. + means: List[np.ndarray] + List containing the means of each Gaussian mixture + covariances + List containing covariances of each Gaussian mixture + + Returns + ------- + np.ndarray + Mean of Gaussian mixture + np.ndarray + Covariance of Gaussian mixture + """ + + x_bar = np.zeros(means[0].shape) + P_bar = np.zeros(covariances[0].shape) + + for weight, x in zip(weights, means): + x_bar = x_bar + weight * x + + for weight, x, P in zip(weights, means, covariances): + dx = (x - x_bar).reshape(-1, 1) + P_bar = P_bar + weight * P + weight * dx @ dx.T + + return x_bar, P_bar + + +def reparametrize_gaussians_about_X_par( + X_par: State, X_list: List[StateWithCovariance] +): + """ + Reparametrize each Lie group Gaussian in X_list about X_par. + A Lie group Gaussian is only relevant in the tangent space of its own mean. + To mix Lie group Gaussians, a specific X, X_par, must be chosen to expand + a tangent space around. Once expanded in this common tangent space, + the Gaussians may be mixed in a vector space fashion. + + Parameters + ---------- + X_par : State + Each member of X_list will be reparametrized as a Gaussian + in the tangent space of X_par. + X_list : List[StateWithCovariance] + List of Lie group Gaussians to be reparametrized. + + Returns + ------- + List[np.ndarray] + Tangent space of X_par mean of each element of X_list. + List[np.ndarray] + Tangent space of X_par covariance of each element of X_list + """ + means_reparametrized = [] + covariances_reparametrized = [] + + for X in X_list: + mu = X.state.minus(X_par) + Jinv = X.state.minus_jacobian(X_par) + Sigma = Jinv @ X.covariance @ Jinv.T + means_reparametrized.append(mu) + covariances_reparametrized.append(Sigma) + + return means_reparametrized, covariances_reparametrized + + +def update_X(X: State, mu: np.ndarray, P: np.ndarray): + """ + Given a Lie group Gaussian with mean mu and covariance P, expressed in the + tangent space of X, compute Lie group StateAndCovariance X_hat such that the + Lie algebra Gaussian around X_hat has zero mean. + + Parameters + ---------- + X : State + A Lie group element. + mu : np.ndarray + Mean of Gaussian in tangent space of X + P: np.ndarray + Covariance of Gaussian in tangent space of X + + Returns + ------- + StateWithCovariance + StateWithCovariance whose state is a Lie group element. + """ + X_hat = StateWithCovariance(X, np.zeros((X.dof, X.dof))) + X_hat.state = X_hat.state.plus(mu) + + J = X.plus_jacobian(mu) + + X_hat.covariance = J @ P @ J.T + + return X_hat + + +def gaussian_mixing(weights: List[float], x_list: List[StateWithCovariance]): + """A Gaussian mixing method that handles both vectorspace Gaussians + and Gaussians on Lie groups. + + Parameters + ---------- + weights : List[Float] + Weights of Gaussians to be mixed. + x_list : List[StateWithCovariance] + List of Gaussians to be mixed. + Returns + ------- + StateWithCovariance + The mixed Gaussian + """ + max_idx = np.argmax(np.array(weights)) + X_par = x_list[max_idx].state + mu_repar, P_repar = reparametrize_gaussians_about_X_par(X_par, x_list) + x_bar, P_bar = gaussian_mixing_vectorspace(weights, mu_repar, P_repar) + X_mix = update_X(X_par, x_bar, P_bar) + return X_mix diff --git a/navlie/utils/plot.py b/navlie/utils/plot.py new file mode 100644 index 00000000..005997b7 --- /dev/null +++ b/navlie/utils/plot.py @@ -0,0 +1,482 @@ +""" +Collection of miscellaneous plotting functions. +""" + +from typing import List, Tuple, Dict +from navlie.types import State, Measurement, StateWithCovariance +import numpy as np +import matplotlib.pyplot as plt +from scipy.interpolate import interp1d +from navlie.utils.common import GaussianResultList + +def plot_error( + results: GaussianResultList, + axs: List[plt.Axes] = None, + label: str = None, + sharey: bool = False, + color=None, + bounds=True, +) -> Tuple[plt.Figure, List[plt.Axes]]: + """ + A generic three-sigma bound plotter. + + Parameters + ---------- + results : GaussianResultList + Contains the data to plot + axs : List[plt.Axes], optional + Axes to draw on, by default None. If None, new axes will be created. + label : str, optional + Text label to add, by default None + sharey : bool, optional + Whether to have a common y axis or not, by default False + color : color, optional + specify the color of the error/bounds. + + Returns + ------- + plt.Figure + Handle to figure. + List[plt.Axes] + Handle to axes that were drawn on. + """ + + dim = results.error.shape[1] + + if dim < 3: + n_rows = dim + else: + n_rows = 3 + + n_cols = int(np.ceil(dim / 3)) + + if axs is None: + fig, axs = plt.subplots(n_rows, n_cols, sharex=True, sharey=sharey) + else: + fig: plt.Figure = axs.ravel("F")[0].get_figure() + + axs_og = axs + kwargs = {} + if color is not None: + kwargs["color"] = color + + axs: List[plt.Axes] = axs.ravel("F") + for i in range(results.three_sigma.shape[1]): + if bounds: + axs[i].fill_between( + results.stamp, + results.three_sigma[:, i], + -results.three_sigma[:, i], + alpha=0.5, + **kwargs, + ) + axs[i].plot(results.stamp, results.error[:, i], label=label, **kwargs) + + fig: plt.Figure = fig # For type hinting + return fig, axs_og + + +def plot_nees( + results: GaussianResultList, + ax: plt.Axes = None, + label: str = None, + color=None, + confidence_interval: float = 0.95, + normalize: bool = False, + expected_nees_color="r", +) -> Tuple[plt.Figure, plt.Axes]: + """ + Makes a plot of the NEES, showing the actual NEES values, the expected NEES, + and the bounds of the specified confidence interval. + + Parameters + ---------- + results : GaussianResultList or MonteCarloResult + Results to plot + ax : plt.Axes, optional + Axes on which to draw, by default None. If None, new axes will be + created. + label : str, optional + Label to assign to the NEES line, by default None + color : optional + Fed directly to the ``plot(..., color=color)`` function, by default None + confidence_interval : float or None, optional + Desired probability confidence region, by default 0.95. Must lie between + 0 and 1. If None, no confidence interval will be plotted. + normalize : bool, optional + Whether to normalize the NEES by the degrees of freedom, by default False + + Returns + ------- + plt.Figure + Figure on which the plot was drawn + plt.Axes + Axes on which the plot was drawn + """ + + if ax is None: + fig, ax = plt.subplots( + 1, + 1, + sharex=True, + ) + else: + fig = ax.get_figure() + + axs_og = ax + kwargs = {} + if color is not None: + kwargs["color"] = color + + if normalize: + s = results.dof + else: + s = 1 + + expected_nees_label = "Expected NEES" + _, exisiting_labels = ax.get_legend_handles_labels() + + if expected_nees_label in exisiting_labels: + expected_nees_label = None + + # fmt:off + ax.plot(results.stamp, results.nees/s, label=label, **kwargs) + if confidence_interval: + ci_label = f"${int(confidence_interval*100)}\%$ conf. bounds" + if ci_label in exisiting_labels: + ci_label = None + ax.plot(results.stamp, results.dof/s, label=expected_nees_label, color=expected_nees_color) + ax.plot(results.stamp, results.nees_upper_bound(confidence_interval)/s, "--", color="k", label=ci_label) + ax.plot(results.stamp, results.nees_lower_bound(confidence_interval)/s, "--", color="k") + # fmt:on + + ax.legend() + + return fig, axs_og + + +def plot_meas( + meas_list: List[Measurement], + state_list: List[State], + axs: List[plt.Axes] = None, + sharey=False, +) -> Tuple[plt.Figure, List[plt.Axes]]: + """ + Given measurement data, make time-domain plots of the measurement values + and their ground-truth model-based values. + + Parameters + ---------- + meas_list : List[Measurement] + Measurement data to be plotted. + state_list : List[State] + A list of true State objects with similar timestamp domain. Will be + interpolated if timestamps do not line up perfectly. + axs : List[plt.Axes], optional + Axes to draw on, by default None. If None, new axes will be created. + sharey : bool, optional + Whether to have a common y axis or not, by default False + + Returns + ------- + plt.Figure + Handle to figure. + List[plt.Axes] + Handle to axes that were drawn on. + """ + + # Convert everything to numpy arrays for plotting, and compute the + # ground-truth model-based measurement value. + + meas_list.sort(key=lambda y: y.stamp) + + # Find the state of the nearest timestamp to the measurement + y_stamps = np.array([y.stamp for y in meas_list]) + x_stamps = np.array([x.stamp for x in state_list]) + indexes = np.array(range(len(state_list))) + nearest_state = interp1d( + x_stamps, + indexes, + "nearest", + bounds_error=False, + fill_value="extrapolate", + ) + state_idx = nearest_state(y_stamps) + y_meas = [] + y_true = [] + three_sigma = [] + for i in range(len(meas_list)): + data = np.ravel(meas_list[i].value) + y_meas.append(data) + x = state_list[int(state_idx[i])] + y = meas_list[i].model.evaluate(x) + if y is None: + y_true.append(np.zeros_like(data) * np.nan) + three_sigma.append(np.zeros_like(data) * np.nan) + else: + y_true.append(np.ravel(y)) + R = np.atleast_2d(meas_list[i].model.covariance(x)) + three_sigma.append(3 * np.sqrt(np.diag(R))) + + y_meas = np.atleast_2d(np.array(y_meas)) + y_true = np.atleast_2d(np.array(y_true)) + three_sigma = np.atleast_2d(np.array(three_sigma)) + y_stamps = np.array(y_stamps) # why is this necessary? + x_stamps = np.array(x_stamps) + + # Plot + size_y = np.size(meas_list[0].value) + if axs is None: + fig, axs = plt.subplots(size_y, 1, sharex=True, sharey=sharey) + else: + if isinstance(axs, plt.Axes): + axs = np.array([axs]) + + fig = axs.ravel("F")[0].get_figure() + axs = np.atleast_1d(axs) + for i in range(size_y): + axs[i].scatter( + y_stamps, y_meas[:, i], color="b", alpha=0.7, s=2, label="Measured" + ) + axs[i].plot( + y_stamps, y_true[:, i], color="r", alpha=1, label="Modelled" + ) + axs[i].fill_between( + y_stamps, + y_true[:, i] + three_sigma[:, i], + y_true[:, i] - three_sigma[:, i], + alpha=0.5, + color="r", + ) + return fig, axs + + +def plot_meas_by_model( + meas_list: List[Measurement], + state_list: List[State], + axs: List[plt.Axes] = None, + sharey=False, +) -> Tuple[plt.Figure, List[plt.Axes]]: + """ + Given measurement data, make time-domain plots of the measurement values + and their ground-truth model-based values. + + Parameters + ---------- + meas_list : List[Measurement] + Measurement data to be plotted. + state_list : List[State] + A list of true State objects with similar timestamp domain. Will be + interpolated if timestamps do not line up perfectly. + axs : List[plt.Axes], optional + Axes to draw on, by default None. If None, new axes will be created. + sharey : bool, optional + Whether to have a common y axis or not, by default False + + Returns + ------- + plt.Figure + Handle to figure. + List[plt.Axes] + Handle to axes that were drawn on. + """ + + # Create sub-lists for every model ID + meas_by_model: Dict[int, List[Measurement]] = {} + for meas in meas_list: + model_id = id(meas.model) + if model_id not in meas_by_model: + meas_by_model[model_id] = [] + meas_by_model[model_id].append(meas) + + if axs is None: + axs = np.array([None] * len(meas_by_model)) + + axs = axs.ravel("F") + + figs = [None] * len(meas_by_model) + for i, temp in enumerate(meas_by_model.items()): + model_id, meas_list = temp + fig, ax = plot_meas(meas_list, state_list, axs[i], sharey=sharey) + figs[i] = fig + axs[i] = ax + ax[0].set_title(f"{meas_list[0].model} {hex(model_id)}", fontsize=12) + ax[0].tick_params(axis="both", which="major", labelsize=10) + ax[0].tick_params(axis="both", which="minor", labelsize=8) + + return fig, axs + + +def plot_poses( + poses, + ax: plt.Axes = None, + line_color: str = None, + triad_color: str = None, + arrow_length: float = 1, + step: int = 5, + label: str = None, + linewidth=None, + plot_2d: bool =False, +): + """ + Plots a pose trajectory, representing the attitudes by triads + plotted along the trajectory. + + The poses may be either elements of SE(2), + representing planar 2D poses, or elements of SE(3), representing 3D poses. + + Parameters + ---------- + poses : List[Union[SE2State, SE3State]] + A list objects containing a ``position`` property and an attitude + property, representing the rotation matrix :math:``\mathbf{C}_{ab}``. + Can either be 2D or 3D poses. + ax : plt.Axes, optional + Axes to plot on, if none, 3D axes are created. + line_color : str, optional + Color of the position trajectory. + triad_color : str, optional + Triad color. If none are specified, defaults to RGB. + arrow_length : int, optional + Triad arrow length, by default 1. + step : int or None, optional + Step size in list of poses, by default 5. If None, no triads are plotted. + label : str, optional + Optional label for the triad + plot_2d: bool, optional + Flag to plot a 3D pose trajectory in 2D bird's eye view. + """ + if isinstance(poses, GaussianResultList): + poses = poses.state + + if isinstance(poses, StateWithCovariance): + poses = [poses.state] + + if isinstance(poses, np.ndarray): + poses = poses.tolist() + + if not isinstance(poses, list): + poses = [poses] + + # Check if poses are in 2D or 3D + if poses[0].position.size == 2: + plot_2d = True + + # Check if provided axes are in 3D + if ax is not None: + if ax.name == "3d": + plot_2d = False + + if ax is None: + fig = plt.figure() + if plot_2d: + ax = plt.axes() + else: + ax = plt.axes(projection="3d") + else: + fig = ax.get_figure() + + if triad_color is None: + colors = ["tab:red", "tab:green", "tab:blue"] # Default to RGB + else: + colors = [triad_color] * 3 + + # Plot a line for the positions + r = np.array([pose.position for pose in poses]) + if plot_2d: + ax.plot(r[:, 0], r[:, 1], color=line_color, label=label) + else: + ax.plot3D(r[:, 0], r[:, 1], r[:, 2], color=line_color, label=label) + + # Plot triads using quiver + if step is not None: + C = np.array([poses[i].attitude.T for i in range(0, len(poses), step)]) + r = np.array([poses[i].position for i in range(0, len(poses), step)]) + if plot_2d: + x, y = r[:, 0], r[:, 1] + ax.quiver( + x, y, + C[:, 0, 0], + C[:, 0, 1], + color=colors[0], + scale=20.0, + headwidth=2, + ) + + ax.quiver( + x, y, + C[:, 1, 0], + C[:, 1, 1], + color=colors[1], + scale=20.0, + headwidth=2, + ) + else: + x, y, z = r[:, 0], r[:, 1], r[:, 2] + ax.quiver( + x, + y, + z, + C[:, 0, 0], + C[:, 0, 1], + C[:, 0, 2], + color=colors[0], + length=arrow_length, + arrow_length_ratio=0.1, + linewidths=linewidth, + ) + ax.quiver( + x, + y, + z, + C[:, 1, 0], + C[:, 1, 1], + C[:, 1, 2], + color=colors[1], + length=arrow_length, + arrow_length_ratio=0.1, + linewidths=linewidth, + ) + ax.quiver( + x, + y, + z, + C[:, 2, 0], + C[:, 2, 1], + C[:, 2, 2], + color=colors[2], + length=arrow_length, + arrow_length_ratio=0.1, + linewidths=linewidth, + ) + + if plot_2d: + ax.axis("equal") + else: + set_axes_equal(ax) + return fig, ax + + +def set_axes_equal(ax: plt.Axes): + """ + Sets the axes of a 3D plot to have equal scale. + + Parameters + ---------- + ax : plt.Axes + Matplotlib axes. + """ + x_limits = ax.get_xlim3d() + y_limits = ax.get_ylim3d() + z_limits = ax.get_zlim3d() + x_range = abs(x_limits[1] - x_limits[0]) + x_middle = np.mean(x_limits) + y_range = abs(y_limits[1] - y_limits[0]) + y_middle = np.mean(y_limits) + z_range = abs(z_limits[1] - z_limits[0]) + z_middle = np.mean(z_limits) + length = 0.5 * max([x_range, y_range, z_range]) + ax.set_xlim3d([x_middle - length, x_middle + length]) + ax.set_ylim3d([y_middle - length, y_middle + length]) + ax.set_zlim3d([z_middle - length, z_middle + length]) + \ No newline at end of file diff --git a/tests/integration/test_interacting_multiple_models.py b/tests/integration/test_interacting_multiple_models.py index 34a7bfc7..6a0eda6c 100644 --- a/tests/integration/test_interacting_multiple_models.py +++ b/tests/integration/test_interacting_multiple_models.py @@ -21,9 +21,8 @@ import numpy as np from typing import List -from navlie.imm import InteractingModelFilter, run_imm_filter -from navlie.imm import IMMResultList -from navlie.imm import IMMResult +from navlie.filters import InteractingModelFilter, run_imm_filter +from navlie.utils import IMMResult, IMMResultList # TODO this test is very complicated. we need to simplify this.