Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slight batch modifications + Make PointRelativeMeasurement work with SE2. #129

Merged
merged 3 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions navlie/batch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
"""

from .estimator import BatchEstimator
from .problem import Problem
2 changes: 1 addition & 1 deletion navlie/batch/gaussian_mixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def mix_errors(

nonlinear_part = np.array(np.log(alpha_max / alpha_k)).reshape(-1)
nonlinear_part = np.sqrt(2) * np.sqrt(nonlinear_part)
e_mix = np.concatenate([linear_part, nonlinear_part])
e_mix = np.concatenate([linear_part.reshape(-1), nonlinear_part])

reused_values = {"alphas": alphas, "res_values": res_values}

Expand Down
32 changes: 25 additions & 7 deletions navlie/batch/residuals.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ def jacobian_fd(self, states: List[State], step_size=1e-6) -> List[np.ndarray]:
Parameters
----------
states : List[State]
Evaluation point of Jacobians, a list of states that
Evaluation point of Jacobians, a list of states that
the residual is a function of.

Returns
-------
List[np.ndarray]
Expand All @@ -78,7 +78,7 @@ def jacobian_fd(self, states: List[State], step_size=1e-6) -> List[np.ndarray]:
w.r.t states[0], the second element is the Jacobian of the residual w.r.t states[1], etc.
"""
jac_list: List[np.ndarray] = [None] * len(states)

# Compute the Jacobian for each state via finite difference
for state_num, X_bar in enumerate(states):
e_bar = self.evaluate(states)
Expand All @@ -100,13 +100,14 @@ def jacobian_fd(self, states: List[State], step_size=1e-6) -> List[np.ndarray]:
jac_list[state_num] = jac_fd

return jac_list

def sqrt_info_matrix(self, states: List[State]):
"""
Returns the information matrix
"""
pass


class PriorResidual(Residual):
"""
A generic prior error.
Expand Down Expand Up @@ -157,6 +158,7 @@ def sqrt_info_matrix(self, states: List[State]):
"""
return self._L


