Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

during rolls sanity check #52

Merged
merged 24 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rb_ws/src/buggy/launch/sim_2d_2buggies.launch
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,6 @@
<node name="sc_auton_system" pkg="buggy" type="autonsystem.py" output="screen"
args="$(arg sc_autonsystem_args)"/>

<node name="sc_rolling_sanity_check" pkg="buggy" type="rolling_sanity_check.py" output="screen" args="SC"/>

</launch>
2 changes: 2 additions & 0 deletions rb_ws/src/buggy/launch/sim_2d_single.launch
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@
<node name="auton_system" pkg="buggy" type="autonsystem.py" output="screen"
args="$(arg autonsystem_args)"/>

<node name="rolling_sanity_check" pkg="buggy" type="rolling_sanity_check.py" output="screen" args="$(arg buggy_name)"/>

</launch>
6 changes: 0 additions & 6 deletions rb_ws/src/buggy/scripts/auton/autonsystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,6 @@ def update_other_steering_angle(self, msg):
with self.lock:
self.other_steering = msg.data

def update_rtk_status(self, msg):
with self.lock:
self.rtk_status = msg.data

def init_check(self):
# checks that messages are being receieved
# (from both buggies if relevant),
Expand Down Expand Up @@ -160,8 +156,6 @@ def tick_caller(self):
rospy.sleep(0.001)
print("done checking initialization status")
self.init_check_publisher.publish(True)


# initialize global trajectory index

with self.lock:
Expand Down
163 changes: 163 additions & 0 deletions rb_ws/src/buggy/scripts/auton/rolling_sanity_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#! /usr/bin/env python3

import sys
import rospy

from nav_msgs.msg import Odometry
from std_msgs.msg import Bool, String, Int8MultiArray, Int8
from microstrain_inertial_msgs.msg import FilterStatus, ImuOverrangeStatus
from geometry_msgs.msg import PoseStamped

class SanityCheck:
def __init__(self,
self_name,
) -> None:

self.covariance = 0

self.filter_location = None
self.gps_location = None

self.imu_overrange_status = None

self.status_flag_val : int = 0
self.flags = []
# string list where indices match the meaning of relevant bits
self.error_messages : list = ["filter unstable", "filter converging", "roll/pitch warning", "heading warning", "position warning", "velocity warning", "IMU bias warning", "gnss clock warning", "antenna lever arm warning", "mounting transform warning", "solution error", "solution error", "solution error", "solution error", "solution error"]

# filter seperation = 2, other error = 1, nothing wrong = 0
self.warning = 0

self.warning_buffer_time = 1000 # warning flags active >1 second before human driver alerted

self.warning_durations = [0] * 18 # keeps track of how long covariance, overrange warning, any of the filter status flags are active

rospy.Subscriber(self_name + "/imu/overrange_status", ImuOverrangeStatus, self.update_overrange_status)
rospy.Subscriber(self_name + "/nav/status.status_flags", FilterStatus, self.update_status_flags)
rospy.Subscriber(self_name + "/gnss1/fix_Pose/", PoseStamped, self.update_gps_location)
rospy.Subscriber(self_name + "/nav/odom", Odometry, self.update_filter_location)


# these publishers are all bools as quick sanity checks (can display as indicators on foxglove for colors)
self.overrange_status_publisher = rospy.Publisher(self_name + "/debug/imu_overrange_status", Bool, queue_size=1)

self.filter_gps_status_publisher = rospy.Publisher(self_name + "/debug/filter_gps_seperation_status", Bool, queue_size=1)

self.covariance_status_publisher = rospy.Publisher(self_name + "/debug/covariance_status", Bool, queue_size=1)

self.error_message_publisher = rospy.Publisher(
self_name + "/nav/status/error_messages", String, queue_size=1
)

self.status_flags_publisher = rospy.Publisher(
self_name + "/nav/status/tripped_status_flags", Int8MultiArray, queue_size=1
)

self.overall_warning_publisher = rospy.Publisher(
self_name + "/debug/sanity_warning", Int8, queue_size=1
)

def update_overrange_status(self, msg : ImuOverrangeStatus):
self.imu_overrange_status = msg.status

def update_status_flags(self, msg : FilterStatus):
self.status_flag_val = msg.gq7_status_flags

def update_gps_location(self, msg : PoseStamped):
self.gps_location = msg.pose

def update_filter_location(self, msg):
self.filter_location = msg.pose

def update_warning_flags (self, idx):
self.warning_durations[idx] += 10
if (self.warning_durations[idx] > self.warning_buffer_time):
self.warning = 1

def calc_covariance(self):
if (self.filter_location == None):
self.covariance_status_publisher.publish(False)
else:
good_covariance = self.filter_location.covariance[0] ** 2 + self.filter_location.covariance[7] ** 2 <= 1**2
if (not good_covariance):
self.update_warning_flags(16)
else:
self.warning_durations[16] = 0
self.covariance_status_publisher.publish(good_covariance)


def calc_locations(self):
# currently comparing cross-track error, checking less than 0.5 m
if (self.filter_location == None or self.gps_location == None):
self.filter_gps_status_publisher.publish(False)
else:
good_seperation = abs(self.filter_location.pose.position.y - self.gps_location.position.y) < 0.5
if (not good_seperation):
self.warning = 2

self.filter_gps_status_publisher.publish(good_seperation)

def is_overrange (self):
s = self.imu_overrange_status
if (s == None):
self.overrange_status_publisher.publish(False)
else:
accel_status = s.status_accel_x or s.status_accel_y or s.status_accel_z
gyro_status = s.status_gyro_x or s.status_gyro_y or s.status_gyro_z
mag_status = s.status_mag_x or s.status_mag_y or s.status_mag_z
is_overrange = accel_status or gyro_status or mag_status or s.status_press
if (is_overrange):
self.update_warning_flags(17)
else:
self.warning_durations[17] = 0
# publishes true if ALL flags are FALSE (if the imu looks good)
self.imu_overrange_status.publish(not is_overrange)

def filter_status_warning (self):
b = bin(self.status_flag_val)
self.flags = Int8MultiArray()
self.flags.data = []
error_message = ""

# reading filter condition bits
if (b[0] == 1 and b[1] == 1): # filter unstable
self.update_warning_flags(0)
self.flags.append(0)
self.flags.append(1)
error_message += self.error_messages[0] + " "

if (b[0] == 1 and b[1] == 0): # filter converging
self.update_warning_flags(1)
self.flags.append(0)
error_message += self.error_messages[1] + " "

for i in range (len(b)):
if (i >= 2): # if not reading filter condiiton bits (already considered)
if (b[i] == '1'):
self.update_warning_flags(i)
self.flags.append(i)
error_message += self.error_messages[i] + " "
else:
self.warning_durations[i] = 0
# publish bit values
self.status_flags_publisher.publish(self.flags)

# publish string translations
self.error_message_publisher.publish(error_message)

def sanity_check(self):
self.warning = False
self.calc_covariance()
self.calc_locations()
self.is_overrange()
self.filter_status_warning()
self.overall_warning_publisher.publish(self.warning)

Jackack marked this conversation as resolved.
Show resolved Hide resolved
if __name__ == "__main__":
rospy.init_node("rolling_sanity_check")
check = SanityCheck(sys.argv[1])
rate = rospy.Rate(100)

while not rospy.is_shutdown():
check.sanity_check()
rate.sleep()
Loading