Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

serial port with updated packet structure #13

Merged
merged 20 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 29 additions & 26 deletions rb_ws/src/buggy/buggy/buggy_state_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,51 @@ def __init__(self):
super().__init__("buggy_state_converter")

namespace = self.get_namespace()
if namespace == "/SC":
self.SC_raw_state_subscriber = self.create_subscription(
Odometry, "/raw_state", self.convert_SC_state_callback, 10
)

# Create publisher and subscriber for "self" namespace
self.self_raw_state_subscriber = self.create_subscription(
Odometry, "self/raw_state", self.self_raw_state_callback, 10
)
self.self_state_publisher = self.create_publisher(Odometry, "self/state", 10)
self.NAND_other_raw_state_subscriber = self.create_subscription(
Odometry, "/NAND_raw_state", self.convert_NAND_other_state_callback, 10
)

# Check if namespace is "SC" to add an "other" subscriber/publisher
if namespace == "/SC":
self.other_raw_state_subscriber = self.create_subscription(
Odometry, "other/raw_state", self.other_raw_state_callback, 10
self.other_state_publisher = self.create_publisher(Odometry, "/other/state", 10)

elif namespace == "/NAND":
self.NAND_raw_state_subscriber = self.create_subscription(
Odometry, "/raw_state", self.convert_NAND_state_callback, 10
)
self.other_state_publisher = self.create_publisher(Odometry, "other/state", 10)

else:
self.get_logger().warn(f"Namespace not recognized for buggy state conversion: {namespace}")

self.self_state_publisher = self.create_publisher(Odometry, "/state", 10)

# Initialize pyproj Transformer for ECEF -> UTM conversion for /SC
self.ecef_to_utm_transformer = pyproj.Transformer.from_crs(
"epsg:4978", "epsg:32617", always_xy=True
) # TODO: Confirm UTM EPSG code, using EPSG:32617 for UTM Zone 17N

def self_raw_state_callback(self, msg):
""" Callback for processing self/raw_state messages and publishing to self/state """
namespace = self.get_namespace()

if namespace == "/SC":
converted_msg = self.convert_SC_state(msg)
elif namespace == "/NAND":
converted_msg = self.convert_NAND_state(msg)
else:
self.get_logger().warn(f"Namespace not recognized for buggy state conversion: {namespace}")
return
def convert_SC_state_callback(self, msg):
""" Callback for processing SC/raw_state messages and publishing to self/state """
converted_msg = self.convert_SC_state(msg)
self.self_state_publisher.publish(converted_msg)

def convert_NAND_state_callback(self, msg):
""" Callback for processing NAND/raw_state messages and publishing to self/state """
converted_msg = self.convert_NAND_state(msg)
self.self_state_publisher.publish(converted_msg)

def other_raw_state_callback(self, msg):
""" Callback for processing other/raw_state messages and publishing to other/state """
# Convert the SC message and publish to other/state

def convert_NAND_other_state_callback(self, msg):
""" Callback for processing SC/NAND_raw_state messages and publishing to other/state """
converted_msg = self.convert_NAND_other_state(msg)
self.other_state_publisher.publish(converted_msg)


def convert_SC_state(self, msg):
"""
"""
Converts self/raw_state in SC namespace to clean state units and structure

Takes in ROS message in nav_msgs/Odometry format
Expand Down Expand Up @@ -127,7 +131,6 @@ def convert_NAND_state(self, msg):
converted_msg.twist.covariance = msg.twist.covariance

# ---- 4. Linear Velocities in m/s ----
# TODO: Check if scalar velocity is coming in from msg.twist.twist.linear.x
# Convert scalar speed to velocity x/y components using heading (orientation.z)
speed = msg.twist.twist.linear.x # m/s scalar velocity
heading = msg.pose.pose.orientation.z # heading in radians
Expand Down
152 changes: 123 additions & 29 deletions rb_ws/src/buggy/buggy/serial/host_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,104 @@ def update(self, b):

MAX_PAYLOAD_LEN = 100

MSG_TYPE_DEBUG = b'DB'
MSG_TYPE_ODOMETRY = b'OD'

# firmware --> software
MSG_TYPE_NAND_DEBUG = b'ND'
MSG_TYPE_NAND_UKF = b'NU'
MSG_TYPE_NAND_GPS = b'NG'
MSG_TYPE_RADIO = b'RA'
MSG_TYPE_SC_DEBUG = b'SD'
MSG_TYPE_SC_SENSORS = b'SS'
MSG_TYPE_ROUNDTRIP_TIMESTAMP = b'RT'

# software --> firmware
MSG_TYPE_STEERING = b'ST'
MSG_TYPE_BRAKE = b'BR'
MSG_TYPE_ALARM = b'AL'
MSG_TYPE_BNYATEL = b'BT'
MSG_TYPE_SOFTWARE_TIMESTAMP = b'TM'



@dataclass
class Odometry:
x: float
y: float
radio_seqnum: int
class NANDDebugInfo:
# 64 bits
heading_rate: float # double
encoder_angle: float # double
# 32 bits
timestamp: int
rc_steering_angle: float
software_steering_angle: float
true_steering_angle: float
mehulgoel873 marked this conversation as resolved.
Show resolved Hide resolved
rfm69_timeout_num: int
# 8 bits
operator_ready: bool
brake_status: bool
auton_steer: bool
tx12_state: bool
stepper_alarm: int # unsigned char
mehulgoel873 marked this conversation as resolved.
Show resolved Hide resolved
rc_uplink_quality: int # uint8

@dataclass
class NANDUKF:
# 64 bits
easting: float # double
northing: float # double
theta: float # double
heading_rate: float # double
velocity: float # double
# 32 bits
timestamp: int

@dataclass
class NANDRawGPS:
# 64 bits
easting: float # double
northing: float # double
# this is a 2D accuracy value
accuracy: float # double
gps_time: int # uint64
# 32 bits
gps_seqnum: int
timestamp: int
# 8 bits
gps_fix: int # uint8

@dataclass
class Radio:
nand_east_gps: float
nand_north_gps: float
gps_seqnum: int
nand_gps_fix: int # uint8

@dataclass
class SCDebugInfo:
# 64 bits
encoder_angle: float # double
# 32 bits
rc_steering_angle: float
software_steering_angle: float
true_steering_angle: float
missed_packets: int
timestamp: int
# 8 bits
tx12_state: bool
operator_ready: bool
stepper_alarm: int # unsigned char
brake_status: bool
auton_steer: bool
rc_uplink_quality: int # uint8

@dataclass
class BnyaTelemetry:
x: float
y: float
velocity: float # In METERS / SECOND
steering: float # In DEGREES
heading: float # In RADIANS
heading_rate: float # In RADIANS / SECOND
class SCSensors:
# 64 bits
velocity: float # double
# 32 bits
steering_angle: float
timestamp: int

@dataclass
class RoundtripTimestamp:
returned_time: float # double


class IncompletePacket(Exception):
pass
Expand Down Expand Up @@ -115,9 +191,11 @@ def send_steering(self, angle: float):
def send_alarm(self, status: int):
self.send_packet_raw(MSG_TYPE_ALARM, struct.pack('<B', status))

def send_timestamp(self, time: float):
self.send_packet_raw(MSG_TYPE_SOFTWARE_TIMESTAMP, struct.pack('<d', time))

def read_packet_raw(self):
self.rx_buffer += self.port.read_all() #type:ignore

try:
return self._try_parse_buffer()
except IncompletePacket:
Expand Down Expand Up @@ -184,21 +262,37 @@ def read_packet(self):
return None

msg_type, payload = packet
if msg_type == MSG_TYPE_ODOMETRY:
# Odometry message
x, y, radio_seqnum, gps_seqnum = struct.unpack('<ddII', payload)
return Odometry(x, y, radio_seqnum, gps_seqnum)
elif msg_type == MSG_TYPE_DEBUG:
# Debug message
debug = struct.unpack('<fff??B?BBxx', payload)
# print(debug)
return debug
elif msg_type == MSG_TYPE_BNYATEL:
x, y, vel, steering, heading, heading_rate = struct.unpack('<dddddd', payload)
return BnyaTelemetry(x, y, vel, steering, heading, heading_rate)
if msg_type == MSG_TYPE_NAND_DEBUG:
data = struct.unpack('<ddIfffI????BB', payload)
mehulgoel873 marked this conversation as resolved.
Show resolved Hide resolved
return NANDDebugInfo(*data)

elif msg_type == MSG_TYPE_NAND_UKF:
data = struct.unpack('<dddddI', payload)
return NANDUKF(*data)

elif msg_type == MSG_TYPE_NAND_GPS:
data = struct.unpack('<dddQIIB', payload)
return NANDRawGPS(*data)

elif msg_type == MSG_TYPE_RADIO:
data = struct.unpack('<ffIB', payload)
return Radio(*data)

elif msg_type == MSG_TYPE_SC_DEBUG:
data = struct.unpack('<dfffII??B??B', payload)
return SCDebugInfo(*data)

elif msg_type == MSG_TYPE_SC_SENSORS:
data = struct.unpack('<dfI', payload)
return SCSensors(*data)

elif msg_type == MSG_TYPE_ROUNDTRIP_TIMESTAMP:
time = struct.unpack('<d', payload)
return RoundtripTimestamp(time)
else:
print(f'Unknown packet type {msg_type}')
return None
# print(f'Unknown packet type {msg_type}')



def main():
Expand Down
Loading
Loading