diff --git a/navlie/batch/__init__.py b/navlie/batch/__init__.py index 319da26..3817e69 100644 --- a/navlie/batch/__init__.py +++ b/navlie/batch/__init__.py @@ -3,3 +3,4 @@ """ from .estimator import BatchEstimator +from .problem import Problem diff --git a/navlie/batch/gaussian_mixtures.py b/navlie/batch/gaussian_mixtures.py index e56a390..b1358e8 100644 --- a/navlie/batch/gaussian_mixtures.py +++ b/navlie/batch/gaussian_mixtures.py @@ -217,7 +217,7 @@ def mix_errors( nonlinear_part = np.array(np.log(alpha_max / alpha_k)).reshape(-1) nonlinear_part = np.sqrt(2) * np.sqrt(nonlinear_part) - e_mix = np.concatenate([linear_part, nonlinear_part]) + e_mix = np.concatenate([linear_part.reshape(-1), nonlinear_part]) reused_values = {"alphas": alphas, "res_values": res_values} diff --git a/navlie/batch/residuals.py b/navlie/batch/residuals.py index 1759442..585a305 100644 --- a/navlie/batch/residuals.py +++ b/navlie/batch/residuals.py @@ -67,9 +67,9 @@ def jacobian_fd(self, states: List[State], step_size=1e-6) -> List[np.ndarray]: Parameters ---------- states : List[State] - Evaluation point of Jacobians, a list of states that + Evaluation point of Jacobians, a list of states that the residual is a function of. - + Returns ------- List[np.ndarray] @@ -78,7 +78,7 @@ def jacobian_fd(self, states: List[State], step_size=1e-6) -> List[np.ndarray]: w.r.t states[0], the second element is the Jacobian of the residual w.r.t states[1], etc. """ jac_list: List[np.ndarray] = [None] * len(states) - + # Compute the Jacobian for each state via finite difference for state_num, X_bar in enumerate(states): e_bar = self.evaluate(states) @@ -100,13 +100,14 @@ def jacobian_fd(self, states: List[State], step_size=1e-6) -> List[np.ndarray]: jac_list[state_num] = jac_fd return jac_list - + def sqrt_info_matrix(self, states: List[State]): """ Returns the information matrix """ pass + class PriorResidual(Residual): """ A generic prior error. @@ -157,6 +158,7 @@ def sqrt_info_matrix(self, states: List[State]): """ return self._L + class ProcessResidual(Residual): """ A generic process residual. @@ -211,9 +213,7 @@ def evaluate( if compute_jacobians: jac_list = [None] * len(states) if compute_jacobians[0]: - jac_list[0] = -L.T @ self._process_model.jacobian( - x_km1, self._u, dt - ) + jac_list[0] = -L.T @ self._process_model.jacobian(x_km1, self._u, dt) if compute_jacobians[1]: jac_list[1] = L.T @ x_k.minus_jacobian(x_k_hat) @@ -221,6 +221,16 @@ def evaluate( return e + def sqrt_info_matrix(self, states: List[State]): + """ + Returns the square root of the information matrix + """ + x_km1 = states[0] + x_k = states[1] + dt = x_k.stamp - x_km1.stamp + L = self._process_model.sqrt_information(x_km1, self._u, dt) + return L + class MeasurementResidual(Residual): """ @@ -268,3 +278,11 @@ def evaluate( return e, jacobians return e + + def sqrt_info_matrix(self, states: List[State]): + """ + Returns the square root of the information matrix + """ + x = states[0] + L = self._y.model.sqrt_information(x) + return L diff --git a/navlie/lib/models.py b/navlie/lib/models.py index c624c96..dc35a59 100644 --- a/navlie/lib/models.py +++ b/navlie/lib/models.py @@ -1,3 +1,4 @@ +from navlie.lib.states import SE2State, SE3State, SE23State from navlie.types import ( Measurement, ProcessModel, @@ -154,9 +155,7 @@ def __init__(self, Q: np.ndarray): self._Q = Q self.dim = int(Q.shape[0] / 2) - def evaluate( - self, x: VectorState, u: VectorInput, dt: float - ) -> VectorState: + def evaluate(self, x: VectorState, u: VectorInput, dt: float) -> VectorState: x = x.copy() u = u.copy() Ad = super().jacobian(x, u, dt) @@ -244,9 +243,7 @@ def evaluate( x.value = x.value @ x.group.Exp(u.value * dt) return x - def jacobian( - self, x: MatrixLieGroupState, u: VectorInput, dt: float - ) -> np.ndarray: + def jacobian(self, x: MatrixLieGroupState, u: VectorInput, dt: float) -> np.ndarray: if x.direction == "right": return x.group.adjoint(x.group.Exp(-u.value * dt)) elif x.direction == "left": @@ -321,16 +318,12 @@ def evaluate( x.value = x.group.Exp(-u[0] * dt) @ x.value @ x.group.Exp(u[1] * dt) return x - def jacobian( - self, x: MatrixLieGroupState, u: VectorInput, dt: float - ) -> np.ndarray: + def jacobian(self, x: MatrixLieGroupState, u: VectorInput, dt: float) -> np.ndarray: u = u.value.reshape((2, round(u.value.size / 2))) if x.direction == "right": return x.group.adjoint(x.group.Exp(-u[1] * dt)) else: - raise NotImplementedError( - "TODO: left jacobian not yet implemented." - ) + raise NotImplementedError("TODO: left jacobian not yet implemented.") def covariance( self, x: MatrixLieGroupState, u: VectorInput, dt: float @@ -347,9 +340,7 @@ def covariance( L2 = dt * x.group.left_jacobian(-dt * u2) return L1 @ self._Q1 @ L1.T + L2 @ self._Q2 @ L2.T else: - raise NotImplementedError( - "TODO: left covariance not yet implemented." - ) + raise NotImplementedError("TODO: left covariance not yet implemented.") class LinearMeasurement(MeasurementModel): @@ -479,16 +470,21 @@ def jacobian(self, x: MatrixLieGroupState) -> np.ndarray: r_zw_a = x.position.reshape((-1, 1)) C_ab = x.attitude r_pw_a = self._landmark_position.reshape((-1, 1)) - y = C_ab.T @ (r_pw_a - r_zw_a) + y = C_ab.T @ (r_pw_a - r_zw_a).reshape(-1, 1) + + if C_ab.shape[0] == 2: + group = SO2 + if C_ab.shape[0] == 3: + group = SO3 if x.direction == "right": return x.jacobian_from_blocks( - attitude=-SO3.odot(y), position=-np.identity(r_zw_a.shape[0]) + attitude=-group.odot(y), position=-np.identity(r_zw_a.shape[0]) ) elif x.direction == "left": return x.jacobian_from_blocks( - attitude=-C_ab.T @ SO3.odot(r_pw_a), position=-C_ab.T + attitude=-C_ab.T @ group.odot(r_pw_a), position=-C_ab.T ) def covariance(self, x: MatrixLieGroupState) -> np.ndarray: @@ -541,7 +537,9 @@ def jacobian(self, x: CompositeState) -> np.ndarray: r_pw_a = landmark.value.reshape((-1, 1)) y = C_ab.T @ (r_pw_a - r_zw_a) - # Compute Jacobian of measurement model with respect to the state + # Compute Jacobian of measurement model with respect to the stated + # TODO: Implement SE2 version. + if pose.direction == "right": pose_jacobian = pose.jacobian_from_blocks( attitude=-SO3.odot(y), position=-np.identity(r_zw_a.shape[0]) @@ -624,9 +622,7 @@ def jacobian(self, x: CompositeState) -> np.ndarray: # Compute the Jacobian of the measurement model if pose.direction == "right": - r_pc_c = self._camera.resolve_landmark_in_cam_frame( - pose, landmark.value - ) + r_pc_c = self._camera.resolve_landmark_in_cam_frame(pose, landmark.value) dg_dr = self._compute_jac_of_projection_model(r_pc_c) # Extract relevant states C_bc = self._camera.T_bc.attitude @@ -660,15 +656,11 @@ def jacobian(self, x: CompositeState) -> np.ndarray: def covariance(self, x: CompositeState) -> np.ndarray: return self._R - def _compute_jac_of_projection_model( - self, r_pc_c: np.ndarray - ) -> np.ndarray: + def _compute_jac_of_projection_model(self, r_pc_c: np.ndarray) -> np.ndarray: x, y, z = r_pc_c.ravel() fu = self._camera.fu fv = self._camera.fv - dg_dr = (1.0 / z) * np.array( - [[fu, 0, -fu * x / z], [0, fv, -fv * y / z]] - ) + dg_dr = (1.0 / z) * np.array([[fu, 0, -fu * x / z], [0, fv, -fv * y / z]]) return dg_dr @@ -805,9 +797,7 @@ class RangePoseToPose(MeasurementModel): Compatible with ``SE2State, SE3State, SE23State, IMUState``. """ - def __init__( - self, tag_body_position1, tag_body_position2, state_id1, state_id2, R - ): + def __init__(self, tag_body_position1, tag_body_position2, state_id1, state_id2, R): """ Parameters ---------- @@ -842,9 +832,7 @@ def evaluate(self, x: CompositeState) -> np.ndarray: C_a2 = x2.attitude r_t1_1 = self.tag_body_position1.reshape((-1, 1)) r_t2_2 = self.tag_body_position2.reshape((-1, 1)) - r_t1t2_a: np.ndarray = (C_a1 @ r_t1_1 + r_1w_a) - ( - C_a2 @ r_t2_2 + r_2w_a - ) + r_t1t2_a: np.ndarray = (C_a1 @ r_t1_1 + r_1w_a) - (C_a2 @ r_t2_2 + r_2w_a) return np.array(np.linalg.norm(r_t1t2_a.flatten())) def jacobian(self, x: CompositeState) -> np.ndarray: @@ -856,18 +844,16 @@ def jacobian(self, x: CompositeState) -> np.ndarray: C_a2 = x2.attitude r_t1_1 = self.tag_body_position1.reshape((-1, 1)) r_t2_2 = self.tag_body_position2.reshape((-1, 1)) - r_t1t2_a: np.ndarray = (C_a1 @ r_t1_1 + r_1w_a) - ( - C_a2 @ r_t2_2 + r_2w_a - ) + r_t1t2_a: np.ndarray = (C_a1 @ r_t1_1 + r_1w_a) - (C_a2 @ r_t2_2 + r_2w_a) if C_a1.shape == (2, 2): att_group = SO2 elif C_a1.shape == (3, 3): att_group = SO3 - rho: np.ndarray = ( - r_t1t2_a / np.linalg.norm(r_t1t2_a.flatten()) - ).reshape((-1, 1)) + rho: np.ndarray = (r_t1t2_a / np.linalg.norm(r_t1t2_a.flatten())).reshape( + (-1, 1) + ) if x1.direction == "right": jac1 = x1.jacobian_from_blocks( @@ -891,9 +877,7 @@ def jacobian(self, x: CompositeState) -> np.ndarray: position=-rho.T @ np.identity(r_t2_2.size), ) - return x.jacobian_from_blocks( - {self.state_id1: jac1, self.state_id2: jac2} - ) + return x.jacobian_from_blocks({self.state_id1: jac1, self.state_id2: jac2}) def covariance(self, x: CompositeState) -> np.ndarray: return self._R @@ -1061,9 +1045,7 @@ def evaluate(self, x: MatrixLieGroupState): def jacobian(self, x: MatrixLieGroupState): if x.direction == "right": - return x.jacobian_from_blocks( - position=x.attitude[2, :].reshape((1, -1)) - ) + return x.jacobian_from_blocks(position=x.attitude[2, :].reshape((1, -1))) elif x.direction == "left": return x.jacobian_from_blocks( attitude=SO3.odot(x.position)[2, :].reshape((1, -1)), @@ -1107,13 +1089,9 @@ def evaluate(self, x: MatrixLieGroupState): def jacobian(self, x: MatrixLieGroupState): if x.direction == "right": - return x.jacobian_from_blocks( - attitude=-SO3.odot(x.attitude.T @ self._g_a) - ) + return x.jacobian_from_blocks(attitude=-SO3.odot(x.attitude.T @ self._g_a)) elif x.direction == "left": - return x.jacobian_from_blocks( - attitude=x.attitude.T @ -SO3.odot(self._g_a) - ) + return x.jacobian_from_blocks(attitude=x.attitude.T @ -SO3.odot(self._g_a)) def covariance(self, x: MatrixLieGroupState) -> np.ndarray: if np.isscalar(self.R): @@ -1156,13 +1134,9 @@ def evaluate(self, x: MatrixLieGroupState): def jacobian(self, x: MatrixLieGroupState): if x.direction == "right": - return x.jacobian_from_blocks( - attitude=-SO3.odot(x.attitude.T @ self._m_a) - ) + return x.jacobian_from_blocks(attitude=-SO3.odot(x.attitude.T @ self._m_a)) elif x.direction == "left": - return x.jacobian_from_blocks( - attitude=-x.attitude.T @ SO3.odot(self._m_a) - ) + return x.jacobian_from_blocks(attitude=-x.attitude.T @ SO3.odot(self._m_a)) def covariance(self, x: MatrixLieGroupState) -> np.ndarray: if np.isscalar(self.R): @@ -1172,9 +1146,7 @@ def covariance(self, x: MatrixLieGroupState) -> np.ndarray: class _InvariantInnovation(MeasurementModel): - def __init__( - self, y: np.ndarray, model: MeasurementModel, direction="right" - ): + def __init__(self, y: np.ndarray, model: MeasurementModel, direction="right"): self.measurement_model = model self.y = y.ravel() self.direction = direction @@ -1225,9 +1197,7 @@ def _compute_direction(self, x: MatrixLieGroupState) -> str: elif x.direction == "right": direction = "left" else: - raise ValueError( - "Invalid direction. Must be 'left', 'right' or 'auto'" - ) + raise ValueError("Invalid direction. Must be 'left', 'right' or 'auto'") return direction diff --git a/tests/unit/test_meas_models.py b/tests/unit/test_meas_models.py index f03e3d9..321e7e8 100644 --- a/tests/unit/test_meas_models.py +++ b/tests/unit/test_meas_models.py @@ -112,9 +112,7 @@ def test_range_pose_to_pose_se23(direction): @pytest.mark.parametrize("direction", ["right", "left"]) def test_global_position_se2(direction): - x = SE2State( - SE2.Exp([0.5, 1, 2]), stamp=0.0, state_id=2, direction=direction - ) + x = SE2State(SE2.Exp([0.5, 1, 2]), stamp=0.0, state_id=2, direction=direction) model = GlobalPosition(np.identity(3)) _jacobian_test(x, model, atol=1e-5) @@ -189,9 +187,7 @@ def test_altitude_se23(direction): @pytest.mark.parametrize("direction", ["right", "left"]) def test_gravity_so3(direction): - x = SO3State( - SO3.Exp([0.3, 0.1, 0.2]), stamp=0.0, state_id=2, direction=direction - ) + x = SO3State(SO3.Exp([0.3, 0.1, 0.2]), stamp=0.0, state_id=2, direction=direction) model = Gravitometer(np.identity(3)) _jacobian_test(x, model) @@ -219,9 +215,7 @@ def test_gravity_se23(direction): @pytest.mark.parametrize("direction", ["right", "left"]) def test_magnetometer_so3(direction): - x = SO3State( - SO3.Exp([0.3, 0.1, 0.2]), stamp=0.0, state_id=2, direction=direction - ) + x = SO3State(SO3.Exp([0.3, 0.1, 0.2]), stamp=0.0, state_id=2, direction=direction) model = Magnetometer(np.identity(3)) _jacobian_test(x, model) @@ -270,9 +264,7 @@ def test_camera_projection(direction: str = "right"): def test_invariant_magnetometer_so3(): - x = SO3State( - SO3.Exp([0.3, 0.1, 0.2]), stamp=0.0, state_id=2, direction="left" - ) + x = SO3State(SO3.Exp([0.3, 0.1, 0.2]), stamp=0.0, state_id=2, direction="left") b = [1, 0, 0] y = np.array(b) @@ -284,9 +276,7 @@ def test_invariant_magnetometer_so3(): def test_invariant_magnetometer_se3(): - x = SE3State( - SE3.Exp([0, 1, 2, 4, 5, 6]), stamp=0.0, state_id=2, direction="left" - ) + x = SE3State(SE3.Exp([0, 1, 2, 4, 5, 6]), stamp=0.0, state_id=2, direction="left") b = [1, 0, 0] y = np.array(b) @@ -315,6 +305,13 @@ def test_invariant_magnetometer_se23(): assert np.allclose(jac, jac_test, atol=1e-6) +@pytest.mark.parametrize("direction", ["right"]) +def test_landmark_relative_position_se2(direction): + x = SE2State(SE2.Exp([0, 1, 2]), stamp=0.0, state_id=2, direction=direction) + model = PointRelativePosition([1, 2], np.identity(2)) + _jacobian_test(x, model) + + @pytest.mark.parametrize("direction", ["right", "left"]) def test_landmark_relative_position_se3(direction): x = SE3State(