Skip to content

Commit

Permalink
updated ros_to_bnyahaj.py
Browse files Browse the repository at this point in the history
  • Loading branch information
mehulgoel873 committed May 15, 2024
1 parent 7517fc9 commit 06bcbda
Showing 1 changed file with 51 additions and 11 deletions.
62 changes: 51 additions & 11 deletions rb_ws/src/buggy/scripts/serial/ros_to_bnyahaj.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,42 @@
from threading import Lock

import sys
#Allows import of world and pose from auton directory
sys.path.append("/rb_ws/src/buggy/scripts/auton")
from world import World
from pose import Pose

import argparse
import rospy


#Ros Message Imports
from std_msgs.msg import Float64, Bool, Int8, UInt8
from nav_msgs.msg import Odometry as ROSOdom

from host_comm import *

from world import World
from pose import Pose
"""
Translates the output from bnyahaj serial (interpreted from host_comm) to ros topics and vice versa.
Performs reading (from Bnya Serial) and writing (from Ros Topics) on different python threads, so
be careful of multithreading synchronizaiton issues.
SC:
(ROS) Self Steering topic --> (Bnyahaj) Stepper Motor
(Bnyahaj) NAND Odom --> (ROS) NAND Odom topic
NAND:
(ROS) Self Steering --> (Bnyahaj) Stepper Motor
(Bnyahaj) Self Odom from UKF --> (ROS) NAND Odom topic
"""
class Translator:
"""
self_name: Buggy namespace
other_name: Only requried by SC for passing buggy namespace
teensy_name: required for communication, different for SC and NAND
Initializes the subscribers, rates, and ros topics (including debug topics)
"""
def __init__(self, self_name, other_name, teensy_name):
self.comms = Comms("/dev/" + teensy_name)
self.steer_angle = 0
Expand All @@ -30,16 +50,18 @@ def __init__(self, self_name, other_name, teensy_name):
rospy.Subscriber(self_name + "/buggy/input/steering", Float64, self.set_steering)
rospy.Subscriber(self_name + "/debug/sanity_warning", Int8, self.set_alarm)


#ISSUE: https://github.com/CMU-Robotics-Club/RoboBuggy2/issues/83
if other_name is None and self_name == "NAND":
self.odom_publisher = rospy.Publisher(self_name + "/nav/odom", ROSOdom, queue_size=1)
else:
self.odom_publisher = rospy.Publisher(other_name + "/nav/odom", ROSOdom, queue_size=1)
# upper bound of steering update rate, make sure auton sends slower than this

# upper bound of steering update rate, make sure auton sends slower than 500 Hz or update / 2ms
self.steer_send_rate = rospy.Rate(500)
# upper bound of reading data from Bnyahaj Serial, at 1ms
self.read_rate = rospy.Rate(1000)

# DOES NAND GET ALL THIS DEBUG INFORMATION???
# ISSUE: https://github.com/CMU-Robotics-Club/RoboBuggy2/issues/90
self.rc_steering_angle_publisher = rospy.Publisher(self_name + "/buggy/debug/rc_steering_angle", Float64, queue_size=1)
self.steering_angle_publisher = rospy.Publisher(self_name + "/buggy/debug/steering_angle", Float64, queue_size=1)
self.battery_voltage_publisher = rospy.Publisher(self_name + "/buggy/debug/battery_voltage", Float64, queue_size=1)
Expand All @@ -50,18 +72,25 @@ def __init__(self, self_name, other_name, teensy_name):
self.rc_uplink_qual_publisher = rospy.Publisher(self_name + "/buggy/debug/rc_uplink_quality", UInt8, queue_size=1)
self.nand_fix_publisher = rospy.Publisher(self_name + "/buggy/debug/nand_fix", UInt8, queue_size=1)

#alarm ros topic reader, locked so that only one of the setters runs at once
def set_alarm(self, msg):
with self.lock:
self.alarm = msg.data

#Steering Angle Updater
#Steering Angle Updater, updates the steering angle locally if updated on ros stopic
def set_steering(self, msg):
rospy.loginfo(f"Read steeering angle of: {msg.data}")
#print("Steering angle: " + str(msg.data))
# print("SET STEERING: " + str(msg.data))
with self.lock:
self.steer_angle = msg.data
self.fresh_steer = True

"""
Sends ROS Topics to bnayhaj serial, only sends a steering angle when we receive a fresh one
Will send steering and alarm node.
TODO: Does alarm node exist for NAND?
"""
def writer_thread(self):
rospy.loginfo('Starting sending alarm and steering to teensy!')
while True:
Expand All @@ -75,11 +104,16 @@ def writer_thread(self):

self.steer_send_rate.sleep()

"""
Reads three different types of packets, that should have better ways of differntiating
Odometry -> (SC) NAND Odomotery
BnyaTelemetry -> (NAND) Self Odom
tuple -> (SC, maybe NAND?) Debug Info
"""
def reader_thread(self):
rospy.loginfo('Starting reading odom from teensy!')
while True:
packet = self.comms.read_packet()
# print("trying to read odom")

if isinstance(packet, Odometry):
rospy.logdebug("packet" + str(packet))
Expand All @@ -97,6 +131,8 @@ def reader_thread(self):
elif isinstance(packet, BnyaTelemetry):
rospy.logdebug("packet" + str(packet))
odom = ROSOdom()

#TODO: Not mock rolled accurately (Needs to be Fact Checked)
try:
lat, long = World.utm_to_gps(packet.y, packet.x)
odom.pose.pose.position.x = long
Expand All @@ -119,10 +155,7 @@ def reader_thread(self):
except Exception as e:
rospy.logwarn("Unable to convert other buggy position to lon lat" + str(e))



elif isinstance(packet, tuple): #Are there any other packet that is a tuple
# print(packet)
self.rc_steering_angle_publisher.publish(Float64(packet[0]))
self.steering_angle_publisher.publish(Float64(packet[1]))
self.battery_voltage_publisher.publish(Float64(packet[2]))
Expand All @@ -135,6 +168,7 @@ def reader_thread(self):

self.read_rate.sleep()

# Initialies the reader and writer thread, should theoretically never finish as there are while loops
def loop(self):
p1 = threading.Thread(target=self.writer_thread)
p2 = threading.Thread(target=self.reader_thread)
Expand All @@ -145,6 +179,8 @@ def loop(self):
p1.join()
p2.join()

# Initializes ros nodes, using self and other name
#other name is not requires, and if not submitted, use NONE
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--self_name", type=str, help="name of ego-buggy", required=True)
Expand All @@ -156,5 +192,9 @@ def loop(self):
teensy_name = args.teensy_name

rospy.init_node("ros_bnyahaj")
if self_name == "SC" and other_name == None:
rospy.loerr("Not reading NAND Odometry messages, double check roslaunch files for ros_to_bnyahaj")
elif other_name == None:
rospy.loginfo("No other name passed in, presuming that this is NAND ")
translate = Translator(self_name, other_name, teensy_name)
translate.loop()

0 comments on commit 06bcbda

Please sign in to comment.