From 424ab549a63db4fed759e9b58de0aabd2bbcc783 Mon Sep 17 00:00:00 2001 From: Steven Hiscocks Date: Wed, 3 Sep 2025 14:06:48 +0100 Subject: [PATCH 01/15] Work in progress sensor bias model/feeder --- stonesoup/feeder/bias.py | 194 +++++++++++++++++++++++++++ stonesoup/models/measurement/bias.py | 120 +++++++++++++++++ 2 files changed, 314 insertions(+) create mode 100644 stonesoup/feeder/bias.py create mode 100644 stonesoup/models/measurement/bias.py diff --git a/stonesoup/feeder/bias.py b/stonesoup/feeder/bias.py new file mode 100644 index 000000000..73c7d5f26 --- /dev/null +++ b/stonesoup/feeder/bias.py @@ -0,0 +1,194 @@ +import copy +import datetime +from abc import abstractmethod +from functools import partial + +import numpy as np +from scipy.linalg import block_diag + +from .base import DetectionFeeder +from ..base import Property +from ..buffered_generator import BufferedGenerator +from ..models.measurement.bias import \ + OrientationBiasWrapper, OrientationTranslationBiasWrapper, \ + TimeBiasWrapper, TranslationBiasWrapper +from ..models.measurement.nonlinear import CombinedReversibleGaussianMeasurementModel +from ..models.transition import TransitionModel +from ..predictor.kalman import KalmanPredictor +from ..updater.kalman import KalmanUpdater, UnscentedKalmanUpdater +from ..types.array import CovarianceMatrix, StateVector +from ..types.detection import Detection +from ..types.hypothesis import SingleHypothesis +from ..types.state import GaussianState +from ..types.update import Update + + +class _GaussianBiasFeeder(DetectionFeeder): + bias_prior: GaussianState = Property(doc="Prior bias") + bias_predictor: KalmanPredictor = Property(doc="Predictor for bias") + updater: KalmanUpdater = Property( + default=None, + doc="Updater for bias and joint states. Must support non-linear models. 
" + "Default `None` will create UKF instance.") + max_bias: list[float] = Property(default=None, doc="Max bias ± from 0 allowed") + + @property + @abstractmethod + def _model_wrapper(self): + raise NotImplementedError() + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._bias_state = copy.deepcopy(self.bias_prior) + if self.updater is None: + self.updater = UnscentedKalmanUpdater(None) + + @property + def ndim_bias(self): + return self._bias_state.state_vector.shape[0] + + @property + def bias_state(self): + return self._bias_state + + @property + def bias(self): + return self._bias_state.state_vector + + @abstractmethod + @BufferedGenerator.generator_method + def data_gen(self): + raise NotImplementedError() + + def update_bias(self, hypotheses): + if any(not hyp for hyp in hypotheses): + raise ValueError("Must provide only non-missed detection hypotheses") + + ndim_bias = self.ndim_bias + + # Predict bias + pred_time = max(hypothesis.prediction.timestamp for hypothesis in hypotheses) + if self._bias_state.timestamp is None: + self._bias_state.timestamp = pred_time + else: + self._bias_state = self.bias_predictor.predict(self._bias_state, timestamp=pred_time) + + # Create joint state + states = [hypothesis.prediction for hypothesis in hypotheses] + applied_bias = next((h.measurement.applied_bias for h in hypotheses), + np.zeros((ndim_bias, 1))) + delta_bias = self.bias - applied_bias + states.append(GaussianState(delta_bias, self.bias_state.covar)) + combined_pred = GaussianState( + np.vstack([state.state_vector for state in states]).view(StateVector), + block_diag(*[state.covar for state in states]).view(CovarianceMatrix), + timestamp=pred_time, + ) + + # Create joint measurement + offset = 0 + models = [] + for prediction, measurement in ( + (hypothesis.prediction, hypothesis.measurement) for hypothesis in hypotheses): + models.append(self._model_wrapper( + ndim_state=combined_pred.state_vector.shape[0], + measurement_model=measurement.measurement_model, + state_mapping=[offset + n for n in range(prediction.ndim)], + bias_mapping=list(range(-ndim_bias, 0)) + )) + offset += prediction.ndim + combined_meas = Detection( + np.vstack([hypothesis.measurement.state_vector for hypothesis in hypotheses]), + timestamp=pred_time, + measurement_model=CombinedReversibleGaussianMeasurementModel(models)) + + # Update bias + update = self.updater.update(SingleHypothesis(combined_pred, combined_meas)) + rel_delta_bias = update.state_vector[-ndim_bias:, :] - delta_bias + self.bias_state.state_vector += rel_delta_bias + if self.max_bias is not None: + self.bias = np.min([abs(self.bias), self.max_bias], axis=0) * np.sign(self.bias) + self.bias_state.covar = update.covar[-ndim_bias:, -ndim_bias:] + + # Create update states + offset = 0 + updates = [] + for hypothesis in hypotheses: + update_slice = slice(offset, offset + hypothesis.prediction.ndim) + updates.append(Update.from_state( + hypothesis.prediction, + state_vector=update.state_vector[update_slice, :], + covar=update.covar[update_slice, update_slice], + timestamp=hypothesis.prediction.timestamp, + hypothesis=hypothesis)) + offset += hypothesis.prediction.ndim + + return updates + + +class TimeGaussianBiasFeeder(_GaussianBiasFeeder): + transition_model: TransitionModel = Property() + + @property + def _model_wrapper(self): + return partial(TimeBiasWrapper, transition_model=self.transition_model) + + @BufferedGenerator.generator_method + def data_gen(self): + for time, detections in self.reader: + bias = self.bias + 
bias_delta = datetime.timedelta(seconds=float(bias)) + time -= bias_delta + for detection in detections: + detection.timestamp -= bias_delta + detection.applied_bias = bias + yield time, detections + + +class OrientationGaussianBiasFeeder(_GaussianBiasFeeder): + _model_wrapper = OrientationBiasWrapper + + @BufferedGenerator.generator_method + def data_gen(self): + for time, detections in self.reader: + bias = self.bias + models = set() + for detection in detections: + models.add(detection.measurement_model) + detection.applied_bias = bias + for model in models: + model.rotation_offset = model.rotation_offset - bias + yield time, detections + + +class TranslationGaussianBiasFeeder(_GaussianBiasFeeder): + _model_wrapper = TranslationBiasWrapper + + @BufferedGenerator.generator_method + def data_gen(self): + for time, detections in self.reader: + bias = self.bias.copy() + models = set() + for detection in detections: + models.add(detection.measurement_model) + detection.applied_bias = bias + for model in models: + model.translation_offset = model.translation_offset - bias + yield time, detections + + +class OrientationTranslationGaussianBiasFeeder(_GaussianBiasFeeder): + _model_wrapper = OrientationTranslationBiasWrapper + + @BufferedGenerator.generator_method + def data_gen(self): + for time, detections in self.reader: + bias = self.bias.copy() + models = set() + for detection in detections: + models.add(detection.measurement_model) + detection.applied_bias = bias + for model in models: + model.orientation_offset = model.orientation_offset - bias[:3] + model.translation_offset = model.translation_offset - bias[3:] + yield time, detections diff --git a/stonesoup/models/measurement/bias.py b/stonesoup/models/measurement/bias.py new file mode 100644 index 000000000..7f84ea6e6 --- /dev/null +++ b/stonesoup/models/measurement/bias.py @@ -0,0 +1,120 @@ +import copy +import datetime +from abc import abstractmethod + +from . 
import MeasurementModel +from ..transition import TransitionModel +from ...base import Property +from ...functions import jacobian as compute_jac +from ...types.state import State, StateVectors + + +class _BiasWrapper(MeasurementModel): + measurement_model: MeasurementModel = Property( + doc="Model being wrapped that bias will be applied to") + state_mapping: list[int] = Property( + doc="Mapping to state vector elements relevant to wrapped model") + bias_mapping: list[int] = Property(doc="Mapping to state vector elements where bias is") + + @property + def mapping(self): + return list(self.measurement_model.mapping) + list(self.bias_mapping) + + @property + def ndim_meas(self): + return self.measurement_model.ndim_meas + + @abstractmethod + def function(self, state, noise=False, **kwargs): + raise NotImplementedError() + + def covar(self, *args, **kwargs): + return self.measurement_model.covar(*args, **kwargs) + + def jacobian(self, state, **kwargs): + return compute_jac(self.function, state, **kwargs) + + def pdf(self, *args, **kwargs): + raise NotImplementedError() + + def rvs(self, *args, **kwargs): + raise NotImplementedError() + + +class TimeBiasWrapper(_BiasWrapper): + transition_model: TransitionModel = Property( + doc="Transition model applied to state to apply time offset") + bias_mapping: tuple[int] | int = Property( + default=(-1, ), doc="Mapping to state vector elements where bias is") + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if isinstance(self.bias_mapping, int): + self.bias_mapping = (self.bias_mapping, ) + + def function(self, state, noise=False, **kwargs): + predicted_state_vectors = [] + for state_vector in state.state_vector.view(StateVectors): + delta_t = state_vector[self.bias_mapping[0], 0] + predicted_state_vectors.append(self.transition_model.function( + State(state_vector[self.state_mapping, :]), + time_interval=datetime.timedelta(seconds=-delta_t), + **kwargs)) + return self.measurement_model.function( + State(StateVectors(predicted_state_vectors)), noise=noise, **kwargs) + + +class OrientationBiasWrapper(_BiasWrapper): + bias_mapping: tuple[int, int, int] = Property( + default=(-3, -2, -1), doc="Mapping to state vector elements where bias is") + + def function(self, state, noise=False, **kwargs): + state_vectors = [] + for state_vector in state.state_vector.view(StateVectors): + delta_orient = state_vector[self.bias_mapping, :] + bias_model = copy.copy(self.measurement_model) + bias_model.rotation_offset = bias_model.rotation_offset - delta_orient + state_vectors.append(bias_model.function( + State(state_vector[self.state_mapping, :]), noise=noise, **kwargs)) + if len(state_vectors) == 1: + return state_vectors[0] + else: + return StateVectors(state_vectors) + + +class TranslationBiasWrapper(_BiasWrapper): + bias_mapping: tuple[int, int] | tuple[int, int, int] = Property( + default=(-3, -2, -1), doc="Mapping to state vector elements where bias is") + + def function(self, state, noise=False, **kwargs): + state_vectors = [] + for state_vector in state.state_vector.view(StateVectors): + delta_trans = state_vector[self.bias_mapping, :] + bias_model = copy.copy(self.measurement_model) + bias_model.translation_offset = bias_model.translation_offset - delta_trans + state_vectors.append(bias_model.function( + State(state_vector[self.state_mapping, :]), noise=noise, **kwargs)) + if len(state_vectors) == 1: + return state_vectors[0] + else: + return StateVectors(state_vectors) + + +class OrientationTranslationBiasWrapper(_BiasWrapper): 
+ bias_mapping: tuple[int, int, int, int, int] | tuple[int, int, int, int, int, int] = Property( + default=(-6, -5, -4, -3, -2, -1), doc="Mapping to state vector elements where bias is") + + def function(self, state, noise=False, **kwargs): + state_vectors = [] + for state_vector in state.state_vector.view(StateVectors): + delta_orient = state_vector[self.bias_mapping[:3], :] + delta_trans = state_vector[self.bias_mapping[3:], :] + bias_model = copy.copy(self.measurement_model) + bias_model.rotation_offset = bias_model.rotation_offset - delta_orient + bias_model.translation_offset = bias_model.translation_offset - delta_trans + state_vectors.append(bias_model.function( + State(state_vector[self.state_mapping, :]), noise=noise, **kwargs)) + if len(state_vectors) == 1: + return state_vectors[0] + else: + return StateVectors(state_vectors) From e7bfd4ec482e4b6cf083e8fc743cd15cd2a7fbdd Mon Sep 17 00:00:00 2001 From: Steven Hiscocks Date: Thu, 11 Sep 2025 17:18:46 +0100 Subject: [PATCH 02/15] Correct rotation offset in OrientationTranslationGaussianBiasFeeder --- stonesoup/feeder/bias.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stonesoup/feeder/bias.py b/stonesoup/feeder/bias.py index 73c7d5f26..a98b1f95f 100644 --- a/stonesoup/feeder/bias.py +++ b/stonesoup/feeder/bias.py @@ -189,6 +189,6 @@ def data_gen(self): models.add(detection.measurement_model) detection.applied_bias = bias for model in models: - model.orientation_offset = model.orientation_offset - bias[:3] + model.rotation_offset = model.rotation_offset - bias[:3] model.translation_offset = model.translation_offset - bias[3:] yield time, detections From 63689932a9b1cd6d6e5e24e0fd9bd88b18068b3f Mon Sep 17 00:00:00 2001 From: Steven Hiscocks Date: Thu, 11 Sep 2025 17:19:23 +0100 Subject: [PATCH 03/15] Fix max bias in bias feeders --- stonesoup/feeder/bias.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/stonesoup/feeder/bias.py b/stonesoup/feeder/bias.py index a98b1f95f..96407ec58 100644 --- a/stonesoup/feeder/bias.py +++ b/stonesoup/feeder/bias.py @@ -107,7 +107,8 @@ def update_bias(self, hypotheses): rel_delta_bias = update.state_vector[-ndim_bias:, :] - delta_bias self.bias_state.state_vector += rel_delta_bias if self.max_bias is not None: - self.bias = np.min([abs(self.bias), self.max_bias], axis=0) * np.sign(self.bias) + self.bias_state.state_vector = \ + np.min([abs(self.bias), self.max_bias], axis=0) * np.sign(self.bias) self.bias_state.covar = update.covar[-ndim_bias:, -ndim_bias:] # Create update states From bfc0487a82e961f7de676e7d348b777eb9f1b8a3 Mon Sep 17 00:00:00 2001 From: Steven Hiscocks Date: Thu, 11 Sep 2025 17:20:41 +0100 Subject: [PATCH 04/15] Add tests for bias estimation components --- stonesoup/feeder/tests/test_bias.py | 291 ++++++++++++++++++ .../models/measurement/tests/test_bias.py | 243 +++++++++++++++ 2 files changed, 534 insertions(+) create mode 100644 stonesoup/feeder/tests/test_bias.py create mode 100644 stonesoup/models/measurement/tests/test_bias.py diff --git a/stonesoup/feeder/tests/test_bias.py b/stonesoup/feeder/tests/test_bias.py new file mode 100644 index 000000000..b65aa0b7d --- /dev/null +++ b/stonesoup/feeder/tests/test_bias.py @@ -0,0 +1,291 @@ +import datetime + +import numpy as np +import pytest + +from stonesoup.feeder.bias import ( + TimeGaussianBiasFeeder, + TranslationGaussianBiasFeeder, OrientationGaussianBiasFeeder, + OrientationTranslationGaussianBiasFeeder) +from stonesoup.functions import build_rotation_matrix +from 
stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, RandomWalk +from stonesoup.predictor.kalman import KalmanPredictor +from stonesoup.types.detection import Detection +from stonesoup.types.hypothesis import SingleHypothesis +from stonesoup.types.state import GaussianState, StateVector + + +def make_dummy_detection(state_vector, measurement_model): + det = Detection( + state_vector, + timestamp=datetime.datetime.now(), + measurement_model=measurement_model + ) + det.applied_bias = None + return det + + +def make_dummy_measurement_model(): + class DummyMeasurementModel: + mapping = [0, 1, 2] + ndim_meas = 3 + translation_offset = StateVector([[0.], [0.], [0.]]) + rotation_offset = StateVector([[0.], [0.], [0.]]) + + def function(self, state, noise=False, **kwargs): + return build_rotation_matrix(self.rotation_offset) \ + @ (state.state_vector - self.translation_offset) + + def covar(self, *args, **kwargs): + return np.eye(3) + + def jacobian(self, state, **kwargs): + return np.eye(3, 6) + + return DummyMeasurementModel() + + +def make_dummy_transition_model(): + class DummyTransitionModel: + def function(self, state, noise=False, time_interval=None, **kwargs): + # Adds time_interval (seconds) to the state_vector + if time_interval is None: + time_interval = datetime.timedelta(seconds=0) + return state.state_vector + time_interval.total_seconds() + return DummyTransitionModel() + + +@pytest.fixture(params=[None, datetime.datetime(2025, 9, 9, 23, 59, 59)]) +def bias_timestamp(request): + return request.param + + +def test_translation_gaussian_bias_feeder_iter(): + # Setup feeder + bias_prior = GaussianState(StateVector([[1.], [2.], [3.]]), np.eye(3)) + bias_predictor = KalmanPredictor( + CombinedLinearGaussianTransitionModel([RandomWalk(1)] * 3)) + # Mock reader to yield a single time and detection + measurement_model = make_dummy_measurement_model() + detection = make_dummy_detection(StateVector([[10.], [20.], [30.]]), measurement_model) + feeder = TranslationGaussianBiasFeeder( + reader=[(datetime.datetime(2025, 9, 10), [detection])], + bias_prior=bias_prior, + bias_predictor=bias_predictor, + ) + # Iterate over feeder + for time, detections in feeder: + # Bias should be applied to detection + for det in detections: + assert np.allclose(det.applied_bias, feeder.bias) + # The measurement model's translation_offset should be updated + assert np.allclose( + det.measurement_model.translation_offset, + -feeder.bias + ) + + +def test_translation_gaussian_bias_feeder_update_bias(bias_timestamp): + # Setup feeder + bias_prior = GaussianState(StateVector([[0.], [0.], [0.]]), np.eye(3) * 10, bias_timestamp) + bias_predictor = KalmanPredictor( + CombinedLinearGaussianTransitionModel([RandomWalk(1)] * 3)) + feeder = TranslationGaussianBiasFeeder( + reader=None, + bias_prior=bias_prior, + bias_predictor=bias_predictor, + ) + # Create dummy prediction and measurement + measurement_model = make_dummy_measurement_model() + pred = GaussianState( + StateVector([[10.], [20.], [30.]]), + np.eye(3), + timestamp=datetime.datetime(2025, 9, 10) + ) + meas = Detection( + StateVector([[11.], [21.], [31.]]), + timestamp=datetime.datetime(2025, 9, 10), + measurement_model=measurement_model + ) + meas.applied_bias = np.zeros((3, 1)) + # Create hypothesis + hyp = SingleHypothesis(pred, meas) + # Call update_bias + updates = feeder.update_bias([hyp]) + # The bias_state should be updated (should not be the same as initial)... 
+ assert not np.allclose(feeder.bias_state.state_vector, bias_prior.state_vector) + # and should be around 0.8 ± 0.1 + assert np.allclose(feeder.bias_state.state_vector, [[0.8], [0.8], [0.8]], atol=0.1) + # The returned updates should match the updated state shape + assert updates[0].state_vector.shape == pred.state_vector.shape + + +def test_orientation_gaussian_bias_feeder_iter(): + bias_prior = GaussianState(StateVector([[0.], [0.], [np.pi/16]]), np.eye(3)) + bias_predictor = KalmanPredictor( + CombinedLinearGaussianTransitionModel([RandomWalk(1)] * 3)) + measurement_model = make_dummy_measurement_model() + detection = make_dummy_detection(StateVector([[10.], [20.], [30.]]), measurement_model) + feeder = OrientationGaussianBiasFeeder( + reader=[(datetime.datetime(2025, 9, 10), [detection])], + bias_prior=bias_prior, + bias_predictor=bias_predictor, + ) + for time, detections in feeder: + for det in detections: + assert np.allclose(det.applied_bias, feeder.bias) + assert np.allclose( + det.measurement_model.rotation_offset, + StateVector([[0.], [0.], [0.]]) - feeder.bias + ) + rotated = build_rotation_matrix(det.measurement_model.rotation_offset) \ + @ (det.state_vector - det.measurement_model.translation_offset) + expected_rotated = build_rotation_matrix(-feeder.bias) \ + @ (det.state_vector - det.measurement_model.translation_offset) + assert np.allclose(rotated, expected_rotated) + + +def test_orientation_gaussian_bias_feeder_update_bias(bias_timestamp): + bias_prior = GaussianState( + StateVector([[0.], [0.], [0.]]), np.diag([1e-6, 1e-6, 1]), bias_timestamp) + bias_predictor = KalmanPredictor( + CombinedLinearGaussianTransitionModel([RandomWalk(1e-6)] * 3)) + feeder = OrientationGaussianBiasFeeder( + reader=None, + bias_prior=bias_prior, + bias_predictor=bias_predictor, + max_bias=StateVector([[0.], [0.], [np.pi]]) + ) + measurement_model = make_dummy_measurement_model() + pred = GaussianState( + StateVector([[10.], [20.], [30.]]), + np.eye(3), + timestamp=datetime.datetime(2025, 9, 10) + ) + meas = Detection( + build_rotation_matrix(np.array([[0.], [0.], [-np.pi/16]])) + @ StateVector([[10.], [20.], [30.]]), + timestamp=datetime.datetime(2025, 9, 10), + measurement_model=measurement_model + ) + meas.applied_bias = np.zeros((3, 1)) + hyp = SingleHypothesis(pred, meas) + updates = feeder.update_bias([hyp]) + assert not np.allclose(feeder.bias_state.state_vector, bias_prior.state_vector) + assert np.allclose(feeder.bias_state.state_vector, [[0.], [0.], [np.pi/16]], atol=0.05) + assert updates[0].state_vector.shape == pred.state_vector.shape + + +def test_orientation_translation_gaussian_bias_feeder_iter(): + bias_prior = GaussianState( + StateVector([[0.], [0.], [np.pi/16], [1.], [2.], [3.]]), np.diag([1e-6, 1e-6, 1, 3, 3, 3])) + bias_predictor = KalmanPredictor( + CombinedLinearGaussianTransitionModel([RandomWalk(1e-6)]*3 + [RandomWalk(1)]*3)) + measurement_model = make_dummy_measurement_model() + detection = make_dummy_detection(StateVector([[10.], [20.], [30.]]), measurement_model) + feeder = OrientationTranslationGaussianBiasFeeder( + reader=[(datetime.datetime(2025, 9, 10), [detection])], + bias_prior=bias_prior, + bias_predictor=bias_predictor, + ) + for time, detections in feeder: + for det in detections: + assert np.allclose(det.applied_bias, feeder.bias) + assert np.allclose( + np.vstack([ + det.measurement_model.rotation_offset, + det.measurement_model.translation_offset]), + -feeder.bias + ) + rotated = build_rotation_matrix(det.measurement_model.rotation_offset) \ + @ 
(det.state_vector - det.measurement_model.translation_offset) + expected_rotated = build_rotation_matrix(-feeder.bias) \ + @ (det.state_vector - det.measurement_model.translation_offset) + assert np.allclose(rotated, expected_rotated) + + +def test_orientation_translation_gaussian_bias_feeder_update_bias(bias_timestamp): + bias_prior = GaussianState( + StateVector([[0.], [0.], [np.pi/16], [1.], [2.], [3.]]), + np.diag([1e-6, 1e-6, 1, 3, 3, 3]), + bias_timestamp) + bias_predictor = KalmanPredictor( + CombinedLinearGaussianTransitionModel([RandomWalk(1e-6)]*3 + [RandomWalk(1)]*3)) + feeder = OrientationTranslationGaussianBiasFeeder( + reader=None, + bias_prior=bias_prior, + bias_predictor=bias_predictor, + ) + measurement_model = make_dummy_measurement_model() + hyps = [] + for n in range(-10, 11): # A lot ok unknowns so a few detections needed + pred = GaussianState( + StateVector([[10.*n], [20.*n], [30.*n]]), + np.eye(3), + timestamp=datetime.datetime(2025, 9, 10) + ) + meas = Detection( + build_rotation_matrix(np.array([[0.], [0.], [-np.pi/16]])) + @ (pred.state_vector+StateVector([[1.], [1.], [1.]])), + timestamp=datetime.datetime(2025, 9, 10), + measurement_model=measurement_model + ) + meas.applied_bias = np.zeros((6, 1)) + hyp = SingleHypothesis(pred, meas) + hyps.append(hyp) + updates = feeder.update_bias(hyps) + assert not np.allclose(feeder.bias_state.state_vector, bias_prior.state_vector) + assert np.allclose( + feeder.bias_state.state_vector, + [[0.], [0.], [np.pi/16], [1.], [1.], [1.]], + atol=0.1) + assert updates[0].state_vector.shape == pred.state_vector.shape + + +def test_time_gaussian_bias_feeder_iter(): + bias_prior = GaussianState(StateVector([[0.5]]), np.eye(1)) + bias_predictor = KalmanPredictor( + CombinedLinearGaussianTransitionModel([RandomWalk(1)])) + measurement_model = make_dummy_measurement_model() + detection = make_dummy_detection(StateVector([[42.]]), measurement_model) + orig_timestamp = detection.timestamp + feeder = TimeGaussianBiasFeeder( + reader=[(datetime.datetime(2025, 9, 10), [detection])], + transition_model=None, + bias_prior=bias_prior, + bias_predictor=bias_predictor, + ) + for time, detections in feeder: + for det in detections: + assert np.allclose(det.applied_bias, feeder.bias) + expected_timestamp = orig_timestamp - datetime.timedelta(seconds=feeder.bias[0]) + assert detection.timestamp == expected_timestamp + + +def test_time_gaussian_bias_feeder_update_bias(bias_timestamp): + bias_prior = GaussianState(StateVector([[0.5]]), np.eye(1) * 10, bias_timestamp) + bias_predictor = KalmanPredictor( + CombinedLinearGaussianTransitionModel([RandomWalk(1e-3)])) + feeder = TimeGaussianBiasFeeder( + reader=None, + transition_model=make_dummy_transition_model(), + bias_prior=bias_prior, + bias_predictor=bias_predictor, + ) + measurement_model = make_dummy_measurement_model() + pred = GaussianState( + StateVector([[41.5]]), + np.eye(1), + timestamp=datetime.datetime(2025, 9, 10) + ) + meas = Detection( + StateVector([[41.]]), + timestamp=datetime.datetime(2025, 9, 10), + measurement_model=measurement_model + ) + meas.applied_bias = StateVector([[0.5]]) + hyp = SingleHypothesis(pred, meas) + updates = feeder.update_bias([hyp]) + assert not np.allclose(feeder.bias_state.state_vector, bias_prior.state_vector) + assert np.allclose(feeder.bias_state.state_vector, [[1.0]], atol=0.1) + assert updates[0].state_vector.shape == pred.state_vector.shape diff --git a/stonesoup/models/measurement/tests/test_bias.py b/stonesoup/models/measurement/tests/test_bias.py new 
file mode 100644 index 000000000..9180e3324 --- /dev/null +++ b/stonesoup/models/measurement/tests/test_bias.py @@ -0,0 +1,243 @@ +import datetime + +import pytest +import numpy as np + +from stonesoup.models.measurement.bias import ( + TimeBiasWrapper, + OrientationBiasWrapper, TranslationBiasWrapper, OrientationTranslationBiasWrapper) +from stonesoup.models.measurement.nonlinear import CartesianToElevationBearingRangeRate +from stonesoup.models.transition.linear import ConstantVelocity +from stonesoup.models.transition.base import CombinedGaussianTransitionModel +from stonesoup.types.state import State, StateVector + + +@pytest.fixture(params=[ + np.array([[0.], [0.], [0.]]), + np.array([[0.1], [0.2], [0.3]]), + np.array([[-0.5], [0.], [np.pi/4]]) +]) +def orientation_bias(request): + return request.param + + +@pytest.fixture(params=[ + np.array([[0.], [0.], [0.]]), + np.array([[1.], [-2.], [3.]]), + np.array([[-5.], [0.], [2.5]]) +]) +def translation_bias(request): + return request.param + + +def test_orientation_translation_bias_wrapper(orientation_bias, translation_bias): + # Setup measurement model + model = CartesianToElevationBearingRangeRate( + mapping=[0, 1, 2], + ndim_state=4, + noise_covar=np.eye(4), + translation_offset=StateVector([[0.], [0.], [0.]]), + rotation_offset=StateVector([[0.], [0.], [0.]]), + ) + # Wrap with orientation+translation bias + wrapper = OrientationTranslationBiasWrapper( + measurement_model=model, + state_mapping=[0, 1, 2, 3, 4, 5], + bias_mapping=(6, 7, 8, 9, 10, 11), + ndim_state=12 + ) + # State vector: [x, y, z, vx, vy, vz, roll, pitch, yaw, tx, ty, tz] + state_vec = np.vstack(( + [1.], [2.], [3.], [0.1], [0.2], [0.3], + orientation_bias.reshape(3, 1), + translation_bias.reshape(3, 1) + )) + state = State(StateVector(state_vec)) + # The wrapper should subtract the bias from the model's rotation_offset and translation_offset + result = wrapper.function(state) + # Check that the model's offsets were not changed + assert np.allclose(wrapper.measurement_model.rotation_offset, StateVector([[0.], [0.], [0.]])) + assert np.allclose( + wrapper.measurement_model.translation_offset, StateVector([[0.], [0.], [0.]])) + bias_model = CartesianToElevationBearingRangeRate( + mapping=[0, 1, 2], + ndim_state=4, + noise_covar=np.eye(4), + translation_offset=StateVector([[0.], [0.], [0.]]), + rotation_offset=StateVector([[0.], [0.], [0.]]), + ) + bias_model.rotation_offset = bias_model.rotation_offset - orientation_bias + bias_model.translation_offset = bias_model.translation_offset - translation_bias + # Use only [x, y, z, v] for expected state + expected_state = StateVector(state_vec[:6]) + expected = bias_model.function(State(expected_state)) + assert np.allclose(result, expected) + + # Additional coverage: mapping, ndim_meas, covar, jacobian + expected_mapping = [0, 1, 2, 6, 7, 8, 9, 10, 11] + assert wrapper.mapping == expected_mapping + assert wrapper.ndim_meas == 4 + covar = wrapper.covar() + assert covar.shape == (4, 4) + jac = wrapper.jacobian(state) + assert jac.shape[0] == wrapper.ndim_meas + assert jac.shape[1] == wrapper.ndim_state + + +def test_translation_bias_wrapper(translation_bias): + # Setup measurement model + model = CartesianToElevationBearingRangeRate( + mapping=[0, 1, 2], + ndim_state=4, + noise_covar=np.eye(4), + translation_offset=StateVector([[0.], [0.], [0.]]), + rotation_offset=StateVector([[0.], [0.], [0.]]), + ) + # Wrap with translation bias + wrapper = TranslationBiasWrapper( + measurement_model=model, + state_mapping=[0, 1, 2, 3, 4, 
5], + bias_mapping=(6, 7, 8), + ndim_state=9 + ) + # State vector: [x, y, z, vx, vy, vz, tx, ty, tz] + state_vec = np.vstack(( + [1.], [2.], [3.], [0.1], [0.2], [0.3], + translation_bias.reshape(3, 1) + )) + state = State(StateVector(state_vec)) + # The wrapper should subtract the bias from the model's translation_offset + result = wrapper.function(state) + # Check that the model's translation_offset was set correctly + assert np.allclose( + wrapper.measurement_model.translation_offset, StateVector([[0.], [0.], [0.]])) + # Check that the bias was applied (the result should be as if translation_offset was -bias) + bias_model = CartesianToElevationBearingRangeRate( + mapping=[0, 1, 2], + ndim_state=4, + noise_covar=np.eye(4), + translation_offset=StateVector([[0.], [0.], [0.]]), + rotation_offset=StateVector([[0.], [0.], [0.]]), + ) + bias_model.translation_offset = bias_model.translation_offset - translation_bias + # Use only [x, y, z, v] for expected state + expected_state = StateVector(state_vec[:6]) + expected = bias_model.function(State(expected_state)) + assert np.allclose(result, expected) + + # Additional coverage: mapping, ndim_meas, covar, jacobian + expected_mapping = [0, 1, 2, 6, 7, 8] + assert wrapper.mapping == expected_mapping + assert wrapper.ndim_meas == 4 + covar = wrapper.covar() + assert covar.shape == (4, 4) + jac = wrapper.jacobian(state) + assert jac.shape[0] == wrapper.ndim_meas + assert jac.shape[1] == wrapper.ndim_state + + +def test_orientation_bias_wrapper(orientation_bias): + # Setup measurement model + model = CartesianToElevationBearingRangeRate( + mapping=[0, 1, 2], + ndim_state=4, + noise_covar=np.eye(4), + translation_offset=StateVector([[0.], [0.], [0.]]), + rotation_offset=StateVector([[0.], [0.], [0.]]), + ) + # Wrap with orientation bias + wrapper = OrientationBiasWrapper( + measurement_model=model, + state_mapping=[0, 1, 2, 3, 4, 5], + bias_mapping=(6, 7, 8), + ndim_state=9 + ) + # State vector: [x, y, z, vx, vy, vz, roll, pitch, yaw] + state_vec = np.vstack(( + [1.], [2.], [3.], [0.1], [0.2], [0.3], + orientation_bias.reshape(3, 1) + )) + state = State(StateVector(state_vec)) + # The wrapper should subtract the bias from the model's rotation_offset + result = wrapper.function(state) + # Check that the model's rotation_offset was not changed + assert np.allclose(wrapper.measurement_model.rotation_offset, StateVector([[0.], [0.], [0.]])) + # Check that the bias was applied (the result should be as if rotation_offset was -bias) + bias_model = CartesianToElevationBearingRangeRate( + mapping=[0, 1, 2], + ndim_state=4, + noise_covar=np.eye(4), + translation_offset=StateVector([[0.], [0.], [0.]]), + rotation_offset=StateVector([[0.], [0.], [0.]]), + ) + bias_model.rotation_offset = bias_model.rotation_offset - orientation_bias + expected_state = StateVector(state_vec[:6]) + expected = bias_model.function(State(expected_state)) + assert np.allclose(result, expected) + + # Additional coverage: mapping, ndim_meas, covar, jacobian + expected_mapping = [0, 1, 2, 6, 7, 8] + assert wrapper.mapping == expected_mapping + assert wrapper.ndim_meas == 4 + covar = wrapper.covar() + assert covar.shape == (4, 4) + jac = wrapper.jacobian(state) + assert jac.shape[0] == wrapper.ndim_meas + assert jac.shape[1] == wrapper.ndim_state + + +@pytest.mark.parametrize('bias_mapping', [-1, (-1, )]) +def test_time_bias_wrapper(bias_mapping): + # Setup transition model: 3D constant velocity + cv_x = ConstantVelocity(noise_diff_coeff=0.1) + cv_y = ConstantVelocity(noise_diff_coeff=0.1) + 
cv_z = ConstantVelocity(noise_diff_coeff=0.1) + transition_model = CombinedGaussianTransitionModel(model_list=[cv_x, cv_y, cv_z]) + + # Setup measurement model (dummy, just returns state) + class DummyMeasurementModel: + mapping = [0, 2, 4] + ndim_meas = 3 + + def function(self, state, noise=False, **kwargs): + # Just return the position components + return state.state_vector[[0, 2, 4], :] + + def covar(self, *args, **kwargs): + return np.eye(3) + + def jacobian(self, state, **kwargs): + return np.eye(3, 7) + + # Wrap with time bias + wrapper = TimeBiasWrapper( + measurement_model=DummyMeasurementModel(), + transition_model=transition_model, + state_mapping=[0, 1, 2, 3, 4, 5], + bias_mapping=bias_mapping, + ndim_state=7 + ) + + # State vector: [x, vx, y, vy, z, vz, bias] + state_vec = np.array([[1.], [2.], [3.], [4.], [5.], [6.], [1.]]) + state = State(StateVector(state_vec)) + + # The wrapper should apply the transition model with time_interval = -bias + expected_state = transition_model.function( + State(StateVector(state_vec[:6])), + time_interval=datetime.timedelta(seconds=-state_vec[6, 0]) + ) + expected = wrapper.measurement_model.function(State(expected_state)) + result = wrapper.function(state) + assert np.allclose(result, expected) + + # Also test with negative bias (forward in time) + state_vec2 = np.array([[1.], [2.], [3.], [4.], [5.], [6.], [-2.]]) + state2 = State(StateVector(state_vec2)) + expected_state2 = transition_model.function( + State(StateVector(state_vec2[:6])), + time_interval=datetime.timedelta(seconds=-state_vec2[6, 0]) + ) + expected2 = wrapper.measurement_model.function(State(expected_state2)) + result2 = wrapper.function(state2) + assert np.allclose(result2, expected2) From 0042d6cf637d49ed29c1bac01e8d9009c27ac6d0 Mon Sep 17 00:00:00 2001 From: Steven Hiscocks Date: Thu, 11 Sep 2025 17:20:58 +0100 Subject: [PATCH 05/15] Add uncertainty to Plotterly 1D plotter --- stonesoup/plotter.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/stonesoup/plotter.py b/stonesoup/plotter.py index 25ed8ae83..cb67c3ea3 100644 --- a/stonesoup/plotter.py +++ b/stonesoup/plotter.py @@ -1425,16 +1425,30 @@ def plot_tracks(self, tracks, mapping, uncertainty=False, particle=False, label= if self.dimension == 1: # plot 1D tracks - if uncertainty or particle: + if particle: raise NotImplementedError + x = [state.timestamp for state in track] + y = [float(getattr(state, 'mean', state.state_vector)[mapping[0]]) + for state in track] self.fig.add_scatter( - x=[state.timestamp for state in track], - y=[float(getattr(state, 'mean', state.state_vector)[mapping[0]]) - for state in track], + x=x, + y=y, text=[self._format_state_text(state) for state in track], **scatter_kwargs) + if uncertainty: + dy = np.array([ + np.sqrt(float(state.covar[mapping[0], mapping[0]])) for state in track]) + name = track_kwargs['legendgroup'] + "
(Error)" + self.fig.add_scatter( + x=np.concatenate([x, x[::-1]]), + y=np.concatenate([y + dy, (y - dy)[::-1]]), + mode='none', fill='toself', fillcolor=track_colors[track], + opacity=0.2, hoverinfo='skip', + legendgroup=name, name=name, + legendrank=track_kwargs['legendrank'] + 10) + elif self.dimension == 2: # plot 2D tracks self.fig.add_scatter( From 44c84d2748493c0e38c0fe65fc999eff12c18fe4 Mon Sep 17 00:00:00 2001 From: Steven Hiscocks Date: Thu, 11 Sep 2025 17:21:45 +0100 Subject: [PATCH 06/15] Add sensor fusion based bias estimation example --- docs/examples/sensorfusion/senor_bias.py | 270 +++++++++++++++++++++++ docs/source/stonesoup.feeder.rst | 6 + 2 files changed, 276 insertions(+) create mode 100644 docs/examples/sensorfusion/senor_bias.py diff --git a/docs/examples/sensorfusion/senor_bias.py b/docs/examples/sensorfusion/senor_bias.py new file mode 100644 index 000000000..db92e5691 --- /dev/null +++ b/docs/examples/sensorfusion/senor_bias.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python +# coding: utf-8 + +""" +Estimating Bias Between Sensors +=============================== +""" +# %% +# This example demonstrates how to simulate and estimate a drifting bias in the position of a sensor platform. +# Specifically, the platform at index 0 (and its sensor) will have a time-varying bias applied to its position. +# We use Stone-Soup's bias wrappers and feeders to estimate this changing bias from sensor measurements. + +# Some initial imports and set up +import datetime +import numpy as np + +np.random.seed(2001) +start_time = datetime.datetime.now().replace(microsecond=0) + +# %% +# Define Platforms and Sensors +# ---------------------------- +# +# We create three moving platforms, each with a radar sensor. The first platform will have a drifting bias applied. 
+from stonesoup.models.transition.linear import \ + RandomWalk, ConstantVelocity, OrnsteinUhlenbeck, CombinedLinearGaussianTransitionModel +from stonesoup.platform import MovingPlatform +from stonesoup.sensor.radar.radar import RadarBearingRange +from stonesoup.types.state import State, GaussianState +from stonesoup.types.track import Track +from stonesoup.types.array import StateVector +from stonesoup.plotter import Plotterly +from stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState +platforms = [ + MovingPlatform( + states=State( + StateVector([-10., 1., -10., 1.]), + timestamp=start_time + ), + position_mapping=[0, 2], + transition_model=CombinedLinearGaussianTransitionModel([OrnsteinUhlenbeck(0.1, 5e-1)]*2), + ), + MovingPlatform( + states=State( + StateVector([20., -1., 20., -1.]), + timestamp=start_time + ), + position_mapping=[0, 2], + transition_model=CombinedLinearGaussianTransitionModel([OrnsteinUhlenbeck(0.1, 5e-1)]*2), + ), + MovingPlatform( + states=State( + StateVector([-20., -1., -30., -1.]), + timestamp=start_time + ), + position_mapping=[0, 2], + transition_model=CombinedLinearGaussianTransitionModel([OrnsteinUhlenbeck(0.1, 5e-1)]*2), + ) +] + +sensors = [ + RadarBearingRange( + ndim_state=4, position_mapping=[0, 2], noise_covar=np.diag([np.radians(0.05), 0.5])), + RadarBearingRange( + ndim_state=4, position_mapping=[0, 2], noise_covar=np.diag([np.radians(0.05), 0.5])), + RadarBearingRange( + ndim_state=4, position_mapping=[0, 2], noise_covar=np.diag([np.radians(0.05), 0.5])), +] +# %% +# Attach Sensors to Platforms +for platform, sensor in zip(platforms, sensors): + platform.add_sensor(sensor) + +# %% +# Add Targets +# ----------- +# +# We add several moving targets to the scenario, each with its own motion model. +targets = { + MovingPlatform( + states=State( + StateVector([i * 5, 0.1*np.sign(i), i * 5, -0.1*np.sign(i+1)]), + timestamp=start_time + ), + position_mapping=[0, 2], + transition_model=CombinedLinearGaussianTransitionModel([ConstantVelocity(0.01)]*2), + ) + for i in range(-3, 4) +} + +# %% +# Simulate Platform Motion and Sensor Measurements +# ------------------------------------------------ +# +# We simulate the motion of each platform and generate sensor measurements for each target. +# The first platform's sensor measurements will be affected by a drifting bias. +# +# We create a time-varying bias using a random walk model, and apply this bias to the measurements of platform 0. 
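+#
+# As a rough sketch, the assumed bias dynamics are a discrete random walk,
+# ``b_k = b_{k-1} + w_k`` with ``w_k ~ N(0, q * dt)``, where ``q`` is the ``RandomWalk``
+# noise coefficient (``1e-2`` per axis below).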
+true_bias_prior = State([[0.], [0.]], start_time) +bias_transition_model = CombinedLinearGaussianTransitionModel([RandomWalk(1e-2)]*2) +true_bias = GroundTruthPath([true_bias_prior]) + +# %% +# Simulate platforms and measurements including bias for platform 0 +# +ground_truths = [GroundTruthPath() for _ in platforms] + +timestamps = [start_time + datetime.timedelta(seconds=n) for n in range(1, 51)] +measurements = [[] for _ in sensors] + +for time in timestamps: + # Update the true bias using the transition model + true_bias.append(State.from_state( + true_bias.state, + state_vector=bias_transition_model.function( + true_bias, noise=True, time_interval=time - true_bias.timestamp), + timestamp=time)) + for target in targets: + target.move(timestamp=time) + for platform_index, platform in enumerate(platforms): + platform.move(noise=True, timestamp=time) + + # Add ground truth state for each platform + ground_truth_state = GroundTruthState(platform.state_vector, timestamp=time) + ground_truths[platform_index].append(ground_truth_state) + + # Generate measurement for each platform + measurements[platform_index].append((time, (meas := platform.sensors[0].measure(targets)))) + # Apply drifting bias to platform 0's sensor measurements + if platform_index == 0: + for model in {m.measurement_model for m in meas}: + model.translation_offset = model.translation_offset + true_bias.state_vector + + +# %% +# Visualise Ground Truths and Measurements +# ---------------------------------------- +# We plot the ground truth positions of platforms and targets, and the sensor measurements +# (with bias for platform 0 in green). +plotter = Plotterly() +plotter.plot_ground_truths(ground_truths, mapping=[0, 2], line_dash="solid", label="Platforms") +plotter.plot_ground_truths(targets, mapping=[0, 2]) +for n, sensor_measurements in enumerate(measurements): + kwargs = {} + if n == 0: + kwargs['marker'] = {'color': 'green'} + plotter.plot_measurements( + {m for ms in sensor_measurements for m in ms[1]}, + mapping=[0, 2], + label=f'Sensor {n}', + **kwargs) +plotter.fig + +# %% +# Initialise Bias Estimation +# -------------------------- +# We set up the bias feeder and predictor to estimate the drifting bias from platform 0's sensor +# measurements. +# +# These are all added to a MultiDataFeeder to combine them into single detection feed. +from stonesoup.predictor.kalman import KalmanPredictor +from stonesoup.feeder.bias import TranslationGaussianBiasFeeder +from stonesoup.feeder.multi import MultiDataFeeder + +bias_prior = GaussianState([[0.0], [0.]], np.diag([0.5, 0.5]), start_time) +bias_track = Track([bias_prior]) + +bias_predictor = KalmanPredictor(CombinedLinearGaussianTransitionModel([RandomWalk(1e-1)]*2)) +bias_feeder = TranslationGaussianBiasFeeder( + measurements[0], + bias_prior, + bias_predictor) +# %% +# These are all added to a MultiDataFeeder to combine them into single detection feed. +feeder = MultiDataFeeder([*measurements[1:], bias_feeder]) + +# %% +# Run Tracking and Bias Estimation +# -------------------------------- +# We use an Extended Kalman Predictor and Unscented Kalman Updater for tracking, and associate +# measurements using a global nearest neighbour associator. 
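+#
+# The unscented updater is chosen because the bias-wrapped measurement models are
+# non-linear; the bias feeder itself also defaults to an ``UnscentedKalmanUpdater``
+# when no updater is supplied.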
+from stonesoup.predictor.kalman import KalmanPredictor, ExtendedKalmanPredictor +predictor = ExtendedKalmanPredictor( + CombinedLinearGaussianTransitionModel([ConstantVelocity(0.01)]*2)) + +from stonesoup.updater.kalman import UnscentedKalmanUpdater +updater = UnscentedKalmanUpdater(None) + +from stonesoup.hypothesiser.distance import DistanceHypothesiser +from stonesoup.measures import Mahalanobis +hypothesiser = DistanceHypothesiser(predictor, updater, measure=Mahalanobis(), missed_distance=5) + +from stonesoup.dataassociator.neighbour import GlobalNearestNeighbour +data_associator = GlobalNearestNeighbour(hypothesiser) + +# %% +# Tracks will be initialised by taking first observation from unbiased sensor +from stonesoup.initiator.simple import SinglePointMeasurementInitiator +initiator = SinglePointMeasurementInitiator( + GaussianState([0., 0., 0., 0.], np.diag([0., 1., 0., 1.])) +) +tracks = initiator.initiate(sensors[1].measure({t[0] for t in targets}, noise=False), start_time) + +# %% +# Run Tracking and Bias Estimation +# -------------------------------- +# +# For each time step, we associate measurements to tracks, update the bias estimate, +# and update the tracks accordingly. +for time, detections in feeder: + hypotheses = data_associator.associate(tracks, detections, time) + if any(hasattr(measurement, 'applied_bias') for measurement in detections) and any(hypotheses.values()): + # Update bias estimate using associated measurements + updates = bias_feeder.update_bias([h for h in hypotheses.values() if h]) + for track, update in zip((t for t, h in hypotheses.items() if h), updates): + track.append(update) + for track, hyp in {t: h for t, h in hypotheses.items() if not h}.items(): + track.append(hyp.prediction) + + # Adjust measurement models by removing relative bias for plotting later + rel_bias_vector = bias_track.state_vector - bias_feeder.bias_state.state_vector + for model in {d.measurement_model for d in detections}: + model.translation_offset -= rel_bias_vector + bias_track.append(bias_feeder.bias_state) + else: + # Standard track update if no bias applied i.e. unbiased sensors + for track in tracks: + hypothesis = hypotheses[track] + if hypothesis.measurement: + post = updater.update(hypothesis) + track.append(post) + else: + track.append(hypothesis.prediction) + +# %% +# Visualise Tracking Results +# -------------------------- +# +# We plot the estimated tracks alongside the ground truths and measurements, showing the effect +# of bias estimation. +# +# By comparing the green biased detection to the previous plot (with ground truth layer also to +# make comparison clearer), it can be seen that the bias has been corrected. +plotter = Plotterly() +plotter.plot_ground_truths(ground_truths, mapping=[0, 2], line_dash="solid", label="Platforms") +plotter.plot_ground_truths(targets, mapping=[0, 2]) +for n, sensor_measurements in enumerate(measurements): + kwargs = {} + if n == 0: + kwargs['marker'] = {'color': 'green'} + plotter.plot_measurements( + {m for ms in sensor_measurements for m in ms[1]}, + mapping=[0, 2], + label=f'Sensor {n}', + **kwargs) +plotter.plot_tracks(tracks, [0, 2]) +plotter.fig + +# %% +# Visualise Bias Estimation +# ------------------------- +# +# Finally, we plot the true bias and the estimated bias over time, for both x and y components, +# including 1 standard deviation error area. 
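+#
+# The shaded area drawn with ``uncertainty=True`` spans ``mean ± sqrt(covar[i, i])`` for the
+# plotted component ``i`` of each bias state, i.e. a one standard deviation interval.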
+plotter = Plotterly(dimension=1, axis_labels=['Bias', 'Time']) +plotter.plot_ground_truths(true_bias, mapping=[0], label="True 𝑥 bias") +plotter.plot_tracks(bias_track, mapping=[0], uncertainty=True, label="𝑥 bias estimate") +plotter.plot_ground_truths(true_bias, mapping=[1], label="True 𝑦 bias") +plotter.plot_tracks(bias_track, mapping=[1], uncertainty=True, label="𝑦 bias estimate") +plotter.fig diff --git a/docs/source/stonesoup.feeder.rst b/docs/source/stonesoup.feeder.rst index ab05c51cd..966915b4f 100644 --- a/docs/source/stonesoup.feeder.rst +++ b/docs/source/stonesoup.feeder.rst @@ -42,3 +42,9 @@ Track .. automodule:: stonesoup.feeder.track :show-inheritance: + +Bias +---- + +.. automodule:: stonesoup.feeder.bias + :show-inheritance: From 6b880b041c12e2f27ca0d7d55d079b385b57ebea Mon Sep 17 00:00:00 2001 From: Steven Hiscocks Date: Thu, 11 Sep 2025 17:34:01 +0100 Subject: [PATCH 07/15] Fix plotterly 1d test --- stonesoup/tests/test_plotter.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/stonesoup/tests/test_plotter.py b/stonesoup/tests/test_plotter.py index 344c8f58f..8f81f8b62 100644 --- a/stonesoup/tests/test_plotter.py +++ b/stonesoup/tests/test_plotter.py @@ -300,15 +300,12 @@ def test_plotterly_1d(): plotter1d.plot_ground_truths(truth, [0]) plotter1d.plot_measurements(true_measurements, [0]) plotter1d.plot_tracks(track, [0]) + plotter1d.plot_tracks(track, [0], uncertainty=True) # check that particle=True does not plot with pytest.raises(NotImplementedError): plotter1d.plot_tracks(track, [0], particle=True) - # check that uncertainty=True does not plot - with pytest.raises(NotImplementedError): - plotter1d.plot_tracks(track, [0], uncertainty=True) - def test_plotterly_2d(): plotter2d = Plotterly() From ab28c816c450cdd65902430f30f4e83edcd5526a Mon Sep 17 00:00:00 2001 From: Steven Hiscocks Date: Fri, 12 Sep 2025 08:00:58 +0100 Subject: [PATCH 08/15] Minor doc fixes for sensor bias --- docs/examples/sensorfusion/senor_bias.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/examples/sensorfusion/senor_bias.py b/docs/examples/sensorfusion/senor_bias.py index db92e5691..b2e494f4e 100644 --- a/docs/examples/sensorfusion/senor_bias.py +++ b/docs/examples/sensorfusion/senor_bias.py @@ -202,9 +202,6 @@ tracks = initiator.initiate(sensors[1].measure({t[0] for t in targets}, noise=False), start_time) # %% -# Run Tracking and Bias Estimation -# -------------------------------- -# # For each time step, we associate measurements to tracks, update the bias estimate, # and update the tracks accordingly. for time, detections in feeder: @@ -262,6 +259,9 @@ # # Finally, we plot the true bias and the estimated bias over time, for both x and y components, # including 1 standard deviation error area. 
+ +# sphinx_gallery_thumbnail_number = 3 + plotter = Plotterly(dimension=1, axis_labels=['Bias', 'Time']) plotter.plot_ground_truths(true_bias, mapping=[0], label="True 𝑥 bias") plotter.plot_tracks(bias_track, mapping=[0], uncertainty=True, label="𝑥 bias estimate") From 2e81e3c4423458a28189459c3fd5182dca79757a Mon Sep 17 00:00:00 2001 From: Steven Hiscocks Date: Wed, 22 Oct 2025 13:48:51 +0100 Subject: [PATCH 09/15] Bias feeder handle case of no bias already applied --- stonesoup/feeder/bias.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/stonesoup/feeder/bias.py b/stonesoup/feeder/bias.py index 96407ec58..d507f23fb 100644 --- a/stonesoup/feeder/bias.py +++ b/stonesoup/feeder/bias.py @@ -75,8 +75,11 @@ def update_bias(self, hypotheses): # Create joint state states = [hypothesis.prediction for hypothesis in hypotheses] - applied_bias = next((h.measurement.applied_bias for h in hypotheses), - np.zeros((ndim_bias, 1))) + applied_bias = next( + (h.measurement.applied_bias + for h in hypotheses + if hasattr(h.measurement, 'applied_bias')), + np.zeros((ndim_bias, 1))) delta_bias = self.bias - applied_bias states.append(GaussianState(delta_bias, self.bias_state.covar)) combined_pred = GaussianState( From 5f94fce281b4ab37eb3a6ee83a500bbcb612d36f Mon Sep 17 00:00:00 2001 From: Steven Hiscocks Date: Fri, 24 Oct 2025 14:23:55 +0100 Subject: [PATCH 10/15] Add GaussianBiasUpdater and move update functionality from feeder This moves update logic from feeder to updater class which is more fitting to the wider design. This also now includes measurement prediction which factors in sensor bias, and such better handles data association, etc. This enables example now to include a prior bias making bias more visible and demonstrating this functionality. --- docs/examples/sensorfusion/senor_bias.py | 47 +++-- stonesoup/feeder/bias.py | 141 ++----------- stonesoup/feeder/tests/test_bias.py | 191 ++---------------- stonesoup/models/measurement/bias.py | 10 +- .../models/measurement/tests/test_bias.py | 12 +- stonesoup/updater/bias.py | 136 +++++++++++++ stonesoup/updater/tests/test_bias.py | 185 +++++++++++++++++ 7 files changed, 389 insertions(+), 333 deletions(-) create mode 100644 stonesoup/updater/bias.py create mode 100644 stonesoup/updater/tests/test_bias.py diff --git a/docs/examples/sensorfusion/senor_bias.py b/docs/examples/sensorfusion/senor_bias.py index b2e494f4e..47474a0bb 100644 --- a/docs/examples/sensorfusion/senor_bias.py +++ b/docs/examples/sensorfusion/senor_bias.py @@ -95,8 +95,9 @@ # We simulate the motion of each platform and generate sensor measurements for each target. # The first platform's sensor measurements will be affected by a drifting bias. # -# We create a time-varying bias using a random walk model, and apply this bias to the measurements of platform 0. -true_bias_prior = State([[0.], [0.]], start_time) +# We create a time-varying bias using a random walk model, and apply this bias to the measurements +# of platform 0. +true_bias_prior = State([[5.], [5.]], start_time) bias_transition_model = CombinedLinearGaussianTransitionModel([RandomWalk(1e-2)]*2) true_bias = GroundTruthPath([true_bias_prior]) @@ -154,22 +155,21 @@ # %% # Initialise Bias Estimation # -------------------------- -# We set up the bias feeder and predictor to estimate the drifting bias from platform 0's sensor +# We set up the bias feeder and predictor to apply the drifting bias from platform 0's sensor # measurements. 
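+# The feeder now only applies the current bias estimate to incoming detections (shifting each
+# measurement model's ``translation_offset`` and recording it as ``applied_bias``); estimating
+# the bias itself is handled by the ``GaussianBiasUpdater`` introduced below.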
 #
 # These are all added to a MultiDataFeeder to combine them into single detection feed.
+import copy
 from stonesoup.predictor.kalman import KalmanPredictor
 from stonesoup.feeder.bias import TranslationGaussianBiasFeeder
 from stonesoup.feeder.multi import MultiDataFeeder

-bias_prior = GaussianState([[0.0], [0.]], np.diag([0.5, 0.5]), start_time)
-bias_track = Track([bias_prior])
+bias_state = GaussianState([[0.], [0.]], np.diag([5**2, 5**2]), start_time)
+bias_track = Track([copy.copy(bias_state)])

 bias_predictor = KalmanPredictor(CombinedLinearGaussianTransitionModel([RandomWalk(1e-1)]*2))
-bias_feeder = TranslationGaussianBiasFeeder(
-    measurements[0],
-    bias_prior,
-    bias_predictor)
+bias_feeder = TranslationGaussianBiasFeeder(measurements[0], bias_state)
+
 # %%
 # These are all added to a MultiDataFeeder to combine them into single detection feed.
 feeder = MultiDataFeeder([*measurements[1:], bias_feeder])
@@ -190,8 +190,21 @@
 from stonesoup.measures import Mahalanobis
 hypothesiser = DistanceHypothesiser(predictor, updater, measure=Mahalanobis(), missed_distance=5)

-from stonesoup.dataassociator.neighbour import GlobalNearestNeighbour
-data_associator = GlobalNearestNeighbour(hypothesiser)
+from stonesoup.dataassociator.neighbour import GNNWith2DAssignment
+data_associator = GNNWith2DAssignment(hypothesiser)
+
+# %%
+# A bias-aware hypothesiser and data associator are created to factor the bias uncertainty into
+# the association threshold. These use the bias updater wrapper, which is also used to update
+# target and bias estimates.
+from stonesoup.updater.bias import GaussianBiasUpdater
+from stonesoup.models.measurement.bias import TranslationBiasModelWrapper
+
+bias_updater = GaussianBiasUpdater(
+    bias_state, bias_predictor, TranslationBiasModelWrapper, updater)
+bias_hypothesiser = DistanceHypothesiser(
+    predictor, bias_updater, measure=Mahalanobis(), missed_distance=5)
+bias_data_associator = GNNWith2DAssignment(bias_hypothesiser)

 # %%
 # Tracks will be initialised by taking first observation from unbiased sensor
@@ -205,22 +218,24 @@
 # For each time step, we associate measurements to tracks, update the bias estimate,
 # and update the tracks accordingly.
 for time, detections in feeder:
-    hypotheses = data_associator.associate(tracks, detections, time)
-    if any(hasattr(measurement, 'applied_bias') for measurement in detections) and any(hypotheses.values()):
+    if any(hasattr(measurement.measurement_model, 'applied_bias') for measurement in detections):
+        hypotheses = bias_data_associator.associate(tracks, detections, time)
         # Update bias estimate using associated measurements
-        updates = bias_feeder.update_bias([h for h in hypotheses.values() if h])
+        updates = bias_updater.update([h for h in hypotheses.values() if h])
         for track, update in zip((t for t, h in hypotheses.items() if h), updates):
             track.append(update)
         for track, hyp in {t: h for t, h in hypotheses.items() if not h}.items():
             track.append(hyp.prediction)

         # Adjust measurement models by removing relative bias for plotting later
-        rel_bias_vector = bias_track.state_vector - bias_feeder.bias_state.state_vector
+        rel_bias_vector = bias_track.state_vector - bias_state.state_vector
         for model in {d.measurement_model for d in detections}:
             model.translation_offset -= rel_bias_vector
-        bias_track.append(bias_feeder.bias_state)
+            model.applied_bias += rel_bias_vector  # No longer used, but for completeness
+        bias_track.append(copy.copy(bias_state))
     else:
         # Standard track update if no bias applied i.e.
unbiased sensors + hypotheses = data_associator.associate(tracks, detections, time) for track in tracks: hypothesis = hypotheses[track] if hypothesis.measurement: diff --git a/stonesoup/feeder/bias.py b/stonesoup/feeder/bias.py index d507f23fb..344bb9932 100644 --- a/stonesoup/feeder/bias.py +++ b/stonesoup/feeder/bias.py @@ -1,198 +1,83 @@ -import copy import datetime from abc import abstractmethod -from functools import partial - -import numpy as np -from scipy.linalg import block_diag from .base import DetectionFeeder from ..base import Property from ..buffered_generator import BufferedGenerator -from ..models.measurement.bias import \ - OrientationBiasWrapper, OrientationTranslationBiasWrapper, \ - TimeBiasWrapper, TranslationBiasWrapper -from ..models.measurement.nonlinear import CombinedReversibleGaussianMeasurementModel -from ..models.transition import TransitionModel -from ..predictor.kalman import KalmanPredictor -from ..updater.kalman import KalmanUpdater, UnscentedKalmanUpdater -from ..types.array import CovarianceMatrix, StateVector -from ..types.detection import Detection -from ..types.hypothesis import SingleHypothesis from ..types.state import GaussianState -from ..types.update import Update class _GaussianBiasFeeder(DetectionFeeder): - bias_prior: GaussianState = Property(doc="Prior bias") - bias_predictor: KalmanPredictor = Property(doc="Predictor for bias") - updater: KalmanUpdater = Property( - default=None, - doc="Updater for bias and joint states. Must support non-linear models. " - "Default `None` will create UKF instance.") - max_bias: list[float] = Property(default=None, doc="Max bias ± from 0 allowed") - - @property - @abstractmethod - def _model_wrapper(self): - raise NotImplementedError() - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._bias_state = copy.deepcopy(self.bias_prior) - if self.updater is None: - self.updater = UnscentedKalmanUpdater(None) - - @property - def ndim_bias(self): - return self._bias_state.state_vector.shape[0] - - @property - def bias_state(self): - return self._bias_state + bias_state: GaussianState = Property(doc="Prior bias") @property def bias(self): - return self._bias_state.state_vector + return self.bias_state.state_vector @abstractmethod @BufferedGenerator.generator_method def data_gen(self): raise NotImplementedError() - def update_bias(self, hypotheses): - if any(not hyp for hyp in hypotheses): - raise ValueError("Must provide only non-missed detection hypotheses") - - ndim_bias = self.ndim_bias - - # Predict bias - pred_time = max(hypothesis.prediction.timestamp for hypothesis in hypotheses) - if self._bias_state.timestamp is None: - self._bias_state.timestamp = pred_time - else: - self._bias_state = self.bias_predictor.predict(self._bias_state, timestamp=pred_time) - - # Create joint state - states = [hypothesis.prediction for hypothesis in hypotheses] - applied_bias = next( - (h.measurement.applied_bias - for h in hypotheses - if hasattr(h.measurement, 'applied_bias')), - np.zeros((ndim_bias, 1))) - delta_bias = self.bias - applied_bias - states.append(GaussianState(delta_bias, self.bias_state.covar)) - combined_pred = GaussianState( - np.vstack([state.state_vector for state in states]).view(StateVector), - block_diag(*[state.covar for state in states]).view(CovarianceMatrix), - timestamp=pred_time, - ) - - # Create joint measurement - offset = 0 - models = [] - for prediction, measurement in ( - (hypothesis.prediction, hypothesis.measurement) for hypothesis in hypotheses): - 
models.append(self._model_wrapper( - ndim_state=combined_pred.state_vector.shape[0], - measurement_model=measurement.measurement_model, - state_mapping=[offset + n for n in range(prediction.ndim)], - bias_mapping=list(range(-ndim_bias, 0)) - )) - offset += prediction.ndim - combined_meas = Detection( - np.vstack([hypothesis.measurement.state_vector for hypothesis in hypotheses]), - timestamp=pred_time, - measurement_model=CombinedReversibleGaussianMeasurementModel(models)) - - # Update bias - update = self.updater.update(SingleHypothesis(combined_pred, combined_meas)) - rel_delta_bias = update.state_vector[-ndim_bias:, :] - delta_bias - self.bias_state.state_vector += rel_delta_bias - if self.max_bias is not None: - self.bias_state.state_vector = \ - np.min([abs(self.bias), self.max_bias], axis=0) * np.sign(self.bias) - self.bias_state.covar = update.covar[-ndim_bias:, -ndim_bias:] - - # Create update states - offset = 0 - updates = [] - for hypothesis in hypotheses: - update_slice = slice(offset, offset + hypothesis.prediction.ndim) - updates.append(Update.from_state( - hypothesis.prediction, - state_vector=update.state_vector[update_slice, :], - covar=update.covar[update_slice, update_slice], - timestamp=hypothesis.prediction.timestamp, - hypothesis=hypothesis)) - offset += hypothesis.prediction.ndim - - return updates - class TimeGaussianBiasFeeder(_GaussianBiasFeeder): - transition_model: TransitionModel = Property() - - @property - def _model_wrapper(self): - return partial(TimeBiasWrapper, transition_model=self.transition_model) @BufferedGenerator.generator_method def data_gen(self): for time, detections in self.reader: - bias = self.bias + bias = self.bias_state.state_vector.copy() bias_delta = datetime.timedelta(seconds=float(bias)) time -= bias_delta + models = set() for detection in detections: detection.timestamp -= bias_delta - detection.applied_bias = bias + models.add(detection.measurement_model) + for model in models: + model.applied_bias = bias yield time, detections class OrientationGaussianBiasFeeder(_GaussianBiasFeeder): - _model_wrapper = OrientationBiasWrapper @BufferedGenerator.generator_method def data_gen(self): for time, detections in self.reader: - bias = self.bias + bias = self.bias_state.state_vector.copy() models = set() for detection in detections: models.add(detection.measurement_model) - detection.applied_bias = bias for model in models: model.rotation_offset = model.rotation_offset - bias + model.applied_bias = bias yield time, detections class TranslationGaussianBiasFeeder(_GaussianBiasFeeder): - _model_wrapper = TranslationBiasWrapper @BufferedGenerator.generator_method def data_gen(self): for time, detections in self.reader: - bias = self.bias.copy() + bias = self.bias_state.state_vector.copy() models = set() for detection in detections: models.add(detection.measurement_model) - detection.applied_bias = bias for model in models: model.translation_offset = model.translation_offset - bias + model.applied_bias = bias yield time, detections class OrientationTranslationGaussianBiasFeeder(_GaussianBiasFeeder): - _model_wrapper = OrientationTranslationBiasWrapper @BufferedGenerator.generator_method def data_gen(self): for time, detections in self.reader: - bias = self.bias.copy() + bias = self.bias_state.state_vector.copy() models = set() for detection in detections: models.add(detection.measurement_model) - detection.applied_bias = bias for model in models: model.rotation_offset = model.rotation_offset - bias[:3] model.translation_offset = 
model.translation_offset - bias[3:] + model.applied_bias = bias yield time, detections diff --git a/stonesoup/feeder/tests/test_bias.py b/stonesoup/feeder/tests/test_bias.py index b65aa0b7d..72afbcf47 100644 --- a/stonesoup/feeder/tests/test_bias.py +++ b/stonesoup/feeder/tests/test_bias.py @@ -1,17 +1,13 @@ import datetime import numpy as np -import pytest from stonesoup.feeder.bias import ( TimeGaussianBiasFeeder, TranslationGaussianBiasFeeder, OrientationGaussianBiasFeeder, OrientationTranslationGaussianBiasFeeder) from stonesoup.functions import build_rotation_matrix -from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, RandomWalk -from stonesoup.predictor.kalman import KalmanPredictor from stonesoup.types.detection import Detection -from stonesoup.types.hypothesis import SingleHypothesis from stonesoup.types.state import GaussianState, StateVector @@ -21,7 +17,7 @@ def make_dummy_detection(state_vector, measurement_model): timestamp=datetime.datetime.now(), measurement_model=measurement_model ) - det.applied_bias = None + det.measurement_model.applied_bias = None return det @@ -45,39 +41,21 @@ def jacobian(self, state, **kwargs): return DummyMeasurementModel() -def make_dummy_transition_model(): - class DummyTransitionModel: - def function(self, state, noise=False, time_interval=None, **kwargs): - # Adds time_interval (seconds) to the state_vector - if time_interval is None: - time_interval = datetime.timedelta(seconds=0) - return state.state_vector + time_interval.total_seconds() - return DummyTransitionModel() - - -@pytest.fixture(params=[None, datetime.datetime(2025, 9, 9, 23, 59, 59)]) -def bias_timestamp(request): - return request.param - - def test_translation_gaussian_bias_feeder_iter(): # Setup feeder - bias_prior = GaussianState(StateVector([[1.], [2.], [3.]]), np.eye(3)) - bias_predictor = KalmanPredictor( - CombinedLinearGaussianTransitionModel([RandomWalk(1)] * 3)) + bias_state = GaussianState(StateVector([[1.], [2.], [3.]]), np.eye(3)) # Mock reader to yield a single time and detection measurement_model = make_dummy_measurement_model() detection = make_dummy_detection(StateVector([[10.], [20.], [30.]]), measurement_model) feeder = TranslationGaussianBiasFeeder( reader=[(datetime.datetime(2025, 9, 10), [detection])], - bias_prior=bias_prior, - bias_predictor=bias_predictor, + bias_state=bias_state, ) # Iterate over feeder for time, detections in feeder: # Bias should be applied to detection for det in detections: - assert np.allclose(det.applied_bias, feeder.bias) + assert np.allclose(det.measurement_model.applied_bias, feeder.bias) # The measurement model's translation_offset should be updated assert np.allclose( det.measurement_model.translation_offset, @@ -85,55 +63,17 @@ def test_translation_gaussian_bias_feeder_iter(): ) -def test_translation_gaussian_bias_feeder_update_bias(bias_timestamp): - # Setup feeder - bias_prior = GaussianState(StateVector([[0.], [0.], [0.]]), np.eye(3) * 10, bias_timestamp) - bias_predictor = KalmanPredictor( - CombinedLinearGaussianTransitionModel([RandomWalk(1)] * 3)) - feeder = TranslationGaussianBiasFeeder( - reader=None, - bias_prior=bias_prior, - bias_predictor=bias_predictor, - ) - # Create dummy prediction and measurement - measurement_model = make_dummy_measurement_model() - pred = GaussianState( - StateVector([[10.], [20.], [30.]]), - np.eye(3), - timestamp=datetime.datetime(2025, 9, 10) - ) - meas = Detection( - StateVector([[11.], [21.], [31.]]), - timestamp=datetime.datetime(2025, 9, 10), - 
measurement_model=measurement_model - ) - meas.applied_bias = np.zeros((3, 1)) - # Create hypothesis - hyp = SingleHypothesis(pred, meas) - # Call update_bias - updates = feeder.update_bias([hyp]) - # The bias_state should be updated (should not be the same as initial)... - assert not np.allclose(feeder.bias_state.state_vector, bias_prior.state_vector) - # and should be around 0.8 ± 0.1 - assert np.allclose(feeder.bias_state.state_vector, [[0.8], [0.8], [0.8]], atol=0.1) - # The returned updates should match the updated state shape - assert updates[0].state_vector.shape == pred.state_vector.shape - - def test_orientation_gaussian_bias_feeder_iter(): - bias_prior = GaussianState(StateVector([[0.], [0.], [np.pi/16]]), np.eye(3)) - bias_predictor = KalmanPredictor( - CombinedLinearGaussianTransitionModel([RandomWalk(1)] * 3)) + bias_state = GaussianState(StateVector([[0.], [0.], [np.pi/16]]), np.eye(3)) measurement_model = make_dummy_measurement_model() detection = make_dummy_detection(StateVector([[10.], [20.], [30.]]), measurement_model) feeder = OrientationGaussianBiasFeeder( reader=[(datetime.datetime(2025, 9, 10), [detection])], - bias_prior=bias_prior, - bias_predictor=bias_predictor, + bias_state=bias_state, ) for time, detections in feeder: for det in detections: - assert np.allclose(det.applied_bias, feeder.bias) + assert np.allclose(det.measurement_model.applied_bias, feeder.bias) assert np.allclose( det.measurement_model.rotation_offset, StateVector([[0.], [0.], [0.]]) - feeder.bias @@ -145,52 +85,18 @@ def test_orientation_gaussian_bias_feeder_iter(): assert np.allclose(rotated, expected_rotated) -def test_orientation_gaussian_bias_feeder_update_bias(bias_timestamp): - bias_prior = GaussianState( - StateVector([[0.], [0.], [0.]]), np.diag([1e-6, 1e-6, 1]), bias_timestamp) - bias_predictor = KalmanPredictor( - CombinedLinearGaussianTransitionModel([RandomWalk(1e-6)] * 3)) - feeder = OrientationGaussianBiasFeeder( - reader=None, - bias_prior=bias_prior, - bias_predictor=bias_predictor, - max_bias=StateVector([[0.], [0.], [np.pi]]) - ) - measurement_model = make_dummy_measurement_model() - pred = GaussianState( - StateVector([[10.], [20.], [30.]]), - np.eye(3), - timestamp=datetime.datetime(2025, 9, 10) - ) - meas = Detection( - build_rotation_matrix(np.array([[0.], [0.], [-np.pi/16]])) - @ StateVector([[10.], [20.], [30.]]), - timestamp=datetime.datetime(2025, 9, 10), - measurement_model=measurement_model - ) - meas.applied_bias = np.zeros((3, 1)) - hyp = SingleHypothesis(pred, meas) - updates = feeder.update_bias([hyp]) - assert not np.allclose(feeder.bias_state.state_vector, bias_prior.state_vector) - assert np.allclose(feeder.bias_state.state_vector, [[0.], [0.], [np.pi/16]], atol=0.05) - assert updates[0].state_vector.shape == pred.state_vector.shape - - def test_orientation_translation_gaussian_bias_feeder_iter(): - bias_prior = GaussianState( + bias_state = GaussianState( StateVector([[0.], [0.], [np.pi/16], [1.], [2.], [3.]]), np.diag([1e-6, 1e-6, 1, 3, 3, 3])) - bias_predictor = KalmanPredictor( - CombinedLinearGaussianTransitionModel([RandomWalk(1e-6)]*3 + [RandomWalk(1)]*3)) measurement_model = make_dummy_measurement_model() detection = make_dummy_detection(StateVector([[10.], [20.], [30.]]), measurement_model) feeder = OrientationTranslationGaussianBiasFeeder( reader=[(datetime.datetime(2025, 9, 10), [detection])], - bias_prior=bias_prior, - bias_predictor=bias_predictor, + bias_state=bias_state, ) for time, detections in feeder: for det in detections: - assert 
np.allclose(det.applied_bias, feeder.bias) + assert np.allclose(det.measurement_model.applied_bias, feeder.bias) assert np.allclose( np.vstack([ det.measurement_model.rotation_offset, @@ -204,88 +110,17 @@ def test_orientation_translation_gaussian_bias_feeder_iter(): assert np.allclose(rotated, expected_rotated) -def test_orientation_translation_gaussian_bias_feeder_update_bias(bias_timestamp): - bias_prior = GaussianState( - StateVector([[0.], [0.], [np.pi/16], [1.], [2.], [3.]]), - np.diag([1e-6, 1e-6, 1, 3, 3, 3]), - bias_timestamp) - bias_predictor = KalmanPredictor( - CombinedLinearGaussianTransitionModel([RandomWalk(1e-6)]*3 + [RandomWalk(1)]*3)) - feeder = OrientationTranslationGaussianBiasFeeder( - reader=None, - bias_prior=bias_prior, - bias_predictor=bias_predictor, - ) - measurement_model = make_dummy_measurement_model() - hyps = [] - for n in range(-10, 11): # A lot ok unknowns so a few detections needed - pred = GaussianState( - StateVector([[10.*n], [20.*n], [30.*n]]), - np.eye(3), - timestamp=datetime.datetime(2025, 9, 10) - ) - meas = Detection( - build_rotation_matrix(np.array([[0.], [0.], [-np.pi/16]])) - @ (pred.state_vector+StateVector([[1.], [1.], [1.]])), - timestamp=datetime.datetime(2025, 9, 10), - measurement_model=measurement_model - ) - meas.applied_bias = np.zeros((6, 1)) - hyp = SingleHypothesis(pred, meas) - hyps.append(hyp) - updates = feeder.update_bias(hyps) - assert not np.allclose(feeder.bias_state.state_vector, bias_prior.state_vector) - assert np.allclose( - feeder.bias_state.state_vector, - [[0.], [0.], [np.pi/16], [1.], [1.], [1.]], - atol=0.1) - assert updates[0].state_vector.shape == pred.state_vector.shape - - def test_time_gaussian_bias_feeder_iter(): - bias_prior = GaussianState(StateVector([[0.5]]), np.eye(1)) - bias_predictor = KalmanPredictor( - CombinedLinearGaussianTransitionModel([RandomWalk(1)])) + bias_state = GaussianState(StateVector([[0.5]]), np.eye(1)) measurement_model = make_dummy_measurement_model() detection = make_dummy_detection(StateVector([[42.]]), measurement_model) orig_timestamp = detection.timestamp feeder = TimeGaussianBiasFeeder( reader=[(datetime.datetime(2025, 9, 10), [detection])], - transition_model=None, - bias_prior=bias_prior, - bias_predictor=bias_predictor, + bias_state=bias_state, ) for time, detections in feeder: for det in detections: - assert np.allclose(det.applied_bias, feeder.bias) + assert np.allclose(det.measurement_model.applied_bias, feeder.bias) expected_timestamp = orig_timestamp - datetime.timedelta(seconds=feeder.bias[0]) assert detection.timestamp == expected_timestamp - - -def test_time_gaussian_bias_feeder_update_bias(bias_timestamp): - bias_prior = GaussianState(StateVector([[0.5]]), np.eye(1) * 10, bias_timestamp) - bias_predictor = KalmanPredictor( - CombinedLinearGaussianTransitionModel([RandomWalk(1e-3)])) - feeder = TimeGaussianBiasFeeder( - reader=None, - transition_model=make_dummy_transition_model(), - bias_prior=bias_prior, - bias_predictor=bias_predictor, - ) - measurement_model = make_dummy_measurement_model() - pred = GaussianState( - StateVector([[41.5]]), - np.eye(1), - timestamp=datetime.datetime(2025, 9, 10) - ) - meas = Detection( - StateVector([[41.]]), - timestamp=datetime.datetime(2025, 9, 10), - measurement_model=measurement_model - ) - meas.applied_bias = StateVector([[0.5]]) - hyp = SingleHypothesis(pred, meas) - updates = feeder.update_bias([hyp]) - assert not np.allclose(feeder.bias_state.state_vector, bias_prior.state_vector) - assert 
np.allclose(feeder.bias_state.state_vector, [[1.0]], atol=0.1) - assert updates[0].state_vector.shape == pred.state_vector.shape diff --git a/stonesoup/models/measurement/bias.py b/stonesoup/models/measurement/bias.py index 7f84ea6e6..b68b161c0 100644 --- a/stonesoup/models/measurement/bias.py +++ b/stonesoup/models/measurement/bias.py @@ -9,7 +9,7 @@ from ...types.state import State, StateVectors -class _BiasWrapper(MeasurementModel): +class BiasModelWrapper(MeasurementModel): measurement_model: MeasurementModel = Property( doc="Model being wrapped that bias will be applied to") state_mapping: list[int] = Property( @@ -41,7 +41,7 @@ def rvs(self, *args, **kwargs): raise NotImplementedError() -class TimeBiasWrapper(_BiasWrapper): +class TimeBiasModelWrapper(BiasModelWrapper): transition_model: TransitionModel = Property( doc="Transition model applied to state to apply time offset") bias_mapping: tuple[int] | int = Property( @@ -64,7 +64,7 @@ def function(self, state, noise=False, **kwargs): State(StateVectors(predicted_state_vectors)), noise=noise, **kwargs) -class OrientationBiasWrapper(_BiasWrapper): +class OrientationBiasModelWrapper(BiasModelWrapper): bias_mapping: tuple[int, int, int] = Property( default=(-3, -2, -1), doc="Mapping to state vector elements where bias is") @@ -82,7 +82,7 @@ def function(self, state, noise=False, **kwargs): return StateVectors(state_vectors) -class TranslationBiasWrapper(_BiasWrapper): +class TranslationBiasModelWrapper(BiasModelWrapper): bias_mapping: tuple[int, int] | tuple[int, int, int] = Property( default=(-3, -2, -1), doc="Mapping to state vector elements where bias is") @@ -100,7 +100,7 @@ def function(self, state, noise=False, **kwargs): return StateVectors(state_vectors) -class OrientationTranslationBiasWrapper(_BiasWrapper): +class OrientationTranslationBiasModelWrapper(BiasModelWrapper): bias_mapping: tuple[int, int, int, int, int] | tuple[int, int, int, int, int, int] = Property( default=(-6, -5, -4, -3, -2, -1), doc="Mapping to state vector elements where bias is") diff --git a/stonesoup/models/measurement/tests/test_bias.py b/stonesoup/models/measurement/tests/test_bias.py index 9180e3324..72db1cd0b 100644 --- a/stonesoup/models/measurement/tests/test_bias.py +++ b/stonesoup/models/measurement/tests/test_bias.py @@ -4,8 +4,8 @@ import numpy as np from stonesoup.models.measurement.bias import ( - TimeBiasWrapper, - OrientationBiasWrapper, TranslationBiasWrapper, OrientationTranslationBiasWrapper) + TimeBiasModelWrapper, TranslationBiasModelWrapper, + OrientationBiasModelWrapper, OrientationTranslationBiasModelWrapper) from stonesoup.models.measurement.nonlinear import CartesianToElevationBearingRangeRate from stonesoup.models.transition.linear import ConstantVelocity from stonesoup.models.transition.base import CombinedGaussianTransitionModel @@ -40,7 +40,7 @@ def test_orientation_translation_bias_wrapper(orientation_bias, translation_bias rotation_offset=StateVector([[0.], [0.], [0.]]), ) # Wrap with orientation+translation bias - wrapper = OrientationTranslationBiasWrapper( + wrapper = OrientationTranslationBiasModelWrapper( measurement_model=model, state_mapping=[0, 1, 2, 3, 4, 5], bias_mapping=(6, 7, 8, 9, 10, 11), @@ -94,7 +94,7 @@ def test_translation_bias_wrapper(translation_bias): rotation_offset=StateVector([[0.], [0.], [0.]]), ) # Wrap with translation bias - wrapper = TranslationBiasWrapper( + wrapper = TranslationBiasModelWrapper( measurement_model=model, state_mapping=[0, 1, 2, 3, 4, 5], bias_mapping=(6, 7, 8), @@ -146,7 +146,7 
@@ def test_orientation_bias_wrapper(orientation_bias): rotation_offset=StateVector([[0.], [0.], [0.]]), ) # Wrap with orientation bias - wrapper = OrientationBiasWrapper( + wrapper = OrientationBiasModelWrapper( measurement_model=model, state_mapping=[0, 1, 2, 3, 4, 5], bias_mapping=(6, 7, 8), @@ -210,7 +210,7 @@ def jacobian(self, state, **kwargs): return np.eye(3, 7) # Wrap with time bias - wrapper = TimeBiasWrapper( + wrapper = TimeBiasModelWrapper( measurement_model=DummyMeasurementModel(), transition_model=transition_model, state_mapping=[0, 1, 2, 3, 4, 5], diff --git a/stonesoup/updater/bias.py b/stonesoup/updater/bias.py new file mode 100644 index 000000000..161ec037c --- /dev/null +++ b/stonesoup/updater/bias.py @@ -0,0 +1,136 @@ +import copy + +import numpy as np +from scipy.linalg import block_diag + +from ..base import Property +from ..models.measurement.bias import BiasModelWrapper +from ..models.measurement.nonlinear import CombinedReversibleGaussianMeasurementModel +from ..predictor.kalman import KalmanPredictor +from ..types.array import StateVector, CovarianceMatrix +from ..types.detection import Detection +from ..types.hypothesis import SingleHypothesis +from ..types.state import GaussianState +from ..types.update import Update +from ..updater import Updater +from ..updater.kalman import UnscentedKalmanUpdater + + +class GaussianBiasUpdater(Updater): + measurement_model = None + bias_state: GaussianState = Property(doc="Prior bias") + bias_predictor: KalmanPredictor = Property(doc="Predictor for bias") + bias_model_wrapper: BiasModelWrapper = Property() + updater: Updater = Property( + default=None, + doc="Updater for bias and joint states. Must support non-linear models. " + "Default `None` will create UKF instance.") + max_bias: list[float] = Property(default=None, doc="Max bias ± from 0 allowed") + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if self.updater is None: + self.updater = UnscentedKalmanUpdater(None) + + def predict_measurement( + self, predicted_state, measurement_model=None, measurement_noise=True, **kwargs): + ndim_bias = self.bias_state.ndim + + # Predict bias + if self.bias_state.timestamp is None: + pred_bias_state = copy.copy(self.bias_state) + pred_bias_state.timestamp = predicted_state.timestamp + else: + pred_bias_state = self.bias_predictor.predict( + self.bias_state, timestamp=predicted_state.timestamp) + + applied_bias = getattr(measurement_model, 'applied_bias', np.zeros((ndim_bias, 1))) + delta_bias = pred_bias_state.state_vector - applied_bias + combined_pred = GaussianState( + np.vstack([predicted_state.state_vector, delta_bias]).view(StateVector), + block_diag(*[predicted_state.covar, pred_bias_state.covar]).view(CovarianceMatrix), + timestamp=predicted_state.timestamp, + ) + + bias_measurement_model = self.bias_model_wrapper( + ndim_state=combined_pred.state_vector.shape[0], + measurement_model=measurement_model, + state_mapping=list(range(predicted_state.ndim)), + bias_mapping=list(range(-ndim_bias, 0)) + ) + + return self.updater.predict_measurement( + combined_pred, bias_measurement_model, measurement_noise, **kwargs) + + def update(self, hypotheses, **kwargs): + if any(not hyp for hyp in hypotheses): + raise ValueError("Must provide only non-missed detection hypotheses") + + ndim_bias = self.bias_state.ndim + + # Predict bias + pred_time = max(hypothesis.prediction.timestamp for hypothesis in hypotheses) + if self.bias_state.timestamp is None: + self.bias_state.timestamp = pred_time + else: + 
new_bias_state = self.bias_predictor.predict(self.bias_state, timestamp=pred_time) + self.bias_state.state_vector = new_bias_state.state_vector + self.bias_state.covar = new_bias_state.covar + self.bias_state.timestamp = new_bias_state.timestamp + + # Create joint state + states = [hypothesis.prediction for hypothesis in hypotheses] + applied_bias = next( + (h.measurement.measurement_model.applied_bias + for h in hypotheses + if hasattr(h.measurement.measurement_model, 'applied_bias')), + np.zeros((ndim_bias, 1))) + delta_bias = self.bias_state.state_vector - applied_bias + states.append(GaussianState(delta_bias, self.bias_state.covar)) + combined_pred = GaussianState( + np.vstack([state.state_vector for state in states]).view(StateVector), + block_diag(*[state.covar for state in states]).view(CovarianceMatrix), + timestamp=pred_time, + ) + + # Create joint measurement + offset = 0 + models = [] + for prediction, measurement in ( + (hypothesis.prediction, hypothesis.measurement) for hypothesis in hypotheses): + models.append(self.bias_model_wrapper( + ndim_state=combined_pred.state_vector.shape[0], + measurement_model=measurement.measurement_model, + state_mapping=[offset + n for n in range(prediction.ndim)], + bias_mapping=list(range(-ndim_bias, 0)) + )) + offset += prediction.ndim + combined_meas = Detection( + np.vstack([hypothesis.measurement.state_vector for hypothesis in hypotheses]), + timestamp=pred_time, + measurement_model=CombinedReversibleGaussianMeasurementModel(models)) + + # Update bias + update = self.updater.update(SingleHypothesis(combined_pred, combined_meas), **kwargs) + rel_delta_bias = update.state_vector[-ndim_bias:, :] - delta_bias + self.bias_state.state_vector = self.bias_state.state_vector + rel_delta_bias + if self.max_bias is not None: + self.bias_state.state_vector = \ + np.min([abs(self.bias_state.state_vector), self.max_bias], axis=0) \ + * np.sign(self.bias_state.state_vector) + self.bias_state.covar = update.covar[-ndim_bias:, -ndim_bias:] + + # Create update states + offset = 0 + updates = [] + for hypothesis in hypotheses: + update_slice = slice(offset, offset + hypothesis.prediction.ndim) + updates.append(Update.from_state( + hypothesis.prediction, + state_vector=update.state_vector[update_slice, :], + covar=update.covar[update_slice, update_slice], + timestamp=hypothesis.prediction.timestamp, + hypothesis=hypothesis)) + offset += hypothesis.prediction.ndim + + return updates diff --git a/stonesoup/updater/tests/test_bias.py b/stonesoup/updater/tests/test_bias.py new file mode 100644 index 000000000..469beeb6d --- /dev/null +++ b/stonesoup/updater/tests/test_bias.py @@ -0,0 +1,185 @@ +import copy +import datetime +from functools import partial + +import numpy as np +import pytest + +from ..bias import GaussianBiasUpdater +from ...models.measurement.bias import ( + TimeBiasModelWrapper, TranslationBiasModelWrapper, + OrientationBiasModelWrapper, OrientationTranslationBiasModelWrapper) +from ...functions import build_rotation_matrix +from ...models.transition.linear import CombinedLinearGaussianTransitionModel, RandomWalk +from ...predictor.kalman import KalmanPredictor +from ...types.detection import Detection +from ...types.hypothesis import SingleHypothesis +from ...types.state import GaussianState, StateVector + + +def make_dummy_measurement_model(): + class DummyMeasurementModel: + mapping = [0, 1, 2] + ndim_meas = 3 + translation_offset = StateVector([[0.], [0.], [0.]]) + rotation_offset = StateVector([[0.], [0.], [0.]]) + + def function(self, 
state, noise=False, **kwargs): + return build_rotation_matrix(self.rotation_offset) \ + @ (state.state_vector - self.translation_offset) + + def covar(self, *args, **kwargs): + return np.eye(3) + + def jacobian(self, state, **kwargs): + return np.eye(3, 6) + + return DummyMeasurementModel() + + +def make_dummy_transition_model(): + class DummyTransitionModel: + def function(self, state, noise=False, time_interval=None, **kwargs): + # Adds time_interval (seconds) to the state_vector + if time_interval is None: + time_interval = datetime.timedelta(seconds=0) + return state.state_vector + time_interval.total_seconds() + return DummyTransitionModel() + + +@pytest.fixture(params=[None, datetime.datetime(2025, 9, 10)]) +def bias_timestamp(request): + return request.param + + +def test_translation_gaussian_bias_feeder_update_bias(bias_timestamp): + # Setup feeder + bias_prior = GaussianState(StateVector([[0.], [0.], [0.]]), np.eye(3) * 10, bias_timestamp) + bias_predictor = KalmanPredictor( + CombinedLinearGaussianTransitionModel([RandomWalk(1)] * 3)) + updater = GaussianBiasUpdater( + bias_state=copy.copy(bias_prior), + bias_predictor=bias_predictor, + bias_model_wrapper=TranslationBiasModelWrapper, + ) + # Create dummy prediction and measurement + measurement_model = make_dummy_measurement_model() + pred = GaussianState( + StateVector([[10.], [20.], [30.]]), + np.eye(3), + timestamp=datetime.datetime(2025, 9, 10) + ) + meas = Detection( + StateVector([[11.], [21.], [31.]]), + timestamp=datetime.datetime(2025, 9, 10), + measurement_model=measurement_model + ) + meas.applied_bias = np.zeros((3, 1)) + # Create hypothesis + hyp = SingleHypothesis(pred, meas) + # Call update_bias + updates = updater.update([hyp]) + # The bias_state should be updated (should not be the same as initial)... 
+ assert not np.allclose(updater.bias_state.state_vector, bias_prior.state_vector) + # and should be around 0.8 ± 0.1 + assert np.allclose(updater.bias_state.state_vector, [[0.8], [0.8], [0.8]], atol=0.1) + # The returned updates should match the updated state shape + assert updates[0].state_vector.shape == pred.state_vector.shape + + +def test_orientation_gaussian_bias_feeder_update_bias(bias_timestamp): + bias_prior = GaussianState( + StateVector([[0.], [0.], [0.]]), np.diag([1e-6, 1e-6, 1]), bias_timestamp) + bias_predictor = KalmanPredictor( + CombinedLinearGaussianTransitionModel([RandomWalk(1e-6)] * 3)) + updater = GaussianBiasUpdater( + bias_state=copy.copy(bias_prior), + bias_predictor=bias_predictor, + bias_model_wrapper=OrientationBiasModelWrapper, + max_bias=StateVector([[0.], [0.], [np.pi]]) + ) + measurement_model = make_dummy_measurement_model() + pred = GaussianState( + StateVector([[10.], [20.], [30.]]), + np.eye(3), + timestamp=datetime.datetime(2025, 9, 10) + ) + meas = Detection( + build_rotation_matrix(np.array([[0.], [0.], [-np.pi/16]])) + @ StateVector([[10.], [20.], [30.]]), + timestamp=datetime.datetime(2025, 9, 10), + measurement_model=measurement_model + ) + meas.applied_bias = np.zeros((3, 1)) + hyp = SingleHypothesis(pred, meas) + updates = updater.update([hyp]) + assert not np.allclose(updater.bias_state.state_vector, bias_prior.state_vector) + assert np.allclose(updater.bias_state.state_vector, [[0.], [0.], [np.pi/16]], atol=0.05) + assert updates[0].state_vector.shape == pred.state_vector.shape + + +def test_orientation_translation_gaussian_bias_feeder_update_bias(bias_timestamp): + bias_prior = GaussianState( + StateVector([[0.], [0.], [np.pi/16], [1.], [2.], [3.]]), + np.diag([1e-6, 1e-6, 1, 3, 3, 3]), + bias_timestamp) + bias_predictor = KalmanPredictor( + CombinedLinearGaussianTransitionModel([RandomWalk(1e-6)]*3 + [RandomWalk(1)]*3)) + updater = GaussianBiasUpdater( + bias_state=copy.copy(bias_prior), + bias_predictor=bias_predictor, + bias_model_wrapper=OrientationTranslationBiasModelWrapper + ) + measurement_model = make_dummy_measurement_model() + hyps = [] + for n in range(-10, 11): # A lot ok unknowns so a few detections needed + pred = GaussianState( + StateVector([[10.*n], [20.*n], [30.*n]]), + np.eye(3), + timestamp=datetime.datetime(2025, 9, 10) + ) + meas = Detection( + build_rotation_matrix(np.array([[0.], [0.], [-np.pi/16]])) + @ (pred.state_vector+StateVector([[1.], [1.], [1.]])), + timestamp=datetime.datetime(2025, 9, 10), + measurement_model=measurement_model + ) + meas.measurement_model.applied_bias = np.zeros((6, 1)) + hyp = SingleHypothesis(pred, meas) + hyps.append(hyp) + updates = updater.update(hyps) + assert not np.allclose(updater.bias_state.state_vector, bias_prior.state_vector) + assert np.allclose( + updater.bias_state.state_vector, + [[0.], [0.], [np.pi/16], [1.], [1.], [1.]], + atol=0.1) + assert updates[0].state_vector.shape == pred.state_vector.shape + + +def test_time_gaussian_bias_feeder_update_bias(bias_timestamp): + bias_prior = GaussianState(StateVector([[0.5]]), np.eye(1) * 10, bias_timestamp) + bias_predictor = KalmanPredictor( + CombinedLinearGaussianTransitionModel([RandomWalk(1e-3)])) + updater = GaussianBiasUpdater( + bias_state=copy.copy(bias_prior), + bias_predictor=bias_predictor, + bias_model_wrapper=partial( + TimeBiasModelWrapper, transition_model=make_dummy_transition_model()) + ) + measurement_model = make_dummy_measurement_model() + pred = GaussianState( + StateVector([[41.5]]), + np.eye(1), + 
timestamp=datetime.datetime(2025, 9, 10) + ) + meas = Detection( + StateVector([[41.]]), + timestamp=datetime.datetime(2025, 9, 10), + measurement_model=measurement_model + ) + meas.measurement_model.applied_bias = StateVector([[0.5]]) + hyp = SingleHypothesis(pred, meas) + updates = updater.update([hyp]) + assert not np.allclose(updater.bias_state.state_vector, bias_prior.state_vector) + assert np.allclose(updater.bias_state.state_vector, [[1.0]], atol=0.1) + assert updates[0].state_vector.shape == pred.state_vector.shape From 015b3f0ff39c0dd366c192121b12a326e66e4797 Mon Sep 17 00:00:00 2001 From: Steven Hiscocks Date: Fri, 24 Oct 2025 14:54:39 +0100 Subject: [PATCH 11/15] Add test for GaussianBiasUpdater predict_measurement --- stonesoup/updater/tests/test_bias.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/stonesoup/updater/tests/test_bias.py b/stonesoup/updater/tests/test_bias.py index 469beeb6d..535680819 100644 --- a/stonesoup/updater/tests/test_bias.py +++ b/stonesoup/updater/tests/test_bias.py @@ -74,7 +74,12 @@ def test_translation_gaussian_bias_feeder_update_bias(bias_timestamp): timestamp=datetime.datetime(2025, 9, 10), measurement_model=measurement_model ) - meas.applied_bias = np.zeros((3, 1)) + meas.measurement_model.applied_bias = np.zeros((3, 1)) + + pred_meas = updater.predict_measurement(pred, measurement_model) + unbias_pred_meas = updater.updater.predict_measurement(pred, measurement_model) + assert np.sum(np.trace(pred_meas.covar)) > np.sum(np.trace(unbias_pred_meas.covar)) + # Create hypothesis hyp = SingleHypothesis(pred, meas) # Call update_bias @@ -110,7 +115,12 @@ def test_orientation_gaussian_bias_feeder_update_bias(bias_timestamp): timestamp=datetime.datetime(2025, 9, 10), measurement_model=measurement_model ) - meas.applied_bias = np.zeros((3, 1)) + meas.measurement_model.applied_bias = np.zeros((3, 1)) + + pred_meas = updater.predict_measurement(pred, measurement_model) + unbias_pred_meas = updater.updater.predict_measurement(pred, measurement_model) + assert np.sum(np.trace(pred_meas.covar)) > np.sum(np.trace(unbias_pred_meas.covar)) + hyp = SingleHypothesis(pred, meas) updates = updater.update([hyp]) assert not np.allclose(updater.bias_state.state_vector, bias_prior.state_vector) @@ -178,6 +188,11 @@ def test_time_gaussian_bias_feeder_update_bias(bias_timestamp): measurement_model=measurement_model ) meas.measurement_model.applied_bias = StateVector([[0.5]]) + + pred_meas = updater.predict_measurement(pred, measurement_model) + unbias_pred_meas = updater.updater.predict_measurement(pred, measurement_model) + assert np.sum(np.trace(pred_meas.covar)) > np.sum(np.trace(unbias_pred_meas.covar)) + hyp = SingleHypothesis(pred, meas) updates = updater.update([hyp]) assert not np.allclose(updater.bias_state.state_vector, bias_prior.state_vector) From 923e1be6cc6a68962e453cb733b8d62cc8b0908f Mon Sep 17 00:00:00 2001 From: Steven Hiscocks Date: Thu, 30 Oct 2025 16:16:42 +0000 Subject: [PATCH 12/15] Update sensor bias documentation --- docs/examples/sensorfusion/senor_bias.py | 6 +-- docs/source/stonesoup.models.measurement.rst | 6 +++ docs/source/stonesoup.updater.rst | 6 +++ stonesoup/feeder/bias.py | 41 ++++++++++++++---- stonesoup/feeder/tests/test_bias.py | 14 +++--- stonesoup/models/measurement/bias.py | 45 +++++++++++++++++--- stonesoup/updater/bias.py | 10 +++++ 7 files changed, 105 insertions(+), 23 deletions(-) diff --git a/docs/examples/sensorfusion/senor_bias.py b/docs/examples/sensorfusion/senor_bias.py index 
47474a0bb..3af0082f8 100644 --- a/docs/examples/sensorfusion/senor_bias.py +++ b/docs/examples/sensorfusion/senor_bias.py @@ -161,14 +161,14 @@ # These are all added to a MultiDataFeeder to combine them into single detection feed. import copy from stonesoup.predictor.kalman import KalmanPredictor -from stonesoup.feeder.bias import TranslationGaussianBiasFeeder +from stonesoup.feeder.bias import TranslationBiasFeeder from stonesoup.feeder.multi import MultiDataFeeder bias_state = GaussianState([[0.], [0.]], np.diag([5**2, 5**2]), start_time) bias_track = Track([copy.copy(bias_state)]) bias_predictor = KalmanPredictor(CombinedLinearGaussianTransitionModel([RandomWalk(1e-1)]*2)) -bias_feeder = TranslationGaussianBiasFeeder(measurements[0], bias_state) +bias_feeder = TranslationBiasFeeder(measurements[0], bias_state) # %% # These are all added to a MultiDataFeeder to combine them into single detection feed. @@ -194,7 +194,7 @@ data_associator = GNNWith2DAssignment(hypothesiser) # %% -# A bias aware hypothesiser and data assocaitor are created to factor the bias uncertainty into +# A bias aware hypothesiser and data associator are created to factor the bias uncertainty into # association threshold. These use bias updater wrapper, which is also used to update target # and bias estimates. from stonesoup.updater.bias import GaussianBiasUpdater diff --git a/docs/source/stonesoup.models.measurement.rst b/docs/source/stonesoup.models.measurement.rst index a2737c3bd..a087e5299 100644 --- a/docs/source/stonesoup.models.measurement.rst +++ b/docs/source/stonesoup.models.measurement.rst @@ -36,3 +36,9 @@ Graph .. automodule:: stonesoup.models.measurement.graph :show-inheritance: :inherited-members: + +Bias +---- +.. automodule:: stonesoup.models.measurement.bias + :show-inheritance: + :inherited-members: diff --git a/docs/source/stonesoup.updater.rst b/docs/source/stonesoup.updater.rst index 834e77097..a5fffd6a4 100644 --- a/docs/source/stonesoup.updater.rst +++ b/docs/source/stonesoup.updater.rst @@ -95,3 +95,9 @@ Probabilistic .. automodule:: stonesoup.updater.probability :show-inheritance: + +Bias +---- + +.. automodule:: stonesoup.updater.bias + :show-inheritance: diff --git a/stonesoup/feeder/bias.py b/stonesoup/feeder/bias.py index 344bb9932..2c39645c4 100644 --- a/stonesoup/feeder/bias.py +++ b/stonesoup/feeder/bias.py @@ -7,12 +7,12 @@ from ..types.state import GaussianState -class _GaussianBiasFeeder(DetectionFeeder): - bias_state: GaussianState = Property(doc="Prior bias") +class _BiasFeeder(DetectionFeeder): + bias_state: GaussianState = Property(doc="Bias State") @property def bias(self): - return self.bias_state.state_vector + return self.bias_state.mean @abstractmethod @BufferedGenerator.generator_method @@ -20,12 +20,18 @@ def data_gen(self): raise NotImplementedError() -class TimeGaussianBiasFeeder(_GaussianBiasFeeder): +class TimeBiasFeeder(_BiasFeeder): + """Time Bias Feeder + + Apply bias to detection timestamp and overall timestamp yielded. 
+ """ + bias_state: GaussianState = Property( + doc="Bias state with state vector shape (1, 1) in units of seconds") @BufferedGenerator.generator_method def data_gen(self): for time, detections in self.reader: - bias = self.bias_state.state_vector.copy() + bias = self.bias_state.state_vector[0, 0] bias_delta = datetime.timedelta(seconds=float(bias)) time -= bias_delta models = set() @@ -37,7 +43,13 @@ def data_gen(self): yield time, detections -class OrientationGaussianBiasFeeder(_GaussianBiasFeeder): +class OrientationBiasFeeder(_BiasFeeder): + """Orientation Bias Feeder + + Apply bias to detection measurement model rotation offset + """ + bias_state: GaussianState = Property( + doc="Bias state with state vector shape (3, 1) is expected") @BufferedGenerator.generator_method def data_gen(self): @@ -52,7 +64,13 @@ def data_gen(self): yield time, detections -class TranslationGaussianBiasFeeder(_GaussianBiasFeeder): +class TranslationBiasFeeder(_BiasFeeder): + """Translation Bias Feeder + + Apply bias to detection measurement model translation offset + """ + bias_state: GaussianState = Property( + doc="Bias state with state vector shape (n, 1), where n is dimensions of the model") @BufferedGenerator.generator_method def data_gen(self): @@ -67,7 +85,14 @@ def data_gen(self): yield time, detections -class OrientationTranslationGaussianBiasFeeder(_GaussianBiasFeeder): +class OrientationTranslationBiasFeeder(_BiasFeeder): + """Orientation Translation Bias Feeder + + Apply bias to detection measurement model rotation and translation offset + """ + bias_state: GaussianState = Property( + doc="Bias state with state vector shape (3+n, 1), 3 for rotation and where n is " + "dimensions of the model") @BufferedGenerator.generator_method def data_gen(self): diff --git a/stonesoup/feeder/tests/test_bias.py b/stonesoup/feeder/tests/test_bias.py index 72afbcf47..055fbcdbb 100644 --- a/stonesoup/feeder/tests/test_bias.py +++ b/stonesoup/feeder/tests/test_bias.py @@ -3,9 +3,9 @@ import numpy as np from stonesoup.feeder.bias import ( - TimeGaussianBiasFeeder, - TranslationGaussianBiasFeeder, OrientationGaussianBiasFeeder, - OrientationTranslationGaussianBiasFeeder) + TimeBiasFeeder, + TranslationBiasFeeder, OrientationBiasFeeder, + OrientationTranslationBiasFeeder) from stonesoup.functions import build_rotation_matrix from stonesoup.types.detection import Detection from stonesoup.types.state import GaussianState, StateVector @@ -47,7 +47,7 @@ def test_translation_gaussian_bias_feeder_iter(): # Mock reader to yield a single time and detection measurement_model = make_dummy_measurement_model() detection = make_dummy_detection(StateVector([[10.], [20.], [30.]]), measurement_model) - feeder = TranslationGaussianBiasFeeder( + feeder = TranslationBiasFeeder( reader=[(datetime.datetime(2025, 9, 10), [detection])], bias_state=bias_state, ) @@ -67,7 +67,7 @@ def test_orientation_gaussian_bias_feeder_iter(): bias_state = GaussianState(StateVector([[0.], [0.], [np.pi/16]]), np.eye(3)) measurement_model = make_dummy_measurement_model() detection = make_dummy_detection(StateVector([[10.], [20.], [30.]]), measurement_model) - feeder = OrientationGaussianBiasFeeder( + feeder = OrientationBiasFeeder( reader=[(datetime.datetime(2025, 9, 10), [detection])], bias_state=bias_state, ) @@ -90,7 +90,7 @@ def test_orientation_translation_gaussian_bias_feeder_iter(): StateVector([[0.], [0.], [np.pi/16], [1.], [2.], [3.]]), np.diag([1e-6, 1e-6, 1, 3, 3, 3])) measurement_model = make_dummy_measurement_model() detection = 
make_dummy_detection(StateVector([[10.], [20.], [30.]]), measurement_model) - feeder = OrientationTranslationGaussianBiasFeeder( + feeder = OrientationTranslationBiasFeeder( reader=[(datetime.datetime(2025, 9, 10), [detection])], bias_state=bias_state, ) @@ -115,7 +115,7 @@ def test_time_gaussian_bias_feeder_iter(): measurement_model = make_dummy_measurement_model() detection = make_dummy_detection(StateVector([[42.]]), measurement_model) orig_timestamp = detection.timestamp - feeder = TimeGaussianBiasFeeder( + feeder = TimeBiasFeeder( reader=[(datetime.datetime(2025, 9, 10), [detection])], bias_state=bias_state, ) diff --git a/stonesoup/models/measurement/bias.py b/stonesoup/models/measurement/bias.py index b68b161c0..0e411f70d 100644 --- a/stonesoup/models/measurement/bias.py +++ b/stonesoup/models/measurement/bias.py @@ -10,8 +10,10 @@ class BiasModelWrapper(MeasurementModel): + """Abstract wrapper that applies bias values to an existing MeasurementModel. + """ measurement_model: MeasurementModel = Property( - doc="Model being wrapped that bias will be applied to") + doc="Unbiased model being wrapped that bias will be applied to") state_mapping: list[int] = Property( doc="Mapping to state vector elements relevant to wrapped model") bias_mapping: list[int] = Property(doc="Mapping to state vector elements where bias is") @@ -42,9 +44,17 @@ def rvs(self, *args, **kwargs): class TimeBiasModelWrapper(BiasModelWrapper): + """Apply a time-offset bias from the state to the wrapped measurement model. + + The bias elements selected by `bias_mapping` are interpreted as a time offset in seconds. + For each state vector the wrapper computes the state at the (biased) measurement time by + applying the `transition_model` with a negative time interval equal to the bias. The + resulting state is then passed to the wrapped `measurement_model` to produce the + (time-corrected) state vector. + """ transition_model: TransitionModel = Property( doc="Transition model applied to state to apply time offset") - bias_mapping: tuple[int] | int = Property( + bias_mapping: list[int] = Property( default=(-1, ), doc="Mapping to state vector elements where bias is") def __init__(self, *args, **kwargs): @@ -65,7 +75,17 @@ def function(self, state, noise=False, **kwargs): class OrientationBiasModelWrapper(BiasModelWrapper): - bias_mapping: tuple[int, int, int] = Property( + """Apply an orientation (rotation) bias from the state to the wrapped measurement model. + + The wrapper expects `bias_mapping` to select orientation elements (e.g. Euler angles or + equivalent) stored in the state vector. For each input state the wrapper creates a copy + of the wrapped measurement model, adjusts its `rotation_offset` by subtracting the + bias value, and then evaluates the measurement function using the corrected model. + + This allows the wrapped model to remain stateless while the wrapper applies per-state + orientation corrections. + """ + bias_mapping: list[int] = Property( default=(-3, -2, -1), doc="Mapping to state vector elements where bias is") def function(self, state, noise=False, **kwargs): @@ -83,7 +103,14 @@ def function(self, state, noise=False, **kwargs): class TranslationBiasModelWrapper(BiasModelWrapper): - bias_mapping: tuple[int, int] | tuple[int, int, int] = Property( + """Apply a translation (position) bias from the state to the wrapped measurement model. + + The wrapper expects `bias_mapping` to select translation elements stored in the state + vector. 
For each input state the wrapper creates a copy of the wrapped measurement model, + adjusts its `translation_offset` by subtracting the bias value, and then evaluates the + measurement function using the corrected model. + """ + bias_mapping: list[int] = Property( default=(-3, -2, -1), doc="Mapping to state vector elements where bias is") def function(self, state, noise=False, **kwargs): @@ -101,7 +128,15 @@ def function(self, state, noise=False, **kwargs): class OrientationTranslationBiasModelWrapper(BiasModelWrapper): - bias_mapping: tuple[int, int, int, int, int] | tuple[int, int, int, int, int, int] = Property( + """Apply combined orientation and translation biases from the state to the wrapped model. + + `bias_mapping` is expected to contain orientation indices first (3 elements) followed by + translation indices (2 or 3 elements). For each input state the wrapper copies the + wrapped measurement model, subtracts the orientation and translation bias values from + the model's `rotation_offset` and `translation_offset` respectively, and evaluates the + corrected model on the mapped portion of the state. + """ + bias_mapping: list[int] = Property( default=(-6, -5, -4, -3, -2, -1), doc="Mapping to state vector elements where bias is") def function(self, state, noise=False, **kwargs): diff --git a/stonesoup/updater/bias.py b/stonesoup/updater/bias.py index 161ec037c..3c522db7d 100644 --- a/stonesoup/updater/bias.py +++ b/stonesoup/updater/bias.py @@ -17,6 +17,16 @@ class GaussianBiasUpdater(Updater): + """Updater that jointly estimates a bias alongside target states. + + Maintains a separate Gaussian bias state and integrates it with target predictions + to perform joint prediction and update steps. Uses a provided non-linear `updater` + (defaults to an Unscented Kalman updater) and a `bias_model_wrapper` to build + joint measurement models for bias estimation. + + Note that this assumes that all measurements/hypotheses are updating a common + bias i.e. all measurements from the same sensor. + """ measurement_model = None bias_state: GaussianState = Property(doc="Prior bias") bias_predictor: KalmanPredictor = Property(doc="Predictor for bias") From 96d5420c4bd9c048d8f3d2d2c42af744206ee6ab Mon Sep 17 00:00:00 2001 From: Christopher Sherman Date: Mon, 3 Nov 2025 13:28:36 +0000 Subject: [PATCH 13/15] Replace bias_state with a bias_track in BiasFeeder and BiasUpdaters --- docs/examples/sensorfusion/senor_bias.py | 10 +++--- stonesoup/feeder/bias.py | 30 +++++++++-------- stonesoup/feeder/tests/test_bias.py | 9 +++--- stonesoup/updater/bias.py | 41 ++++++++++++------------ stonesoup/updater/tests/test_bias.py | 25 ++++++++------- 5 files changed, 58 insertions(+), 57 deletions(-) diff --git a/docs/examples/sensorfusion/senor_bias.py b/docs/examples/sensorfusion/senor_bias.py index 3af0082f8..417129f32 100644 --- a/docs/examples/sensorfusion/senor_bias.py +++ b/docs/examples/sensorfusion/senor_bias.py @@ -159,16 +159,15 @@ # measurements. # # These are all added to a MultiDataFeeder to combine them into single detection feed. 
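As a minimal sketch of how the GaussianBiasUpdater described above is intended to be wired up (a 2D translation bias is assumed, and `hypotheses` stands in for the output of a data associator in a surrounding tracking loop):

import numpy as np

from stonesoup.models.measurement.bias import TranslationBiasModelWrapper
from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, RandomWalk
from stonesoup.predictor.kalman import KalmanPredictor
from stonesoup.types.state import GaussianState
from stonesoup.types.track import Track
from stonesoup.updater.bias import GaussianBiasUpdater

# Bias estimate held in a Track so its history can be inspected/plotted later
bias_track = Track([GaussianState([[0.], [0.]], np.diag([5**2, 5**2]))])
bias_predictor = KalmanPredictor(
    CombinedLinearGaussianTransitionModel([RandomWalk(1e-1)]*2))

bias_updater = GaussianBiasUpdater(
    bias_track=bias_track,
    bias_predictor=bias_predictor,
    bias_model_wrapper=TranslationBiasModelWrapper,
)  # `updater` left as default, so an UnscentedKalmanUpdater is created

# Inside a tracking loop, pass only non-missed hypotheses (all assumed to share
# the same biased sensor); the call appends the refreshed bias estimate to
# bias_track and returns one Update per hypothesis, in the same order:
# updates = bias_updater.update([hyp for hyp in hypotheses.values() if hyp])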
-import copy from stonesoup.predictor.kalman import KalmanPredictor from stonesoup.feeder.bias import TranslationBiasFeeder from stonesoup.feeder.multi import MultiDataFeeder bias_state = GaussianState([[0.], [0.]], np.diag([5**2, 5**2]), start_time) -bias_track = Track([copy.copy(bias_state)]) +bias_track = Track([bias_state]) bias_predictor = KalmanPredictor(CombinedLinearGaussianTransitionModel([RandomWalk(1e-1)]*2)) -bias_feeder = TranslationBiasFeeder(measurements[0], bias_state) +bias_feeder = TranslationBiasFeeder(measurements[0], bias_track) # %% # These are all added to a MultiDataFeeder to combine them into single detection feed. @@ -201,7 +200,7 @@ from stonesoup.models.measurement.bias import TranslationBiasModelWrapper bias_updater = GaussianBiasUpdater( - bias_state, bias_predictor, TranslationBiasModelWrapper, updater) + bias_track, bias_predictor, TranslationBiasModelWrapper, updater) bias_hypothesiser = DistanceHypothesiser( predictor, bias_updater, measure=Mahalanobis(), missed_distance=5) bias_data_associator = GNNWith2DAssignment(bias_hypothesiser) @@ -228,11 +227,10 @@ track.append(hyp.prediction) # Adjust measurement models by removing relative bias for plotting later - rel_bias_vector = bias_track.state_vector - bias_state.state_vector + rel_bias_vector = bias_track[-2].state_vector - bias_track[-1].state_vector for model in {d.measurement_model for d in detections}: model.translation_offset -= rel_bias_vector model.applied_bias += rel_bias_vector # No longer used, but for completeness - bias_track.append(copy.copy(bias_state)) else: # Standard track update if no bias applied i.e. unbiased sensors hypotheses = data_associator.associate(tracks, detections, time) diff --git a/stonesoup/feeder/bias.py b/stonesoup/feeder/bias.py index 2c39645c4..85b52ebde 100644 --- a/stonesoup/feeder/bias.py +++ b/stonesoup/feeder/bias.py @@ -5,14 +5,15 @@ from ..base import Property from ..buffered_generator import BufferedGenerator from ..types.state import GaussianState +from ..types.track import Track class _BiasFeeder(DetectionFeeder): - bias_state: GaussianState = Property(doc="Bias State") + bias_track: Track[GaussianState] = Property(doc="Track of bias states") @property def bias(self): - return self.bias_state.mean + return self.bias_track.state.mean @abstractmethod @BufferedGenerator.generator_method @@ -25,13 +26,13 @@ class TimeBiasFeeder(_BiasFeeder): Apply bias to detection timestamp and overall timestamp yielded. 
""" - bias_state: GaussianState = Property( - doc="Bias state with state vector shape (1, 1) in units of seconds") + bias_track: Track[GaussianState] = Property( + doc="Track of bias states with state vector shape (1, 1) in units of seconds") @BufferedGenerator.generator_method def data_gen(self): for time, detections in self.reader: - bias = self.bias_state.state_vector[0, 0] + bias = self.bias_track.state_vector[0, 0] bias_delta = datetime.timedelta(seconds=float(bias)) time -= bias_delta models = set() @@ -48,13 +49,13 @@ class OrientationBiasFeeder(_BiasFeeder): Apply bias to detection measurement model rotation offset """ - bias_state: GaussianState = Property( - doc="Bias state with state vector shape (3, 1) is expected") + bias_track: Track[GaussianState] = Property( + doc="Track of bias states with state vector shape (3, 1) is expected") @BufferedGenerator.generator_method def data_gen(self): for time, detections in self.reader: - bias = self.bias_state.state_vector.copy() + bias = self.bias_track.state_vector.copy() models = set() for detection in detections: models.add(detection.measurement_model) @@ -69,13 +70,14 @@ class TranslationBiasFeeder(_BiasFeeder): Apply bias to detection measurement model translation offset """ - bias_state: GaussianState = Property( - doc="Bias state with state vector shape (n, 1), where n is dimensions of the model") + bias_track: Track[GaussianState] = Property( + doc="Track of bias states with state vector shape (n, 1), " + "where n is dimensions of the model") @BufferedGenerator.generator_method def data_gen(self): for time, detections in self.reader: - bias = self.bias_state.state_vector.copy() + bias = self.bias_track.state_vector.copy() models = set() for detection in detections: models.add(detection.measurement_model) @@ -90,14 +92,14 @@ class OrientationTranslationBiasFeeder(_BiasFeeder): Apply bias to detection measurement model rotation and translation offset """ - bias_state: GaussianState = Property( - doc="Bias state with state vector shape (3+n, 1), 3 for rotation and where n is " + bias_track: Track[GaussianState] = Property( + doc="Track of bias states with state vector shape (3+n, 1), 3 for rotation and where n is " "dimensions of the model") @BufferedGenerator.generator_method def data_gen(self): for time, detections in self.reader: - bias = self.bias_state.state_vector.copy() + bias = self.bias_track.state_vector.copy() models = set() for detection in detections: models.add(detection.measurement_model) diff --git a/stonesoup/feeder/tests/test_bias.py b/stonesoup/feeder/tests/test_bias.py index 055fbcdbb..666223cc8 100644 --- a/stonesoup/feeder/tests/test_bias.py +++ b/stonesoup/feeder/tests/test_bias.py @@ -9,6 +9,7 @@ from stonesoup.functions import build_rotation_matrix from stonesoup.types.detection import Detection from stonesoup.types.state import GaussianState, StateVector +from stonesoup.types.track import Track def make_dummy_detection(state_vector, measurement_model): @@ -49,7 +50,7 @@ def test_translation_gaussian_bias_feeder_iter(): detection = make_dummy_detection(StateVector([[10.], [20.], [30.]]), measurement_model) feeder = TranslationBiasFeeder( reader=[(datetime.datetime(2025, 9, 10), [detection])], - bias_state=bias_state, + bias_track=Track([bias_state]), ) # Iterate over feeder for time, detections in feeder: @@ -69,7 +70,7 @@ def test_orientation_gaussian_bias_feeder_iter(): detection = make_dummy_detection(StateVector([[10.], [20.], [30.]]), measurement_model) feeder = OrientationBiasFeeder( 
reader=[(datetime.datetime(2025, 9, 10), [detection])], - bias_state=bias_state, + bias_track=Track([bias_state]), ) for time, detections in feeder: for det in detections: @@ -92,7 +93,7 @@ def test_orientation_translation_gaussian_bias_feeder_iter(): detection = make_dummy_detection(StateVector([[10.], [20.], [30.]]), measurement_model) feeder = OrientationTranslationBiasFeeder( reader=[(datetime.datetime(2025, 9, 10), [detection])], - bias_state=bias_state, + bias_track=Track([bias_state]), ) for time, detections in feeder: for det in detections: @@ -117,7 +118,7 @@ def test_time_gaussian_bias_feeder_iter(): orig_timestamp = detection.timestamp feeder = TimeBiasFeeder( reader=[(datetime.datetime(2025, 9, 10), [detection])], - bias_state=bias_state, + bias_track=Track([bias_state]), ) for time, detections in feeder: for det in detections: diff --git a/stonesoup/updater/bias.py b/stonesoup/updater/bias.py index 3c522db7d..5e4d8d89d 100644 --- a/stonesoup/updater/bias.py +++ b/stonesoup/updater/bias.py @@ -11,6 +11,7 @@ from ..types.detection import Detection from ..types.hypothesis import SingleHypothesis from ..types.state import GaussianState +from ..types.track import Track from ..types.update import Update from ..updater import Updater from ..updater.kalman import UnscentedKalmanUpdater @@ -28,7 +29,7 @@ class GaussianBiasUpdater(Updater): bias i.e. all measurements from the same sensor. """ measurement_model = None - bias_state: GaussianState = Property(doc="Prior bias") + bias_track: Track[GaussianState] = Property(doc="Prior bias") bias_predictor: KalmanPredictor = Property(doc="Predictor for bias") bias_model_wrapper: BiasModelWrapper = Property() updater: Updater = Property( @@ -44,15 +45,15 @@ def __init__(self, *args, **kwargs): def predict_measurement( self, predicted_state, measurement_model=None, measurement_noise=True, **kwargs): - ndim_bias = self.bias_state.ndim - + bias_state = self.bias_track.state + ndim_bias = bias_state.ndim # Predict bias - if self.bias_state.timestamp is None: - pred_bias_state = copy.copy(self.bias_state) + if bias_state.timestamp is None: + pred_bias_state = copy.copy(bias_state) pred_bias_state.timestamp = predicted_state.timestamp else: pred_bias_state = self.bias_predictor.predict( - self.bias_state, timestamp=predicted_state.timestamp) + bias_state, timestamp=predicted_state.timestamp) applied_bias = getattr(measurement_model, 'applied_bias', np.zeros((ndim_bias, 1))) delta_bias = pred_bias_state.state_vector - applied_bias @@ -75,18 +76,15 @@ def predict_measurement( def update(self, hypotheses, **kwargs): if any(not hyp for hyp in hypotheses): raise ValueError("Must provide only non-missed detection hypotheses") - - ndim_bias = self.bias_state.ndim + bias_state = self.bias_track.state + ndim_bias = bias_state.ndim # Predict bias pred_time = max(hypothesis.prediction.timestamp for hypothesis in hypotheses) - if self.bias_state.timestamp is None: - self.bias_state.timestamp = pred_time + if bias_state.timestamp is None: + bias_state.timestamp = pred_time else: - new_bias_state = self.bias_predictor.predict(self.bias_state, timestamp=pred_time) - self.bias_state.state_vector = new_bias_state.state_vector - self.bias_state.covar = new_bias_state.covar - self.bias_state.timestamp = new_bias_state.timestamp + bias_state = self.bias_predictor.predict(bias_state, timestamp=pred_time) # Create joint state states = [hypothesis.prediction for hypothesis in hypotheses] @@ -95,8 +93,8 @@ def update(self, hypotheses, **kwargs): for h in hypotheses if 
hasattr(h.measurement.measurement_model, 'applied_bias')), np.zeros((ndim_bias, 1))) - delta_bias = self.bias_state.state_vector - applied_bias - states.append(GaussianState(delta_bias, self.bias_state.covar)) + delta_bias = bias_state.state_vector - applied_bias + states.append(GaussianState(delta_bias, bias_state.covar)) combined_pred = GaussianState( np.vstack([state.state_vector for state in states]).view(StateVector), block_diag(*[state.covar for state in states]).view(CovarianceMatrix), @@ -123,12 +121,13 @@ def update(self, hypotheses, **kwargs): # Update bias update = self.updater.update(SingleHypothesis(combined_pred, combined_meas), **kwargs) rel_delta_bias = update.state_vector[-ndim_bias:, :] - delta_bias - self.bias_state.state_vector = self.bias_state.state_vector + rel_delta_bias + bias_state.state_vector = bias_state.state_vector + rel_delta_bias if self.max_bias is not None: - self.bias_state.state_vector = \ - np.min([abs(self.bias_state.state_vector), self.max_bias], axis=0) \ - * np.sign(self.bias_state.state_vector) - self.bias_state.covar = update.covar[-ndim_bias:, -ndim_bias:] + bias_state.state_vector = \ + np.min([abs(bias_state.state_vector), self.max_bias], axis=0) \ + * np.sign(bias_state.state_vector) + bias_state.covar = update.covar[-ndim_bias:, -ndim_bias:] + self.bias_track.append(bias_state) # Create update states offset = 0 diff --git a/stonesoup/updater/tests/test_bias.py b/stonesoup/updater/tests/test_bias.py index 535680819..5a0acbaab 100644 --- a/stonesoup/updater/tests/test_bias.py +++ b/stonesoup/updater/tests/test_bias.py @@ -15,6 +15,7 @@ from ...types.detection import Detection from ...types.hypothesis import SingleHypothesis from ...types.state import GaussianState, StateVector +from ...types.track import Track def make_dummy_measurement_model(): @@ -58,7 +59,7 @@ def test_translation_gaussian_bias_feeder_update_bias(bias_timestamp): bias_predictor = KalmanPredictor( CombinedLinearGaussianTransitionModel([RandomWalk(1)] * 3)) updater = GaussianBiasUpdater( - bias_state=copy.copy(bias_prior), + bias_track=Track([copy.copy(bias_prior)]), bias_predictor=bias_predictor, bias_model_wrapper=TranslationBiasModelWrapper, ) @@ -85,9 +86,9 @@ def test_translation_gaussian_bias_feeder_update_bias(bias_timestamp): # Call update_bias updates = updater.update([hyp]) # The bias_state should be updated (should not be the same as initial)... 
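The ±max_bias clamp applied in update() above is an element-wise magnitude limit that preserves sign; a small worked sketch with purely illustrative values:

import numpy as np

bias = np.array([[1.2], [-0.3], [4.0]])       # hypothetical bias estimate
max_bias = np.array([[1.0], [1.0], [np.pi]])  # allowed magnitude per element

clamped = np.min([abs(bias), max_bias], axis=0) * np.sign(bias)
# clamped -> [[1.0], [-0.3], [3.14159...]]: magnitudes capped, signs preserved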
-    assert not np.allclose(updater.bias_state.state_vector, bias_prior.state_vector)
+    assert not np.allclose(updater.bias_track.state_vector, bias_prior.state_vector)
     # and should be around 0.8 ± 0.1
-    assert np.allclose(updater.bias_state.state_vector, [[0.8], [0.8], [0.8]], atol=0.1)
+    assert np.allclose(updater.bias_track.state_vector, [[0.8], [0.8], [0.8]], atol=0.1)
 
     # The returned updates should match the updated state shape
     assert updates[0].state_vector.shape == pred.state_vector.shape
@@ -98,7 +99,7 @@ def test_orientation_gaussian_bias_feeder_update_bias(bias_timestamp):
     bias_predictor = KalmanPredictor(
         CombinedLinearGaussianTransitionModel([RandomWalk(1e-6)] * 3))
     updater = GaussianBiasUpdater(
-        bias_state=copy.copy(bias_prior),
+        bias_track=Track([copy.copy(bias_prior)]),
         bias_predictor=bias_predictor,
         bias_model_wrapper=OrientationBiasModelWrapper,
         max_bias=StateVector([[0.], [0.], [np.pi]])
     )
@@ -123,8 +124,8 @@
     hyp = SingleHypothesis(pred, meas)
     updates = updater.update([hyp])
-    assert not np.allclose(updater.bias_state.state_vector, bias_prior.state_vector)
-    assert np.allclose(updater.bias_state.state_vector, [[0.], [0.], [np.pi/16]], atol=0.05)
+    assert not np.allclose(updater.bias_track.state_vector, bias_prior.state_vector)
+    assert np.allclose(updater.bias_track.state_vector, [[0.], [0.], [np.pi/16]], atol=0.05)
 
     assert updates[0].state_vector.shape == pred.state_vector.shape
 
@@ -136,7 +137,7 @@ def test_orientation_translation_gaussian_bias_feeder_update_bias(bias_timestamp
     bias_predictor = KalmanPredictor(
         CombinedLinearGaussianTransitionModel([RandomWalk(1e-6)]*3 + [RandomWalk(1)]*3))
     updater = GaussianBiasUpdater(
-        bias_state=copy.copy(bias_prior),
+        bias_track=Track([copy.copy(bias_prior)]),
         bias_predictor=bias_predictor,
         bias_model_wrapper=OrientationTranslationBiasModelWrapper
     )
@@ -158,9 +159,9 @@
         hyp = SingleHypothesis(pred, meas)
         hyps.append(hyp)
     updates = updater.update(hyps)
-    assert not np.allclose(updater.bias_state.state_vector, bias_prior.state_vector)
+    assert not np.allclose(updater.bias_track.state_vector, bias_prior.state_vector)
     assert np.allclose(
-        updater.bias_state.state_vector,
+        updater.bias_track.state_vector,
         [[0.], [0.], [np.pi/16], [1.], [1.], [1.]], atol=0.1)
 
     assert updates[0].state_vector.shape == pred.state_vector.shape
@@ -171,7 +172,7 @@ def test_time_gaussian_bias_feeder_update_bias(bias_timestamp):
     bias_predictor = KalmanPredictor(
         CombinedLinearGaussianTransitionModel([RandomWalk(1e-3)]))
     updater = GaussianBiasUpdater(
-        bias_state=copy.copy(bias_prior),
+        bias_track=Track([copy.copy(bias_prior)]),
         bias_predictor=bias_predictor,
         bias_model_wrapper=partial(
             TimeBiasModelWrapper, transition_model=make_dummy_transition_model())
     )
@@ -195,6 +196,6 @@
     hyp = SingleHypothesis(pred, meas)
     updates = updater.update([hyp])
-    assert not np.allclose(updater.bias_state.state_vector, bias_prior.state_vector)
-    assert np.allclose(updater.bias_state.state_vector, [[1.0]], atol=0.1)
+    assert not np.allclose(updater.bias_track.state_vector, bias_prior.state_vector)
+    assert np.allclose(updater.bias_track.state_vector, [[1.0]], atol=0.1)
 
     assert updates[0].state_vector.shape == pred.state_vector.shape
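
[Editor's note, not part of the patch series: the preceding patches replace the updater's single
bias_state prior with a bias_track that accumulates the bias estimate over time. The sketch below
shows one way the updated GaussianBiasUpdater interface might be constructed, following the updated
tests above; the prior, covariance and random-walk variance values are illustrative placeholders,
not values taken from the patches.]

    import copy

    import numpy as np

    from stonesoup.models.measurement.bias import TranslationBiasModelWrapper
    from stonesoup.models.transition.linear import (
        CombinedLinearGaussianTransitionModel, RandomWalk)
    from stonesoup.predictor.kalman import KalmanPredictor
    from stonesoup.types.array import CovarianceMatrix, StateVector
    from stonesoup.types.state import GaussianState
    from stonesoup.types.track import Track
    from stonesoup.updater.bias import GaussianBiasUpdater

    # Zero-mean prior on a 3-dimensional translation bias (placeholder values)
    bias_prior = GaussianState(StateVector([[0.], [0.], [0.]]),
                               CovarianceMatrix(np.eye(3)))

    # A random walk per bias dimension drives the bias prediction between updates
    bias_predictor = KalmanPredictor(
        CombinedLinearGaussianTransitionModel([RandomWalk(1)] * 3))

    # The updater now holds a Track of bias states rather than a single GaussianState
    updater = GaussianBiasUpdater(
        bias_track=Track([copy.copy(bias_prior)]),
        bias_predictor=bias_predictor,
        bias_model_wrapper=TranslationBiasModelWrapper,
    )
    # Each call to updater.update(hypotheses) appends the refined bias estimate
    # to updater.bias_track (see the self.bias_track.append(bias_state) hunk above).
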
From 9629d2d0a896c931283261df1e61719aeace1810 Mon Sep 17 00:00:00 2001
From: Steven Hiscocks
Date: Tue, 4 Nov 2025 13:27:39 +0000
Subject: [PATCH 14/15] Fix docs on bias feeder and models; and example

---
 docs/examples/sensorfusion/senor_bias.py |  2 +-
 stonesoup/feeder/bias.py                 |  8 ++++----
 stonesoup/models/measurement/bias.py     | 10 +++++-----
 3 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/docs/examples/sensorfusion/senor_bias.py b/docs/examples/sensorfusion/senor_bias.py
index 417129f32..ec364f747 100644
--- a/docs/examples/sensorfusion/senor_bias.py
+++ b/docs/examples/sensorfusion/senor_bias.py
@@ -8,7 +8,7 @@
 # %%
 # This example demonstrates how to simulate and estimate a drifting bias in the position of a sensor platform.
 # Specifically, the platform at index 0 (and its sensor) will have a time-varying bias applied to its position.
-# We use Stone-Soup's bias wrappers and feeders to estimate this changing bias from sensor measurements.
+# We use Stone-Soup's bias wrappers, feeders and updater to estimate this changing bias from sensor measurements.
 
 # Some initial imports and set up
 import datetime
diff --git a/stonesoup/feeder/bias.py b/stonesoup/feeder/bias.py
index 85b52ebde..bac7129d4 100644
--- a/stonesoup/feeder/bias.py
+++ b/stonesoup/feeder/bias.py
@@ -24,7 +24,7 @@ def data_gen(self):
 class TimeBiasFeeder(_BiasFeeder):
     """Time Bias Feeder
 
-    Apply bias to detection timestamp and overall timestamp yielded.
+    Remove bias from detection timestamps and the overall yielded timestamp.
     """
     bias_track: Track[GaussianState] = Property(
         doc="Track of bias states with state vector shape (1, 1) in units of seconds")
@@ -47,7 +47,7 @@ def data_gen(self):
 class OrientationBiasFeeder(_BiasFeeder):
     """Orientation Bias Feeder
 
-    Apply bias to detection measurement model rotation offset
+    Remove bias from detection measurement model rotation offset
     """
     bias_track: Track[GaussianState] = Property(
         doc="Track of bias states with state vector shape (3, 1) is expected")
@@ -68,7 +68,7 @@ def data_gen(self):
 class TranslationBiasFeeder(_BiasFeeder):
     """Translation Bias Feeder
 
-    Apply bias to detection measurement model translation offset
+    Remove bias from detection measurement model translation offset
     """
     bias_track: Track[GaussianState] = Property(
         doc="Track of bias states with state vector shape (n, 1), "
@@ -90,7 +90,7 @@ def data_gen(self):
 class OrientationTranslationBiasFeeder(_BiasFeeder):
     """Orientation Translation Bias Feeder
 
-    Apply bias to detection measurement model rotation and translation offset
+    Remove bias from detection measurement model rotation and translation offset
     """
     bias_track: Track[GaussianState] = Property(
         doc="Track of bias states with state vector shape (3+n, 1), 3 for rotation and where n is "
diff --git a/stonesoup/models/measurement/bias.py b/stonesoup/models/measurement/bias.py
index 0e411f70d..454e64043 100644
--- a/stonesoup/models/measurement/bias.py
+++ b/stonesoup/models/measurement/bias.py
@@ -10,7 +10,7 @@
 
 
 class BiasModelWrapper(MeasurementModel):
-    """Abstract wrapper that applies bias values to an existing MeasurementModel.
+    """Abstract wrapper that removes bias values from an existing MeasurementModel.
     """
     measurement_model: MeasurementModel = Property(
         doc="Unbiased model being wrapped that bias will be applied to")
@@ -44,7 +44,7 @@ def rvs(self, *args, **kwargs):
 
 
 class TimeBiasModelWrapper(BiasModelWrapper):
-    """Apply a time-offset bias from the state to the wrapped measurement model.
+    """Removes a time-offset bias, taken from the state, from the wrapped measurement model.
 
     The bias elements selected by `bias_mapping` are interpreted as a time offset in seconds.
     For each state vector the wrapper computes the state at the (biased) measurement time by
@@ -75,7 +75,7 @@ def function(self, state, noise=False, **kwargs):
 
 
 class OrientationBiasModelWrapper(BiasModelWrapper):
-    """Apply an orientation (rotation) bias from the state to the wrapped measurement model.
+    """Removes an orientation (rotation) bias, taken from the state, from the wrapped model.
 
     The wrapper expects `bias_mapping` to select orientation elements (e.g. Euler angles
     or equivalent) stored in the state vector. For each input state the wrapper creates a copy
@@ -103,7 +103,7 @@ def function(self, state, noise=False, **kwargs):
 
 
 class TranslationBiasModelWrapper(BiasModelWrapper):
-    """Apply a translation (position) bias from the state to the wrapped measurement model.
+    """Removes a translation (position) bias, taken from the state, from the wrapped model.
 
     The wrapper expects `bias_mapping` to select translation elements stored in the state
     vector. For each input state the wrapper creates a copy of the wrapped measurement model,
@@ -128,7 +128,7 @@ def function(self, state, noise=False, **kwargs):
 
 
 class OrientationTranslationBiasModelWrapper(BiasModelWrapper):
-    """Apply combined orientation and translation biases from the state to the wrapped model.
+    """Removes orientation and translation biases, taken from the state, from the wrapped model.
 
     `bias_mapping` is expected to contain orientation indices first (3 elements) followed
     by translation indices (2 or 3 elements). For each input state the wrapper copies the

From 271f03d092ea3d6ed674d1e9828f4fe754ff990c Mon Sep 17 00:00:00 2001
From: Steven Hiscocks
Date: Thu, 6 Nov 2025 11:17:43 +0000
Subject: [PATCH 15/15] Fixes for bias tests and variable names

---
 stonesoup/feeder/tests/test_bias.py  |  4 ++--
 stonesoup/models/measurement/bias.py | 20 ++++++++++----------
 stonesoup/updater/bias.py            |  4 ++--
 3 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/stonesoup/feeder/tests/test_bias.py b/stonesoup/feeder/tests/test_bias.py
index 666223cc8..3ab87194d 100644
--- a/stonesoup/feeder/tests/test_bias.py
+++ b/stonesoup/feeder/tests/test_bias.py
@@ -106,8 +106,8 @@ def test_orientation_translation_gaussian_bias_feeder_iter():
             )
             rotated = build_rotation_matrix(det.measurement_model.rotation_offset) \
                 @ (det.state_vector - det.measurement_model.translation_offset)
-            expected_rotated = build_rotation_matrix(-feeder.bias) \
-                @ (det.state_vector - det.measurement_model.translation_offset)
+            expected_rotated = build_rotation_matrix(-feeder.bias[:3]) \
+                @ (det.state_vector + feeder.bias[3:])
             assert np.allclose(rotated, expected_rotated)
 
 
diff --git a/stonesoup/models/measurement/bias.py b/stonesoup/models/measurement/bias.py
index 454e64043..06c9975be 100644
--- a/stonesoup/models/measurement/bias.py
+++ b/stonesoup/models/measurement/bias.py
@@ -65,10 +65,10 @@ def __init__(self, *args, **kwargs):
     def function(self, state, noise=False, **kwargs):
         predicted_state_vectors = []
         for state_vector in state.state_vector.view(StateVectors):
-            delta_t = state_vector[self.bias_mapping[0], 0]
+            delta_time = state_vector[self.bias_mapping[0], 0]
             predicted_state_vectors.append(self.transition_model.function(
                 State(state_vector[self.state_mapping, :]),
-                time_interval=datetime.timedelta(seconds=-delta_t),
+                time_interval=datetime.timedelta(seconds=-delta_time),
                 **kwargs))
         return self.measurement_model.function(
             State(StateVectors(predicted_state_vectors)), noise=noise, **kwargs)
@@ -91,9 +91,9 @@ class OrientationBiasModelWrapper(BiasModelWrapper):
     def function(self, state, noise=False, **kwargs):
         state_vectors = []
         for state_vector in state.state_vector.view(StateVectors):
-            delta_orient = state_vector[self.bias_mapping, :]
+            delta_orientation = state_vector[self.bias_mapping, :]
             bias_model = copy.copy(self.measurement_model)
-            bias_model.rotation_offset = bias_model.rotation_offset - delta_orient
+            bias_model.rotation_offset = bias_model.rotation_offset - delta_orientation
             state_vectors.append(bias_model.function(
                 State(state_vector[self.state_mapping, :]), noise=noise, **kwargs))
         if len(state_vectors) == 1:
@@ -116,9 +116,9 @@ class TranslationBiasModelWrapper(BiasModelWrapper):
     def function(self, state, noise=False, **kwargs):
         state_vectors = []
         for state_vector in state.state_vector.view(StateVectors):
-            delta_trans = state_vector[self.bias_mapping, :]
+            delta_translation = state_vector[self.bias_mapping, :]
             bias_model = copy.copy(self.measurement_model)
-            bias_model.translation_offset = bias_model.translation_offset - delta_trans
+            bias_model.translation_offset = bias_model.translation_offset - delta_translation
             state_vectors.append(bias_model.function(
                 State(state_vector[self.state_mapping, :]), noise=noise, **kwargs))
         if len(state_vectors) == 1:
@@ -142,11 +142,11 @@ class OrientationTranslationBiasModelWrapper(BiasModelWrapper):
     def function(self, state, noise=False, **kwargs):
         state_vectors = []
         for state_vector in state.state_vector.view(StateVectors):
-            delta_orient = state_vector[self.bias_mapping[:3], :]
-            delta_trans = state_vector[self.bias_mapping[3:], :]
+            delta_orientation = state_vector[self.bias_mapping[:3], :]
+            delta_translation = state_vector[self.bias_mapping[3:], :]
             bias_model = copy.copy(self.measurement_model)
-            bias_model.rotation_offset = bias_model.rotation_offset - delta_orient
-            bias_model.translation_offset = bias_model.translation_offset - delta_trans
+            bias_model.rotation_offset = bias_model.rotation_offset - delta_orientation
+            bias_model.translation_offset = bias_model.translation_offset - delta_translation
             state_vectors.append(bias_model.function(
                 State(state_vector[self.state_mapping, :]), noise=noise, **kwargs))
         if len(state_vectors) == 1:
diff --git a/stonesoup/updater/bias.py b/stonesoup/updater/bias.py
index 5e4d8d89d..6d4b452ae 100644
--- a/stonesoup/updater/bias.py
+++ b/stonesoup/updater/bias.py
@@ -120,8 +120,8 @@ def update(self, hypotheses, **kwargs):
 
         # Update bias
         update = self.updater.update(SingleHypothesis(combined_pred, combined_meas), **kwargs)
-        rel_delta_bias = update.state_vector[-ndim_bias:, :] - delta_bias
-        bias_state.state_vector = bias_state.state_vector + rel_delta_bias
+        relative_delta_bias = update.state_vector[-ndim_bias:, :] - delta_bias
+        bias_state.state_vector = bias_state.state_vector + relative_delta_bias
         if self.max_bias is not None:
             bias_state.state_vector = \
                 np.min([abs(bias_state.state_vector), self.max_bias], axis=0) \