Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

serial port with updated packet structure #13

Merged
merged 20 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 123 additions & 29 deletions rb_ws/src/buggy/buggy/serial/host_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,104 @@ def update(self, b):

MAX_PAYLOAD_LEN = 100

MSG_TYPE_DEBUG = b'DB'
MSG_TYPE_ODOMETRY = b'OD'

# firmware --> software
MSG_TYPE_NAND_DEBUG = b'ND'
MSG_TYPE_NAND_UKF = b'NU'
MSG_TYPE_NAND_GPS = b'NG'
MSG_TYPE_RADIO = b'RA'
MSG_TYPE_SC_DEBUG = b'SD'
MSG_TYPE_SC_SENSORS = b'SS'
MSG_TYPE_ROUNDTRIP_TIMESTAMP = b'RT'

# software --> firmware
MSG_TYPE_STEERING = b'ST'
MSG_TYPE_BRAKE = b'BR'
MSG_TYPE_ALARM = b'AL'
MSG_TYPE_BNYATEL = b'BT'
MSG_TYPE_SOFTWARE_TIMESTAMP = b'TM'



@dataclass
class Odometry:
x: float
y: float
radio_seqnum: int
class NANDDebugInfo:
# 64 bits
heading_rate: float # double
encoder_angle: float # double
# 32 bits
timestamp: int
rc_steering_angle: float
software_steering_angle: float
true_steering_angle: float
mehulgoel873 marked this conversation as resolved.
Show resolved Hide resolved
rfm69_timeout_num: int
# 8 bits
operator_ready: bool
brake_status: bool
auton_steer: bool
tx12_state: bool
stepper_alarm: int # unsigned char
mehulgoel873 marked this conversation as resolved.
Show resolved Hide resolved
rc_uplink_quality: int # uint8

@dataclass
class NANDUKF:
# 64 bits
easting: float # double
northing: float # double
theta: float # double
heading_rate: float # double
velocity: float # double
# 32 bits
timestamp: int

@dataclass
class NANDRawGPS:
# 64 bits
easting: float # double
northing: float # double
# this is a 2D accuracy value
accuracy: float # double
gps_time: int # uint64
# 32 bits
gps_seqnum: int
timestamp: int
# 8 bits
gps_fix: int # uint8

@dataclass
class Radio:
nand_east_gps: float
nand_north_gps: float
gps_seqnum: int
nand_gps_fix: int # uint8

@dataclass
class SCDebugInfo:
# 64 bits
encoder_angle: float # double
# 32 bits
rc_steering_angle: float
software_steering_angle: float
true_steering_angle: float
missed_packets: int
timestamp: int
# 8 bits
tx12_state: bool
operator_ready: bool
stepper_alarm: int # unsigned char
brake_status: bool
auton_steer: bool
rc_uplink_quality: int # uint8

@dataclass
class BnyaTelemetry:
x: float
y: float
velocity: float # In METERS / SECOND
steering: float # In DEGREES
heading: float # In RADIANS
heading_rate: float # In RADIANS / SECOND
class SCSensors:
# 64 bits
velocity: float # double
# 32 bits
steering_angle: float
timestamp: int

@dataclass
class RoundtripTimestamp:
returned_time: float # double


class IncompletePacket(Exception):
pass
Expand Down Expand Up @@ -115,9 +191,11 @@ def send_steering(self, angle: float):
def send_alarm(self, status: int):
self.send_packet_raw(MSG_TYPE_ALARM, struct.pack('<B', status))

def send_timestamp(self, time: float):
self.send_packet_raw(MSG_TYPE_SOFTWARE_TIMESTAMP, struct.pack('<d', time))

def read_packet_raw(self):
self.rx_buffer += self.port.read_all() #type:ignore

try:
return self._try_parse_buffer()
except IncompletePacket:
Expand Down Expand Up @@ -184,21 +262,37 @@ def read_packet(self):
return None

msg_type, payload = packet
if msg_type == MSG_TYPE_ODOMETRY:
# Odometry message
x, y, radio_seqnum, gps_seqnum = struct.unpack('<ddII', payload)
return Odometry(x, y, radio_seqnum, gps_seqnum)
elif msg_type == MSG_TYPE_DEBUG:
# Debug message
debug = struct.unpack('<fff??B?BBxx', payload)
# print(debug)
return debug
elif msg_type == MSG_TYPE_BNYATEL:
x, y, vel, steering, heading, heading_rate = struct.unpack('<dddddd', payload)
return BnyaTelemetry(x, y, vel, steering, heading, heading_rate)
if msg_type == MSG_TYPE_NAND_DEBUG:
data = struct.unpack('<ddIfffI????BB', payload)
mehulgoel873 marked this conversation as resolved.
Show resolved Hide resolved
return NANDDebugInfo(*data)

elif msg_type == MSG_TYPE_NAND_UKF:
data = struct.unpack('<dddddI', payload)
return NANDUKF(*data)

elif msg_type == MSG_TYPE_NAND_GPS:
data = struct.unpack('<dddQIIB', payload)
return NANDRawGPS(*data)

elif msg_type == MSG_TYPE_RADIO:
data = struct.unpack('<ffIB', payload)
return Radio(*data)

elif msg_type == MSG_TYPE_SC_DEBUG:
data = struct.unpack('<dfffII??B??B', payload)
return SCDebugInfo(*data)

elif msg_type == MSG_TYPE_SC_SENSORS:
data = struct.unpack('<dfI', payload)
return SCSensors(*data)

elif msg_type == MSG_TYPE_ROUNDTRIP_TIMESTAMP:
time = struct.unpack('<d', payload)
return RoundtripTimestamp(time)
else:
print(f'Unknown packet type {msg_type}')
return None
# print(f'Unknown packet type {msg_type}')



def main():
Expand Down
Loading
Loading