class ProcessResidual(Residual):
"""
A generic process residual.
Expand Down Expand Up @@ -211,16 +213,24 @@ def evaluate(
if compute_jacobians:
jac_list = [None] * len(states)
if compute_jacobians[0]:
jac_list[0] = -L.T @ self._process_model.jacobian(
x_km1, self._u, dt
)
jac_list[0] = -L.T @ self._process_model.jacobian(x_km1, self._u, dt)
if compute_jacobians[1]:
jac_list[1] = L.T @ x_k.minus_jacobian(x_k_hat)

return e, jac_list

return e

def sqrt_info_matrix(self, states: List[State]):
"""
Returns the square root of the information matrix
"""
x_km1 = states[0]
x_k = states[1]
dt = x_k.stamp - x_km1.stamp
L = self._process_model.sqrt_information(x_km1, self._u, dt)
return L


class MeasurementResidual(Residual):
"""
Expand Down Expand Up @@ -268,3 +278,11 @@ def evaluate(
return e, jacobians

return e

def sqrt_info_matrix(self, states: List[State]):
"""
Returns the square root of the information matrix
"""
x = states[0]
L = self._y.model.sqrt_information(x)
return L
98 changes: 34 additions & 64 deletions navlie/lib/models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from navlie.lib.states import SE2State, SE3State, SE23State
from navlie.types import (
Measurement,
ProcessModel,
Expand Down Expand Up @@ -154,9 +155,7 @@ def __init__(self, Q: np.ndarray):
self._Q = Q
self.dim = int(Q.shape[0] / 2)

def evaluate(
self, x: VectorState, u: VectorInput, dt: float
) -> VectorState:
def evaluate(self, x: VectorState, u: VectorInput, dt: float) -> VectorState:
x = x.copy()
u = u.copy()
Ad = super().jacobian(x, u, dt)
Expand Down Expand Up @@ -244,9 +243,7 @@ def evaluate(
x.value = x.value @ x.group.Exp(u.value * dt)
return x

def jacobian(
self, x: MatrixLieGroupState, u: VectorInput, dt: float
) -> np.ndarray:
def jacobian(self, x: MatrixLieGroupState, u: VectorInput, dt: float) -> np.ndarray:
if x.direction == "right":
return x.group.adjoint(x.group.Exp(-u.value * dt))
elif x.direction == "left":
Expand Down Expand Up @@ -321,16 +318,12 @@ def evaluate(
x.value = x.group.Exp(-u[0] * dt) @ x.value @ x.group.Exp(u[1] * dt)
return x

def jacobian(
self, x: MatrixLieGroupState, u: VectorInput, dt: float
) -> np.ndarray:
def jacobian(self, x: MatrixLieGroupState, u: VectorInput, dt: float) -> np.ndarray:
u = u.value.reshape((2, round(u.value.size / 2)))
if x.direction == "right":
return x.group.adjoint(x.group.Exp(-u[1] * dt))
else:
raise NotImplementedError(
"TODO: left jacobian not yet implemented."
)
raise NotImplementedError("TODO: left jacobian not yet implemented.")

def covariance(
self, x: MatrixLieGroupState, u: VectorInput, dt: float
Expand All @@ -347,9 +340,7 @@ def covariance(
L2 = dt * x.group.left_jacobian(-dt * u2)
return L1 @ self._Q1 @ L1.T + L2 @ self._Q2 @ L2.T
else:
raise NotImplementedError(
"TODO: left covariance not yet implemented."
)
raise NotImplementedError("TODO: left covariance not yet implemented.")


class LinearMeasurement(MeasurementModel):
Expand Down Expand Up @@ -479,16 +470,21 @@ def jacobian(self, x: MatrixLieGroupState) -> np.ndarray:
r_zw_a = x.position.reshape((-1, 1))
C_ab = x.attitude
r_pw_a = self._landmark_position.reshape((-1, 1))
y = C_ab.T @ (r_pw_a - r_zw_a)
y = C_ab.T @ (r_pw_a - r_zw_a).reshape(-1, 1)

if C_ab.shape[0] == 2:
group = SO2
if C_ab.shape[0] == 3:
group = SO3

if x.direction == "right":
return x.jacobian_from_blocks(
attitude=-SO3.odot(y), position=-np.identity(r_zw_a.shape[0])
attitude=-group.odot(y), position=-np.identity(r_zw_a.shape[0])
)

elif x.direction == "left":
return x.jacobian_from_blocks(
attitude=-C_ab.T @ SO3.odot(r_pw_a), position=-C_ab.T
attitude=-C_ab.T @ group.odot(r_pw_a), position=-C_ab.T
)

def covariance(self, x: MatrixLieGroupState) -> np.ndarray:
Expand Down Expand Up @@ -541,7 +537,9 @@ def jacobian(self, x: CompositeState) -> np.ndarray:
r_pw_a = landmark.value.reshape((-1, 1))
y = C_ab.T @ (r_pw_a - r_zw_a)

# Compute Jacobian of measurement model with respect to the state
# Compute Jacobian of measurement model with respect to the stated
# TODO: Implement SE2 version.

if pose.direction == "right":
pose_jacobian = pose.jacobian_from_blocks(
attitude=-SO3.odot(y), position=-np.identity(r_zw_a.shape[0])
Expand Down Expand Up @@ -624,9 +622,7 @@ def jacobian(self, x: CompositeState) -> np.ndarray:

# Compute the Jacobian of the measurement model
if pose.direction == "right":
r_pc_c = self._camera.resolve_landmark_in_cam_frame(
pose, landmark.value
)
r_pc_c = self._camera.resolve_landmark_in_cam_frame(pose, landmark.value)
dg_dr = self._compute_jac_of_projection_model(r_pc_c)
# Extract relevant states
C_bc = self._camera.T_bc.attitude
Expand Down Expand Up @@ -660,15 +656,11 @@ def jacobian(self, x: CompositeState) -> np.ndarray:
def covariance(self, x: CompositeState) -> np.ndarray:
return self._R

def _compute_jac_of_projection_model(
self, r_pc_c: np.ndarray
) -> np.ndarray:
def _compute_jac_of_projection_model(self, r_pc_c: np.ndarray) -> np.ndarray:
x, y, z = r_pc_c.ravel()
fu = self._camera.fu
fv = self._camera.fv
dg_dr = (1.0 / z) * np.array(
[[fu, 0, -fu * x / z], [0, fv, -fv * y / z]]
)
dg_dr = (1.0 / z) * np.array([[fu, 0, -fu * x / z], [0, fv, -fv * y / z]])

return dg_dr

Expand Down Expand Up @@ -805,9 +797,7 @@ class RangePoseToPose(MeasurementModel):
Compatible with ``SE2State, SE3State, SE23State, IMUState``.
"""

def __init__(
self, tag_body_position1, tag_body_position2, state_id1, state_id2, R
):
def __init__(self, tag_body_position1, tag_body_position2, state_id1, state_id2, R):
"""
Parameters
----------
Expand Down Expand Up @@ -842,9 +832,7 @@ def evaluate(self, x: CompositeState) -> np.ndarray:
C_a2 = x2.attitude
r_t1_1 = self.tag_body_position1.reshape((-1, 1))
r_t2_2 = self.tag_body_position2.reshape((-1, 1))
r_t1t2_a: np.ndarray = (C_a1 @ r_t1_1 + r_1w_a) - (
C_a2 @ r_t2_2 + r_2w_a
)
r_t1t2_a: np.ndarray = (C_a1 @ r_t1_1 + r_1w_a) - (C_a2 @ r_t2_2 + r_2w_a)
return np.array(np.linalg.norm(r_t1t2_a.flatten()))

def jacobian(self, x: CompositeState) -> np.ndarray:
Expand All @@ -856,18 +844,16 @@ def jacobian(self, x: CompositeState) -> np.ndarray:
C_a2 = x2.attitude
r_t1_1 = self.tag_body_position1.reshape((-1, 1))
r_t2_2 = self.tag_body_position2.reshape((-1, 1))
r_t1t2_a: np.ndarray = (C_a1 @ r_t1_1 + r_1w_a) - (
C_a2 @ r_t2_2 + r_2w_a
)
r_t1t2_a: np.ndarray = (C_a1 @ r_t1_1 + r_1w_a) - (C_a2 @ r_t2_2 + r_2w_a)

if C_a1.shape == (2, 2):
att_group = SO2
elif C_a1.shape == (3, 3):
att_group = SO3

rho: np.ndarray = (
r_t1t2_a / np.linalg.norm(r_t1t2_a.flatten())
).reshape((-1, 1))
rho: np.ndarray = (r_t1t2_a / np.linalg.norm(r_t1t2_a.flatten())).reshape(
(-1, 1)
)

if x1.direction == "right":
jac1 = x1.jacobian_from_blocks(
Expand All @@ -891,9 +877,7 @@ def jacobian(self, x: CompositeState) -> np.ndarray:
position=-rho.T @ np.identity(r_t2_2.size),
)

return x.jacobian_from_blocks(
{self.state_id1: jac1, self.state_id2: jac2}
)
return x.jacobian_from_blocks({self.state_id1: jac1, self.state_id2: jac2})

def covariance(self, x: CompositeState) -> np.ndarray:
return self._R
Expand Down Expand Up @@ -1061,9 +1045,7 @@ def evaluate(self, x: MatrixLieGroupState):

def jacobian(self, x: MatrixLieGroupState):
if x.direction == "right":
return x.jacobian_from_blocks(
position=x.attitude[2, :].reshape((1, -1))
)
return x.jacobian_from_blocks(position=x.attitude[2, :].reshape((1, -1)))
elif x.direction == "left":
return x.jacobian_from_blocks(
attitude=SO3.odot(x.position)[2, :].reshape((1, -1)),
Expand Down Expand Up @@ -1107,13 +1089,9 @@ def evaluate(self, x: MatrixLieGroupState):

def jacobian(self, x: MatrixLieGroupState):
if x.direction == "right":
return x.jacobian_from_blocks(
attitude=-SO3.odot(x.attitude.T @ self._g_a)
)
return x.jacobian_from_blocks(attitude=-SO3.odot(x.attitude.T @ self._g_a))
elif x.direction == "left":
return x.jacobian_from_blocks(
attitude=x.attitude.T @ -SO3.odot(self._g_a)
)
return x.jacobian_from_blocks(attitude=x.attitude.T @ -SO3.odot(self._g_a))

def covariance(self, x: MatrixLieGroupState) -> np.ndarray:
if np.isscalar(self.R):
Expand Down Expand Up @@ -1156,13 +1134,9 @@ def evaluate(self, x: MatrixLieGroupState):

def jacobian(self, x: MatrixLieGroupState):
if x.direction == "right":
return x.jacobian_from_blocks(
attitude=-SO3.odot(x.attitude.T @ self._m_a)
)
return x.jacobian_from_blocks(attitude=-SO3.odot(x.attitude.T @ self._m_a))
elif x.direction == "left":
return x.jacobian_from_blocks(
attitude=-x.attitude.T @ SO3.odot(self._m_a)
)
return x.jacobian_from_blocks(attitude=-x.attitude.T @ SO3.odot(self._m_a))

def covariance(self, x: MatrixLieGroupState) -> np.ndarray:
if np.isscalar(self.R):
Expand All @@ -1172,9 +1146,7 @@ def covariance(self, x: MatrixLieGroupState) -> np.ndarray:


class _InvariantInnovation(MeasurementModel):
def __init__(
self, y: np.ndarray, model: MeasurementModel, direction="right"
):
def __init__(self, y: np.ndarray, model: MeasurementModel, direction="right"):
self.measurement_model = model
self.y = y.ravel()
self.direction = direction
Expand Down Expand Up @@ -1225,9 +1197,7 @@ def _compute_direction(self, x: MatrixLieGroupState) -> str:
elif x.direction == "right":
direction = "left"
else:
raise ValueError(
"Invalid direction. Must be 'left', 'right' or 'auto'"
)
raise ValueError("Invalid direction. Must be 'left', 'right' or 'auto'")
return direction


Expand Down
Loading
Loading