From 84bac95a23317d8f4c3c4bbce2f8154820f26b86 Mon Sep 17 00:00:00 2001 From: Syed Shabbir Ahmed Date: Tue, 25 Jun 2024 15:13:49 -0400 Subject: [PATCH] cleaned up utils.common --- navlie/utils/common.py | 586 ++++------------------------------------- 1 file changed, 46 insertions(+), 540 deletions(-) diff --git a/navlie/utils/common.py b/navlie/utils/common.py index b86e38fd..4804dc5f 100644 --- a/navlie/utils/common.py +++ b/navlie/utils/common.py @@ -144,546 +144,6 @@ def state_interp( return out -class GaussianResult: - """ - A data container that simultaneously computes various interesting metrics - about a Gaussian filter's state estimate, given the ground-truth value of - the state. - """ - - __slots__ = [ - "stamp", - "state", - "state_true", - "covariance", - "error", - "ees", - "nees", - "md", - "three_sigma", - "rmse", - ] - - def __init__( - self, - estimate: StateWithCovariance, - state_true: State, - ): - """ - Parameters - ---------- - estimate : StateWithCovariance - Estimated state and corresponding covariance. - state_true : State - The true state, which will be used to compute various error metrics. - """ - - state = estimate.state - covariance = estimate.covariance - - #:float: timestamp - self.stamp = state.stamp - #:State: estimated state - self.state = state - #:State: true state - self.state_true = state_true - #:numpy.ndarray: covariance associated with estimated state - self.covariance = covariance - - e = state_true.minus(state).reshape((-1, 1)) - #:numpy.ndarray: error vector between estimated and true state - self.error = e.ravel() - #:float: sum of estimation error squared (EES) - self.ees = np.ndarray.item(e.T @ e) - #:float: normalized estimation error squared (NEES) - self.nees = np.ndarray.item(e.T @ np.linalg.solve(covariance, e)) - #:float: root mean squared error (RMSE) - self.rmse = np.sqrt(self.ees / state.dof) - #:float: Mahalanobis distance - self.md = np.sqrt(self.nees) - #:numpy.ndarray: three-sigma bounds on each error component - self.three_sigma = 3 * np.sqrt(np.diag(covariance)) - - -class GaussianResultList: - """ - A data container that accepts a list of ``GaussianResult`` objects and - stacks the attributes in numpy arrays. Convenient for plotting. This object - does nothing more than array-ifying the attributes of ``GaussianResult``. - - This object also supports slicing, which will return a new ``GaussianResultList`` - object with the sliced attributes either through time or through the degrees - of freedom themselves. For example, - - .. code-block:: python - - results = GaussianResultList.from_estimates(estimates, state_true_list) - - results[0:10] # returns the first 10 time steps - results[:, 0] # returns the first degree of freedom - results[0:10, 0] # returns the first degree of freedom for the first 10 time steps - results[0:10, [0, 1]] # returns the first two degrees of freedom for the first 10 time steps - results[:, 3:] # returns all degrees of freedom except the first three - - This can be very useful if you want to examine the nees or rmse of just some - states, or if you want to plot the error of just some states. For example, - if you have an SE3State, where the first 3 degrees of freedom are attitude, - and the last 3 are position, you can plot only the attitude error with - - .. code-block:: python - - nav.plot_error(results[:, 0:3]) - - and likewise get only the attitude NEES with ``results[:, 0:3].nees``. 
- - """ - - __slots__ = [ - "stamp", - "state", - "state_true", - "covariance", - "error", - "ees", - "nees", - "md", - "three_sigma", - "value", - "value_true", - "dof", - "rmse", - ] - - def __init__(self, result_list: List[GaussianResult]): - """ - Parameters - ---------- - result_list : List[GaussianResult] - A list of GaussianResult, intended such that each element corresponds - to a different time point - - - Let ``N = len(result_list)`` - """ - #:numpy.ndarray with shape (N,): timestamp - self.stamp = np.array([r.stamp for r in result_list]) - #:numpy.ndarray with shape (N,): numpy array of State objects - self.state: List[State] = np.array([r.state for r in result_list]) - #:numpy.ndarray with shape (N,): numpy array of true State objects - self.state_true: List[State] = np.array( - [r.state_true for r in result_list] - ) - #:numpy.ndarray with shape (N,dof,dof): covariance - self.covariance: np.ndarray = np.array( - [r.covariance for r in result_list] - ) - #:numpy.ndarray with shape (N, dof): error throughout trajectory - self.error = np.array([r.error for r in result_list]) - #:numpy.ndarray with shape (N,): EES throughout trajectory - self.ees = np.array([r.ees for r in result_list]) - #:numpy.ndarray with shape (N,): EES throughout trajectory - self.rmse = np.array([r.rmse for r in result_list]) - #:numpy.ndarray with shape (N,): NEES throughout trajectory - self.nees = np.array([r.nees for r in result_list]) - #:numpy.ndarray with shape (N,): Mahalanobis distance throughout trajectory - self.md = np.array([r.md for r in result_list]) - #:numpy.ndarray with shape (N, dof): three-sigma bounds - self.three_sigma = np.array([r.three_sigma for r in result_list]) - #:numpy.ndarray with shape (N,): state value. type depends on implementation - self.value = np.array([r.state.value for r in result_list]) - #:numpy.ndarray with shape (N,): dof throughout trajectory - self.dof = np.array([r.state.dof for r in result_list]) - #:numpy.ndarray with shape (N,): true state value. 
type depends on implementation - self.value_true = np.array([r.state_true.value for r in result_list]) - - def __getitem__(self, key): - if isinstance(key, tuple): - if not len(key) == 2: - raise IndexError("Only two dimensional indexing is supported") - else: - key = (key, slice(None, None, None)) - - key_lists = list(key) # make mutable - for i, k in enumerate(key): - if isinstance(k, int): - key_lists[i] = [k] - elif isinstance(k, slice): - start = k.start if k.start is not None else 0 - stop = k.stop if k.stop is not None else self.error.shape[i] - step = k.step if k.step is not None else 1 - key_lists[i] = list(range(start, stop, step)) - elif isinstance(k, list): - pass - else: - raise TypeError("keys must be int, slice, or list of indices") - - key1, key2 = key - out = GaussianResultList([]) - out.stamp = self.stamp[key1] - out.state = self.state[key1] - out.state_true = self.state_true[key1] - out.covariance = self.covariance[ - np.ix_(key_lists[0], key_lists[1], key_lists[1]) - ] # (N, key_size, key_size) - out.error = self.error[key1, key2] # (N, key_size) - out.ees = np.sum(np.atleast_2d(out.error**2), axis=1) - - if len(out.error.shape) == 1: - out.nees = out.error**2 / out.covariance.flatten() - out.dof = np.ones_like(out.stamp) - else: - out.nees = np.sum( - out.error * np.linalg.solve(out.covariance, out.error), axis=1 - ) - out.dof = out.error.shape[1] * np.ones_like(out.stamp) - - out.md = np.sqrt(out.nees) - out.three_sigma = 3 * np.sqrt( - np.diagonal(out.covariance, axis1=1, axis2=2) - ) - out.rmse = np.sqrt(out.ees / out.dof) - out.value = self.value[key1] - out.value_true = self.value_true[key1] - out.covariance = out.covariance.squeeze() - out.error = out.error.squeeze() - - return out - - def nees_lower_bound(self, confidence_interval: float): - """ - Calculates the NEES lower bound throughout the trajectory. - - Parameters - ---------- - confidence_interval : float - Single-sided cumulative probability threshold that defines the bound. - Must be between 0 and 1 - - Returns - ------- - numpy.ndarray with shape (N,) - NEES value corresponding to confidence interval - - - An example of how to make a NEES plot with both upper and lower bounds: - - .. code-block:: python - - ax.plot(results.stamp, results.nees) - ax.plot(results.stamp, results.nees_lower_bound(0.99)) - ax.plot(results.stamp, results.nees_upper_bound(0.99)) - """ - if confidence_interval >= 1 or confidence_interval <= 0: - raise ValueError("Confidence interval must lie in (0, 1)") - - lower_bound_threshold = (1 - confidence_interval) / 2 - return chi2.ppf(lower_bound_threshold, df=self.dof) - - def nees_upper_bound(self, confidence_interval: float, double_sided=True): - """ - Calculates the NEES upper bound throughout the trajectory - - Parameters - ---------- - confidence_interval : float - Cumulative probability threshold that defines the bound. Must be - between 0 and 1. - double_sided : bool, optional - Whether the provided threshold is single-sided or double sided, - by default True - - Returns - ------- - numpy.ndarray with shape (N,) - NEES value corresponding to confidence interval - - An example of how to make a NEES plot with only upper bounds: - - .. 
code-block:: python - - ax.plot(results.stamp, results.nees) - ax.plot(results.stamp, results.nees_upper_bound(0.99, double_sided=False)) - - """ - if confidence_interval >= 1 or confidence_interval <= 0: - raise ValueError("Confidence interval must lie in (0, 1)") - - upper_bound_threshold = confidence_interval - if double_sided: - upper_bound_threshold += (1 - confidence_interval) / 2 - - return chi2.ppf(upper_bound_threshold, df=self.dof) - - @staticmethod - def from_estimates( - estimate_list: List[StateWithCovariance], - state_true_list: List[State], - method="nearest", - ): - """ - A convenience function that creates a GaussianResultList from a list of - StateWithCovariance and a list of true State objects - - Parameters - ---------- - estimate_list : List[StateWithCovariance] - A list of StateWithCovariance objects - state_true_list : List[State] - A list of true State objects - method : "nearest" or "linear", optional - The method used to interpolate the true state when the timestamps - do not line up exactly, by default "nearest". - - Returns - ------- - GaussianResultList - A GaussianResultList object - """ - stamps = [r.stamp for r in estimate_list] - - state_true_list = state_interp(stamps, state_true_list, method=method) - return GaussianResultList( - [ - GaussianResult(estimate, state_true) - for estimate, state_true in zip(estimate_list, state_true_list) - ] - ) - - -class MonteCarloResult: - """ - A data container which computes various interesting metrics associated with - Monte Carlo experiments, such as the average estimation error squared (EES) - and the average normalized EES. - """ - - def __init__(self, trial_results: List[GaussianResultList]): - """ - Parameters - ---------- - trial_results : List[GaussianResultList] - Each GaussianResultList corresponds to a trial. This object assumes - that the timestamps in each trial are identical. - - - Let ``N`` denote the number of time steps in a trial. - """ - - #:List[GaussianResultList]: raw trial results - self.trial_results = trial_results - #:int: number of trials - self.num_trials = len(trial_results) - #:numpy.ndarray with shape (N,): timestamps throughout trajectory - self.stamp = trial_results[0].stamp - #:numpy.ndarray with shape (N,): average NEES throughout trajectory - self.average_nees: np.ndarray = np.average( - np.array([t.nees for t in trial_results]), axis=0 - ) - self.nees = self.average_nees - #:numpy.ndarray with shape (N,): average EES throughout trajectory - self.average_ees: np.ndarray = np.average( - np.array([t.ees for t in trial_results]), axis=0 - ) - self.ees = self.average_ees - #:numpy.ndarray with shape (N,dof): root-mean-squared error of each component - self.rmse: np.ndarray = np.sqrt( - np.average( - np.power(np.array([t.error for t in trial_results]), 2), axis=0 - ) - ) - #:numpy.ndarray with shape (N,): Total RMSE, this can be meaningless if units differ in a state - self.total_rmse: np.ndarray = np.sqrt(self.average_ees) - #:numpy.ndarray with shape (N,1): expected NEES value throughout trajectory - self.expected_nees: np.ndarray = np.array(trial_results[0].dof) - #:numpy.ndarray with shape (N): dof throughout trajectory - self.dof: np.ndarray = trial_results[0].dof - - def nees_lower_bound(self, confidence_interval: float): - """ - Calculates the NEES lower bound throughout the trajectory. - - Parameters - ---------- - confidence_interval : float - Single-sided cumulative probability threshold that defines the bound. 
- Must be between 0 and 1 - - Returns - ------- - numpy.ndarray with shape (N,) - NEES value corresponding to confidence interval - """ - if confidence_interval >= 1 or confidence_interval <= 0: - raise ValueError("Confidence interval must lie in (0, 1)") - - lower_bound_threshold = (1 - confidence_interval) / 2 - return ( - chi2.ppf(lower_bound_threshold, df=self.num_trials * self.dof) - / self.num_trials - ) - - def nees_upper_bound(self, confidence_interval: float, double_sided=True): - """ - Calculates the NEES upper bound throughout the trajectory - - Parameters - ---------- - confidence_interval : float - Cumulative probability threshold that defines the bound. Must be - between 0 and 1. - double_sided : bool, optional - Whether the provided threshold is single-sided or double sided, - by default True - - Returns - ------- - numpy.ndarray with shape (N,) - NEES value corresponding to confidence interval - - """ - if confidence_interval >= 1 or confidence_interval <= 0: - raise ValueError("Confidence interval must lie in (0, 1)") - - upper_bound_threshold = confidence_interval - if double_sided: - upper_bound_threshold += (1 - confidence_interval) / 2 - - return ( - chi2.ppf(upper_bound_threshold, df=self.num_trials * self.dof) - / self.num_trials - ) - - -class IMMResult(GaussianResult): - __slots__ = [ - "stamp", - "state", - "state_true", - "covariance", - "error", - "ees", - "nees", - "md", - "three_sigma", - "model_probabilities", - ] - - def __init__(self, imm_estimate: IMMState, state_true: State): - super().__init__( - gaussian_mixing( - imm_estimate.model_probabilities, imm_estimate.model_states - ), - state_true, - ) - - self.model_probabilities = imm_estimate.model_probabilities - - -class IMMResultList(GaussianResultList): - __slots__ = [ - "stamp", - "state", - "state_true", - "covariance", - "error", - "ees", - "nees", - "md", - "three_sigma", - "value", - "value_true", - "dof", - "model_probabilities", - ] - - def __init__(self, result_list: List[IMMResult]): - super().__init__(result_list) - self.model_probabilities = np.array( - [r.model_probabilities for r in result_list] - ) - - @staticmethod - def from_estimates( - estimate_list: List[IMMState], - state_true_list: List[State], - method="nearest", - ): - """ - A convenience function that creates a IMMResultList from a list of - IMMState and a list of true State objects - - Parameters - ---------- - estimate_list : List[IMMState] - A list of IMMState objects - state_true_list : List[State] - A list of true State objects - method : "nearest" or "linear", optional - The method used to interpolate the true state when the timestamps - do not line up exactly, by default "nearest". - - Returns - ------- - IMMResultList - A IMMResultList object - """ - stamps = [r.model_states[0].stamp for r in estimate_list] - - state_true_list = state_interp(stamps, state_true_list, method=method) - return IMMResultList( - [ - IMMResult(estimate, state_true) - for estimate, state_true in zip(estimate_list, state_true_list) - ] - ) - - -def monte_carlo( - trial: Callable[[int], GaussianResultList], - num_trials: int, - num_jobs: int = -1, - verbose: int = 10, -) -> MonteCarloResult: - """ - Monte-Carlo experiment executor. Give a callable ``trial`` function that - executes a trial and returns a ``GaussianResultList``, and this function - will execute it a number of times and aappgregate the results. - - Parameters - ---------- - trial : Callable[[int], GaussianResultList] - Callable trial function. 
Must accept a single integer trial number,
-        and return a GaussianResultList. From trial to trial, the timestamps
-        are expected to remain consistent.
-    num_trials : int
-        Number of Trials to execute
-    num_jobs: int, optional
-        The maximum number of concurrently running jobs, by default -1.
-        If -1 all CPUs are used. If 1 is given, no parallel computing code
-        is used at all, which is useful for debugging. For num_jobs below -1,
-        (n_cpus + 1 + num_jobs) are used. Thus for num_jobs = -2, all CPUs but
-        one are used.
-    verbose: int, optional
-        The verbosity level, by default 10. If non zero, progress messages
-        are printed. Above 50, the output is sent to stdout. The frequency
-        of the messages increases with the verbosity level. If it more than
-        10, all iterations are reported.
-
-    Returns
-    -------
-    MonteCarloResult
-        Data container object
-    """
-    trial_results = [None] * num_trials
-
-    print("Starting Monte Carlo experiment...")
-    trial_results = Parallel(n_jobs=num_jobs, verbose=verbose)(
-        delayed(trial)(i) for i in range(num_trials)
-    )
-
-    return MonteCarloResult(trial_results)
-
-
 def randvec(cov: np.ndarray, num_samples: int = 1) -> np.ndarray:
     """
@@ -1516,3 +976,49 @@ def from_estimates(
             for estimate, state_true in zip(estimate_list, state_true_list)
         ]
     )
+
+
+def monte_carlo(
+    trial: Callable[[int], GaussianResultList],
+    num_trials: int,
+    num_jobs: int = -1,
+    verbose: int = 10,
+) -> MonteCarloResult:
+    """
+    Monte-Carlo experiment executor. Give a callable ``trial`` function that
+    executes a trial and returns a ``GaussianResultList``, and this function
+    will execute it a number of times and aggregate the results.
+
+    Parameters
+    ----------
+    trial : Callable[[int], GaussianResultList]
+        Callable trial function. Must accept a single integer trial number,
+        and return a GaussianResultList. From trial to trial, the timestamps
+        are expected to remain consistent.
+    num_trials : int
+        Number of trials to execute
+    num_jobs: int, optional
+        The maximum number of concurrently running jobs, by default -1.
+        If -1 all CPUs are used. If 1 is given, no parallel computing code
+        is used at all, which is useful for debugging. For num_jobs below -1,
+        (n_cpus + 1 + num_jobs) are used. Thus for num_jobs = -2, all CPUs but
+        one are used.
+    verbose: int, optional
+        The verbosity level, by default 10. If non-zero, progress messages
+        are printed. Above 50, the output is sent to stdout. The frequency
+        of the messages increases with the verbosity level. If it is more than
+        10, all iterations are reported.
+
+    Returns
+    -------
+    MonteCarloResult
+        Data container object
+    """
+    trial_results = [None] * num_trials
+
+    print("Starting Monte Carlo experiment...")
+    trial_results = Parallel(n_jobs=num_jobs, verbose=verbose)(
+        delayed(trial)(i) for i in range(num_trials)
+    )
+
+    return MonteCarloResult(trial_results)
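
For reviewers tracing the relocated API, a minimal, self-contained sketch of how ``monte_carlo`` and ``GaussianResultList.from_estimates`` fit together is given below. It is an illustration under stated assumptions rather than part of the patch: the ``VectorState`` and ``StateWithCovariance`` import paths are assumed, the synthetic trial simply draws noisy estimates of a static zero state, and a real experiment would run an actual filter inside ``trial``.

.. code-block:: python

    import numpy as np
    import matplotlib.pyplot as plt

    from navlie.lib import VectorState              # assumed import path
    from navlie.types import StateWithCovariance    # assumed import path
    from navlie.utils.common import GaussianResultList, monte_carlo


    def trial(trial_number: int) -> GaussianResultList:
        # Synthetic stand-in for a real filter run: the true state is a static
        # 2-DOF zero vector and each "estimate" is a noisy draw whose scatter
        # matches the reported covariance, so the NEES should be consistent.
        rng = np.random.default_rng(trial_number)
        stamps = np.arange(0.0, 10.0, 0.1)
        cov = 0.1 * np.eye(2)

        states_true = [VectorState(np.zeros(2), stamp=t) for t in stamps]
        estimates = [
            StateWithCovariance(
                VectorState(rng.multivariate_normal(np.zeros(2), cov), stamp=t),
                cov,
            )
            for t in stamps
        ]
        return GaussianResultList.from_estimates(estimates, states_true)


    # Run 100 independent trials in parallel and aggregate the results.
    results = monte_carlo(trial, num_trials=100, num_jobs=-1)

    # Consistency check: the trial-averaged NEES should stay between the
    # 99% chi-squared bounds provided by MonteCarloResult.
    fig, ax = plt.subplots()
    ax.plot(results.stamp, results.average_nees, label="average NEES")
    ax.plot(results.stamp, results.nees_lower_bound(0.99), "k--")
    ax.plot(results.stamp, results.nees_upper_bound(0.99), "k--")
    ax.legend()
    plt.show()

Because every trial shares the same timestamps, ``MonteCarloResult`` can average the NEES across trials and compare it against the chi-squared bounds returned by ``nees_lower_bound`` and ``nees_upper_bound``.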