diff --git a/.gitignore b/.gitignore
index 60a64061..88667fbf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,8 +1,15 @@
.DS_Store
-rb_ws/bags
.docker-compose.yml.un~
.python-requirements.txt.un~
docker-compose.yml~
*TEMP_DO_NOT_EDIT.txt
-rb_ws/src/buggy/bags/*
-*.bag
\ No newline at end of file
+
+# BAGS
+rb_ws/bags
+rb_ws/src/buggy/bags/*
+*.bag
+rb_ws/rosbag2/
+
+# VISION
+.vision/
+vision/data/
diff --git a/Dockerfile b/Dockerfile
index 6725e778..4ffeb8de 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -23,16 +23,8 @@ RUN pip3 install -r python-requirements.txt
RUN echo 'source "/opt/ros/humble/setup.bash" --' >> ~/.bashrc && \
echo 'cd rb_ws' >> ~/.bashrc && \
echo 'colcon build --symlink-install' >> ~/.bashrc && \
- echo 'source install/local_setup.bash' >> ~/.bashrc
-# RUN echo 'source "/opt/ros/humble/setup.bash" --' >> ~/.bashrc && \
-# echo 'cd rb_ws' >> ~/.bashrc && \
-# echo 'catkin_make >/dev/null' >> ~/.bashrc && \
-# echo 'source devel/setup.bash' >> ~/.bashrc
-
-
-
-# RUN echo "exec firefox" > ~/.xinitrc && chmod +x ~/.xinitrc
-# CMD ["x11vnc", "-create", "-forever"]
+ echo 'source install/local_setup.bash' >> ~/.bashrc && \
+ echo 'chmod -R +x src/buggy/scripts/' >> ~/.bashrc
# add mouse to tmux
RUN echo 'set -g mouse on' >> ~/.tmux.conf
diff --git a/python-requirements.txt b/python-requirements.txt
index 68b0c6fc..d75cc066 100644
--- a/python-requirements.txt
+++ b/python-requirements.txt
@@ -5,6 +5,8 @@ numpy
osqp
pandas
pymap3d
+pyproj
+pyserial
scipy
setuptools==58.2.0
trimesh
diff --git a/rb_ws/src/buggy/CMakeLists.txt b/rb_ws/src/buggy/CMakeLists.txt
new file mode 100644
index 00000000..a8a51421
--- /dev/null
+++ b/rb_ws/src/buggy/CMakeLists.txt
@@ -0,0 +1,48 @@
+cmake_minimum_required(VERSION 3.8)
+project(buggy)
+
+# Default to C++14
+if(NOT CMAKE_CXX_STANDARD)
+ set(CMAKE_CXX_STANDARD 14)
+endif()
+
+if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
+ add_compile_options(-Wall -Wextra -Wpedantic)
+endif()
+
+# Find dependencies
+find_package(ament_cmake REQUIRED)
+find_package(ament_cmake_python REQUIRED)
+find_package(rclcpp REQUIRED)
+find_package(rclpy REQUIRED)
+
+# Install Launch Files
+install(DIRECTORY
+ launch
+ DESTINATION share/${PROJECT_NAME}
+)
+
+# Install Python modules
+# ament_python_install_package(${PROJECT_NAME})
+
+# Install Python executables
+install(PROGRAMS
+ scripts/hello_world.py
+ scripts/controller/controller_node.py
+ scripts/path_planner/path_planner.py
+ scripts/simulator/engine.py
+ scripts/watchdog/watchdog_node.py
+ scripts/buggy_state_converter.py
+ scripts/serial/ros_to_bnyahaj.py
+ DESTINATION lib/${PROJECT_NAME}
+)
+
+
+find_package(rosidl_default_generators REQUIRED)
+
+rosidl_generate_interfaces(${PROJECT_NAME}
+ "msg/TrajectoryMsg.msg"
+)
+ament_export_dependencies(rosidl_default_runtime)
+
+ament_package()
\ No newline at end of file
diff --git a/rb_ws/src/buggy/buggy/visualization/telematics.py b/rb_ws/src/buggy/buggy/visualization/telematics.py
new file mode 100644
index 00000000..9d1fd804
--- /dev/null
+++ b/rb_ws/src/buggy/buggy/visualization/telematics.py
@@ -0,0 +1,63 @@
+#! /usr/bin/env python3
+# Runs the conversion script for all telematics data
+
+import rclpy
+import utm
+from nav_msgs.msg import Odometry
+from rclpy.node import Node
+from sensor_msgs.msg import NavSatFix
+
+
+class Telematics(Node):
+ """
+ Converts subscribers and publishers that need to be reformated, so that they are readible.
+ """
+
+ def __init__(self):
+ """Generate all the subscribers and publishers that need to be reformatted.
+ """
+ super().__init__('telematics')
+
+ # Implements behavior of callback_args from rospy.Subscriber
+ def wrap_args(callback, callback_args):
+ return lambda msg: callback(msg, callback_args)
+
+ self.self_publisher = self.create_publisher(NavSatFix, "/self/state_navsatfix", 10)
+ self.self_subscriber = self.create_subscription(Odometry, "/self/state", wrap_args(self.convert_buggystate, self.self_publisher), 1)
+
+ self.other_publisher = self.create_publisher(NavSatFix, "/other/state_navsatfix", 10)
+ self.other_subscriber = self.create_subscription(Odometry, "/other/state", wrap_args(self.convert_buggystate, self.other_publisher), 1)
+
+ # TODO Make this a static method?
+ def convert_buggystate(self, msg, publisher):
+ """Converts BuggyState/Odometry message to NavSatFix and publishes to specified publisher
+
+ Args:
+ msg (Odometry): Buggy state to convert
+ publisher (Publisher): Publisher to send NavSatFix message to
+ """
+ try:
+ y = msg.pose.pose.position.y
+ x = msg.pose.pose.position.x
+ lat, long = utm.to_latlon(x, y, 17, "T")
+ down = msg.pose.pose.position.z
+ new_msg = NavSatFix()
+ new_msg.header = msg.header
+ new_msg.latitude = lat
+ new_msg.longitude = long
+ new_msg.altitude = down
+ publisher.publish(new_msg)
+
+ except Exception as e:
+ self.get_logger().warn(
+ "Unable to convert other buggy position to lon lat" + str(e)
+ )
+
+
+if __name__ == "__main__":
+ rclpy.init()
+ telem = Telematics()
+ rclpy.spin(telem)
+
+ telem.destroy_node()
+ rclpy.shutdown()
\ No newline at end of file
diff --git a/rb_ws/src/buggy/launch/sim_2d_double.xml b/rb_ws/src/buggy/launch/sim_2d_double.xml
new file mode 100644
index 00000000..b9d5ba61
--- /dev/null
+++ b/rb_ws/src/buggy/launch/sim_2d_double.xml
@@ -0,0 +1,31 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/rb_ws/src/buggy/launch/sim_2d_single.xml b/rb_ws/src/buggy/launch/sim_2d_single.xml
index ca458b80..f0e72042 100755
--- a/rb_ws/src/buggy/launch/sim_2d_single.xml
+++ b/rb_ws/src/buggy/launch/sim_2d_single.xml
@@ -1,16 +1,15 @@
-
-
+
+
-
-
-
+
+
+
+
+
-
-
\ No newline at end of file
diff --git a/rb_ws/src/buggy/launch/watchdog.xml b/rb_ws/src/buggy/launch/watchdog.xml
index 28c784df..5c74690e 100644
--- a/rb_ws/src/buggy/launch/watchdog.xml
+++ b/rb_ws/src/buggy/launch/watchdog.xml
@@ -1,4 +1,4 @@
-
+
\ No newline at end of file
diff --git a/rb_ws/src/buggy/package.xml b/rb_ws/src/buggy/package.xml
index e1471e2c..26879e3d 100644
--- a/rb_ws/src/buggy/package.xml
+++ b/rb_ws/src/buggy/package.xml
@@ -2,18 +2,30 @@
buggy
- 0.0.0
- TODO: Package description
- root
- TODO: License declaration
+ 2.0.0
+ CMU's First Robotic Buggy
+ CMU's Robotic Club Officers
+ MIT
+
+ ament_cmake
+ ament_cmake_python
+
+
+ rclcpp
+ rclpy
+
- ament_copyright
- ament_flake8
- ament_pep257
- python3-pytest
ros2launch
+ ament_lint_auto
+ ament_lint_common
+
+ rosidl_default_generators
+ rosidl_default_runtime
+ rosidl_interface_packages
+
+
- ament_python
+ ament_cmake
-
+
\ No newline at end of file
diff --git a/rb_ws/src/buggy/resource/buggy b/rb_ws/src/buggy/resource/buggy
deleted file mode 100644
index e69de29b..00000000
diff --git a/rb_ws/src/buggy/scripts/buggy_state_converter.py b/rb_ws/src/buggy/scripts/buggy_state_converter.py
new file mode 100755
index 00000000..acc3d807
--- /dev/null
+++ b/rb_ws/src/buggy/scripts/buggy_state_converter.py
@@ -0,0 +1,201 @@
+#!/usr/bin/env python3
+
+import rclpy
+from rclpy.node import Node
+from nav_msgs.msg import Odometry
+import numpy as np
+import pyproj
+from scipy.spatial.transform import Rotation
+
+class BuggyStateConverter(Node):
+ def __init__(self):
+ super().__init__("buggy_state_converter")
+
+ namespace = self.get_namespace()
+ if namespace == "/SC":
+ self.SC_raw_state_subscriber = self.create_subscription(
+ Odometry, "/raw_state", self.convert_SC_state_callback, 10
+ )
+
+ self.NAND_other_raw_state_subscriber = self.create_subscription(
+ Odometry, "/NAND_raw_state", self.convert_NAND_other_state_callback, 10
+ )
+
+ self.other_state_publisher = self.create_publisher(Odometry, "/other/state", 10)
+
+ elif namespace == "/NAND":
+ self.NAND_raw_state_subscriber = self.create_subscription(
+ Odometry, "/raw_state", self.convert_NAND_state_callback, 10
+ )
+
+ else:
+ self.get_logger().warn(f"Namespace not recognized for buggy state conversion: {namespace}")
+
+ self.self_state_publisher = self.create_publisher(Odometry, "/state", 10)
+
+ # Initialize pyproj Transformer for ECEF -> UTM conversion for /SC
+ self.ecef_to_utm_transformer = pyproj.Transformer.from_crs(
+ "epsg:4978", "epsg:32617", always_xy=True
+ ) # TODO: Confirm UTM EPSG code, using EPSG:32617 for UTM Zone 17N
+
+ def convert_SC_state_callback(self, msg):
+ """ Callback for processing SC/raw_state messages and publishing to self/state """
+ converted_msg = self.convert_SC_state(msg)
+ self.self_state_publisher.publish(converted_msg)
+
+ def convert_NAND_state_callback(self, msg):
+ """ Callback for processing NAND/raw_state messages and publishing to self/state """
+ converted_msg = self.convert_NAND_state(msg)
+ self.self_state_publisher.publish(converted_msg)
+
+
+ def convert_NAND_other_state_callback(self, msg):
+ """ Callback for processing SC/NAND_raw_state messages and publishing to other/state """
+ converted_msg = self.convert_NAND_other_state(msg)
+ self.other_state_publisher.publish(converted_msg)
+
+
+ def convert_SC_state(self, msg):
+ """
+ Converts self/raw_state in SC namespace to clean state units and structure
+
+ Takes in ROS message in nav_msgs/Odometry format
+ Assumes that the SC namespace is using ECEF coordinates and quaternion orientation
+ """
+
+ converted_msg = Odometry()
+ converted_msg.header = msg.header
+
+ # ---- 1. Convert ECEF Position to UTM Coordinates ----
+ ecef_x = msg.pose.pose.position.x
+ ecef_y = msg.pose.pose.position.y
+ ecef_z = msg.pose.pose.position.z
+
+ # Convert ECEF to UTM
+ utm_x, utm_y, utm_z = self.ecef_to_utm_transformer.transform(ecef_x, ecef_y, ecef_z)
+ converted_msg.pose.pose.position.x = utm_x # UTM Easting
+ converted_msg.pose.pose.position.y = utm_y # UTM Northing
+ converted_msg.pose.pose.position.z = utm_z # UTM Altitude
+
+ # ---- 2. Convert Quaternion to Heading (Radians) ----
+ qx, qy, qz, qw = msg.pose.pose.orientation.x, msg.pose.pose.orientation.y, msg.pose.pose.orientation.z, msg.pose.pose.orientation.w
+
+ # Use Rotation.from_quat to get roll, pitch, yaw
+ roll, pitch, yaw = Rotation.from_quat([qx, qy, qz, qw]).as_euler('xyz')
+ # roll, pitch, yaw = euler_from_quaternion([qx, qy, qz, qw]) # tf_transformations bad
+
+ # Store the heading in the x component of the orientation
+ converted_msg.pose.pose.orientation.x = roll
+ converted_msg.pose.pose.orientation.y = pitch
+ converted_msg.pose.pose.orientation.z = yaw
+ converted_msg.pose.pose.orientation.w = 0.0 # fourth (quaternion) term irrelevant for euler angles
+
+ # ---- 3. Copy Covariances (Unchanged) ----
+ converted_msg.pose.covariance = msg.pose.covariance
+ converted_msg.twist.covariance = msg.twist.covariance
+
+ # ---- 4. Copy Linear Velocities ----
+ converted_msg.twist.twist.linear.x = msg.twist.twist.linear.x # m/s in x-direction
+ converted_msg.twist.twist.linear.y = msg.twist.twist.linear.y # m/s in x-direction
+ converted_msg.twist.twist.linear.z = msg.twist.twist.linear.z # keep original Z velocity
+
+ # ---- 5. Copy Angular Velocities ----
+ converted_msg.twist.twist.angular.x = msg.twist.twist.angular.x # copying over
+ converted_msg.twist.twist.angular.y = msg.twist.twist.angular.y # copying over
+ converted_msg.twist.twist.angular.z = msg.twist.twist.angular.z # rad/s, heading change rate
+
+ return converted_msg
+
+ def convert_NAND_state(self, msg):
+ """
+ Converts self/raw_state in NAND namespace to clean state units and structure
+ Takes in ROS message in nav_msgs/Odometry format
+ """
+
+ converted_msg = Odometry()
+ converted_msg.header = msg.header
+
+ # ---- 1. Directly use UTM Coordinates ----
+ converted_msg.pose.pose.position.x = msg.pose.pose.position.x # UTM Easting
+ converted_msg.pose.pose.position.y = msg.pose.pose.position.y # UTM Northing
+ converted_msg.pose.pose.position.z = msg.pose.pose.position.z # UTM Altitude
+
+ # ---- 2. Orientation in Radians ----
+ converted_msg.pose.pose.orientation.x = msg.pose.pose.orientation.x
+ converted_msg.pose.pose.orientation.y = msg.pose.pose.orientation.y
+ converted_msg.pose.pose.orientation.z = msg.pose.pose.orientation.z
+ converted_msg.pose.pose.orientation.w = 0.0 # fourth (quaternion) term irrelevant for euler angles
+
+ # ---- 3. Copy Covariances (Unchanged) ----
+ converted_msg.pose.covariance = msg.pose.covariance
+ converted_msg.twist.covariance = msg.twist.covariance
+
+ # ---- 4. Linear Velocities in m/s ----
+ # Convert scalar speed to velocity x/y components using heading (orientation.z)
+ speed = msg.twist.twist.linear.x # m/s scalar velocity
+ heading = msg.pose.pose.orientation.z # heading in radians
+
+ # Calculate velocity components
+ converted_msg.twist.twist.linear.x = speed * np.cos(heading) # m/s in x-direction
+ converted_msg.twist.twist.linear.y = speed * np.sin(heading) # m/s in y-direction
+ converted_msg.twist.twist.linear.z = 0.0
+
+ # ---- 5. Copy Angular Velocities ----
+ converted_msg.twist.twist.angular.x = msg.twist.twist.angular.x # copying over
+ converted_msg.twist.twist.angular.y = msg.twist.twist.angular.y # copying over
+ converted_msg.twist.twist.angular.z = msg.twist.twist.angular.z # rad/s, heading change rate
+
+ return converted_msg
+
+ def convert_NAND_other_state(self, msg):
+ """ Converts other/raw_state in SC namespace (NAND data) to clean state units and structure """
+ converted_msg = Odometry()
+ converted_msg.header = msg.header
+
+ # ---- 1. Directly use UTM Coordinates ----
+ converted_msg.pose.pose.position.x = msg.x # UTM Easting
+ converted_msg.pose.pose.position.y = msg.y # UTM Northing
+ converted_msg.pose.pose.position.z = msg.z # UTM Altitude (not provided in other/raw_state, defaults to 0.0)
+
+ # ---- 2. Orientation in Radians ----
+ converted_msg.pose.pose.orientation.x = msg.roll # (roll not provided in other/raw_state, defaults to 0.0)
+ converted_msg.pose.pose.orientation.y = msg.pitch # (pitch not provided in other/raw_state, defaults to 0.0)
+ converted_msg.pose.pose.orientation.z = msg.heading # heading in radians
+ converted_msg.pose.pose.orientation.w = 0.0 # fourth quaternion term irrelevant for euler angles
+
+ # ---- 3. Copy Covariances (Unchanged) ----
+ converted_msg.pose.covariance = msg.pose_covariance # (not provided in other/raw_state)
+ converted_msg.twist.covariance = msg.twist_covariance # (not provided in other/raw_state)
+
+ # ---- 4. Linear Velocities in m/s ----
+ # Convert scalar speed to velocity x/y components using heading (msg.heading)
+ speed = msg.speed # m/s scalar velocity
+ heading = msg.heading # heading in radians
+
+ # Calculate velocity components
+ converted_msg.twist.twist.linear.x = speed * np.cos(heading) # m/s in x-direction
+ converted_msg.twist.twist.linear.y = speed * np.sin(heading) # m/s in y-direction
+ converted_msg.twist.twist.linear.z = 0.0
+
+ # ---- 5. Angular Velocities ----
+ converted_msg.twist.twist.angular.x = msg.roll_rate # (roll rate not provided in other/raw_state, defaults to 0.0)
+ converted_msg.twist.twist.angular.y = msg.pitch_rate # (pitch rate not provided in other/raw_state, defaults to 0.0)
+ converted_msg.twist.twist.angular.z = msg.heading_rate # rad/s, heading change rate
+
+ return converted_msg
+
+
+def main(args=None):
+ rclpy.init(args=args)
+
+ # Create the BuggyStateConverter node and spin it
+ state_converter = BuggyStateConverter()
+ rclpy.spin(state_converter)
+
+ # Shutdown when done
+ state_converter.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/rb_ws/src/buggy/scripts/controller/controller_node.py b/rb_ws/src/buggy/scripts/controller/controller_node.py
new file mode 100755
index 00000000..3094ca84
--- /dev/null
+++ b/rb_ws/src/buggy/scripts/controller/controller_node.py
@@ -0,0 +1,164 @@
+#!/usr/bin/env python3
+
+import threading
+import sys
+
+import numpy as np
+
+import rclpy
+from rclpy.node import Node
+
+from std_msgs.msg import Float32, Float64, Bool
+from nav_msgs.msg import Odometry
+from buggy.msg import TrajectoryMsg
+
+sys.path.append("/rb_ws/src/buggy/scripts")
+from util.trajectory import Trajectory
+from controller.stanley_controller import StanleyController
+
+class Controller(Node):
+
+ def __init__(self):
+ """
+ Constructor for Controller class.
+
+ Creates a ROS node with a publisher that periodically sends a message
+ indicating whether the node is still alive.
+
+ """
+ super().__init__('controller')
+ self.get_logger().info('INITIALIZED.')
+
+
+ #Parameters
+ self.declare_parameter("dist", 0.0) #Starting Distance along path
+ start_dist = self.get_parameter("dist").value
+
+ self.declare_parameter("traj_name", "buggycourse_safe.json")
+ traj_name = self.get_parameter("traj_name").value
+ self.cur_traj = Trajectory(json_filepath="/rb_ws/src/buggy/paths/" + traj_name) #TODO: Fixed filepath, not good
+
+ start_index = self.cur_traj.get_index_from_distance(start_dist)
+
+ self.declare_parameter("controller_name", "stanley")
+
+ controller_name = self.get_parameter("controller_name").value
+ print(controller_name.lower)
+ if (controller_name.lower() == "stanley"):
+ self.controller = StanleyController(start_index = start_index, namespace = self.get_namespace(), node=self) #IMPORT STANLEY
+ else:
+ self.get_logger().error("Invalid Controller Name: " + controller_name.lower())
+ raise Exception("Invalid Controller Argument")
+
+ # Publishers
+ self.init_check_publisher = self.create_publisher(Bool,
+ "debug/init_safety_check", 1
+ )
+ self.steer_publisher = self.create_publisher(
+ Float64, "input/steering", 1
+ )
+ self.heading_publisher = self.create_publisher(
+ Float32, "debug/heading", 1
+ )
+
+ # Subscribers
+ self.odom_subscriber = self.create_subscription(Odometry, 'self/state', self.odom_listener, 1)
+ self.traj_subscriber = self.create_subscription(TrajectoryMsg, 'self/cur_traj', self.traj_listener, 1)
+
+ self.lock = threading.Lock()
+
+ self.odom = None
+ self.passed_init = False
+
+ timer_period = 0.01 # seconds (100 Hz)
+ self.timer = self.create_timer(timer_period, self.loop)
+
+ def odom_listener(self, msg : Odometry):
+ '''
+ This is the subscriber that updates the buggies state for navigation
+ msg, should be a CLEAN state as defined in the wiki
+ '''
+ with self.lock:
+ self.odom = msg
+
+ def traj_listener(self, msg):
+ '''
+ This is the subscriber that updates the buggies trajectory for navigation
+ '''
+ with self.lock:
+ self.cur_traj, self.controller.current_traj_index = Trajectory.unpack(msg)
+
+ def init_check(self):
+ """
+ Checks if it's safe to switch the buggy into autonomous driving mode.
+ Specifically, it checks:
+ if we can recieve odometry messages from the buggy
+ if the covariance is acceptable (less than 1 meter)
+ if the buggy thinks it is facing in the correct direction wrt the local trajectory (not 180 degrees flipped)
+
+ Returns:
+ A boolean describing the status of the buggy (safe for auton or unsafe for auton)
+ """
+ if (self.odom == None):
+ self.get_logger().warn("WARNING: no available position estimate")
+ return False
+
+ elif (self.odom.pose.covariance[0] ** 2 + self.odom.pose.covariance[7] ** 2 > 1**2):
+ self.get_logger().warn("checking position estimate certainty")
+ return False
+
+ #Originally under a lock, doesn't seem necessary?
+ current_heading = self.odom.pose.pose.orientation.z
+ closest_heading = self.cur_traj.get_heading_by_index(self.cur_traj.get_closest_index_on_path(self.odom.pose.pose.position.x, self.odom.pose.pose.position.y))
+
+ self.get_logger().info("current heading: " + str(np.rad2deg(current_heading)))
+ msg = Float32()
+ msg.data = np.rad2deg(current_heading)
+ self.heading_publisher.publish(msg)
+
+ #Converting headings from [-pi, pi] to [0, 2pi]
+ if (current_heading < 0):
+ current_heading = 2 * np.pi + current_heading
+ if (closest_heading < 0):
+ closest_heading = 2 * np.pi + closest_heading
+
+ if (abs(current_heading - closest_heading) >= np.pi/2):
+ self.get_logger().warn("WARNING: INCORRECT HEADING! restart stack. Current heading [-180, 180]: " + str(np.rad2deg(current_heading)))
+ return False
+
+ return True
+
+ def loop(self):
+ if not self.passed_init:
+ self.passed_init = self.init_check()
+ msg = Bool()
+ msg.data = self.passed_init
+ self.init_check_publisher.publish(msg)
+ if self.passed_init:
+ self.get_logger().info("Passed Initialization Check")
+ else:
+ return
+
+ self.heading_publisher.publish(Float32(data=np.rad2deg(self.odom.pose.pose.orientation.z)))
+ steering_angle = self.controller.compute_control(self.odom, self.cur_traj)
+ steering_angle_deg = np.rad2deg(steering_angle)
+ self.steer_publisher.publish(Float64(data=float(steering_angle_deg.item())))
+
+
+
+def main(args=None):
+ rclpy.init(args=args)
+
+ controller = Controller()
+
+ rclpy.spin(controller)
+
+ # Destroy the node explicitly
+ # (optional - otherwise it will be done automatically
+ # when the garbage collector destroys the node object)
+ controller.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
\ No newline at end of file
diff --git a/rb_ws/src/buggy/scripts/controller/controller_superclass.py b/rb_ws/src/buggy/scripts/controller/controller_superclass.py
new file mode 100755
index 00000000..731c96ee
--- /dev/null
+++ b/rb_ws/src/buggy/scripts/controller/controller_superclass.py
@@ -0,0 +1,49 @@
+from abc import ABC, abstractmethod
+import sys
+
+from nav_msgs.msg import Odometry
+
+
+sys.path.append("/rb_ws/src/buggy/scripts")
+from util.trajectory import Trajectory
+
+class Controller(ABC):
+ """
+ Base class for all controllers.
+
+ The controller takes in the current state of the buggy and a reference
+ trajectory. It must then compute the desired control output.
+
+ The method that it does this by is up to the implementation, of course.
+ Example schemes include Pure Pursuit, Stanley, and LQR.
+ """
+
+ # TODO: move this to a constants class
+ NAND_WHEELBASE = 1.3
+ SC_WHEELBASE = 1.104
+
+ def __init__(self, start_index: int, namespace : str, node) -> None:
+ self.namespace = namespace
+ if namespace.upper() == '/NAND':
+ Controller.WHEELBASE = self.NAND_WHEELBASE
+ else:
+ Controller.WHEELBASE = self.SC_WHEELBASE
+
+ self.current_traj_index = start_index
+ self.node = node
+
+ @abstractmethod
+ def compute_control(
+ self, state_msg: Odometry, trajectory: Trajectory,
+ ) -> float:
+ """
+ Computes the desired control output given the current state and reference trajectory
+
+ Args:
+ state: (Odometry): state of the buggy, including position, attitude and associated twists
+ trajectory (Trajectory): reference trajectory
+
+ Returns:
+ float (desired steering angle)
+ """
+ raise NotImplementedError
\ No newline at end of file
diff --git a/rb_ws/src/buggy/scripts/controller/stanley_controller.py b/rb_ws/src/buggy/scripts/controller/stanley_controller.py
new file mode 100755
index 00000000..ce9b8113
--- /dev/null
+++ b/rb_ws/src/buggy/scripts/controller/stanley_controller.py
@@ -0,0 +1,150 @@
+import sys
+import numpy as np
+
+from sensor_msgs.msg import NavSatFix
+from geometry_msgs.msg import Pose as ROSPose
+from nav_msgs.msg import Odometry
+
+sys.path.append("/rb_ws/src/buggy/scripts")
+from util.trajectory import Trajectory
+from controller.controller_superclass import Controller
+from util.pose import Pose
+
+import utm
+
+
+class StanleyController(Controller):
+ """
+ Stanley Controller (front axle used as reference point)
+ Referenced from this paper: https://ai.stanford.edu/~gabeh/papers/hoffmann_stanley_control07.pdf
+ """
+
+ LOOK_AHEAD_DIST_CONST = 0.05 # s
+ MIN_LOOK_AHEAD_DIST = 0.1 #m
+ MAX_LOOK_AHEAD_DIST = 2.0 #m
+
+ CROSS_TRACK_GAIN = 1.3
+ K_SOFT = 1.0 # m/s
+ K_D_YAW = 0.012 # rad / (rad/s)
+
+ def __init__(self, start_index, namespace, node):
+ super(StanleyController, self).__init__(start_index, namespace, node)
+ self.debug_reference_pos_publisher = self.node.create_publisher(
+ NavSatFix, "controller/debug/reference_navsat", 1
+ )
+ self.debug_error_publisher = self.node.create_publisher(
+ ROSPose, "controller/debug/stanley_error", 1
+ )
+
+ def compute_control(self, state_msg : Odometry, trajectory : Trajectory):
+ """Computes the steering angle determined by Stanley controller.
+ Does this by looking at the crosstrack error + heading error
+
+ Args:
+ state_msg: ros Odometry message
+ trajectory (Trajectory): reference trajectory
+
+ Returns:
+ float (desired steering angle)
+ """
+ if self.current_traj_index >= trajectory.get_num_points() - 1:
+ self.node.get_logger.error("[Stanley]: Ran out of path to follow!")
+ raise Exception("[Stanley]: Ran out of path to follow!")
+
+ current_rospose = state_msg.pose.pose
+ current_speed = np.sqrt(
+ state_msg.twist.twist.linear.x**2 + state_msg.twist.twist.linear.y**2
+ )
+ yaw_rate = state_msg.twist.twist.angular.z
+ heading = current_rospose.orientation.z
+ x, y = current_rospose.position.x, current_rospose.position.y #(Easting, Northing)
+
+ front_x = x + StanleyController.WHEELBASE * np.cos(heading)
+ front_y = y + StanleyController.WHEELBASE * np.sin(heading)
+
+ # setting range of indices to search so we don't have to search the entire path
+ traj_index = trajectory.get_closest_index_on_path(
+ front_x,
+ front_y,
+ start_index=self.current_traj_index - 20,
+ end_index=self.current_traj_index + 50,
+ )
+ self.current_traj_index = max(traj_index, self.current_traj_index)
+
+
+ lookahead_dist = np.clip(
+ self.LOOK_AHEAD_DIST_CONST * current_speed,
+ self.MIN_LOOK_AHEAD_DIST,
+ self.MAX_LOOK_AHEAD_DIST)
+
+ traj_dist = trajectory.get_distance_from_index(self.current_traj_index) + lookahead_dist
+
+ ref_heading = trajectory.get_heading_by_index(
+ trajectory.get_index_from_distance(traj_dist))
+
+ error_heading = ref_heading - heading
+ error_heading = np.arctan2(np.sin(error_heading), np.cos(error_heading)) #Bounds error_heading
+
+ # Calculate cross track error by finding the distance from the buggy to the tangent line of
+ # the reference trajectory
+ closest_position = trajectory.get_position_by_index(self.current_traj_index)
+ next_position = trajectory.get_position_by_index(
+ self.current_traj_index + 0.0001
+ )
+ x1 = closest_position[0]
+ y1 = closest_position[1]
+ x2 = next_position[0]
+ y2 = next_position[1]
+ error_dist = -((x - x1) * (y2 - y1) - (y - y1) * (x2 - x1)) / np.sqrt(
+ (y2 - y1) ** 2 + (x2 - x1) ** 2
+ )
+
+
+ cross_track_component = -np.arctan2(
+ StanleyController.CROSS_TRACK_GAIN * error_dist, current_speed + StanleyController.K_SOFT
+ )
+
+ # Calculate yaw rate error
+ r_meas = yaw_rate
+ r_traj = current_speed * (trajectory.get_heading_by_index(trajectory.get_index_from_distance(traj_dist) + 0.05)
+ - trajectory.get_heading_by_index(trajectory.get_index_from_distance(traj_dist))) / 0.05
+
+ #Determine steering_command
+ steering_cmd = error_heading + cross_track_component + StanleyController.K_D_YAW * (r_traj - r_meas)
+ steering_cmd = np.clip(steering_cmd, -np.pi / 9, np.pi / 9)
+
+
+ #Calculate error, where x is in orientation of buggy, y is cross track error
+ current_pose = Pose(current_rospose.position.x, current_rospose.position.y, heading)
+ reference_error = current_pose.convert_point_from_global_to_local_frame(closest_position)
+ reference_error -= np.array([StanleyController.WHEELBASE, 0]) #Translae back to back wheel to get accurate error
+
+ error_pose = ROSPose()
+ error_pose.position.x = reference_error[0]
+ error_pose.position.y = reference_error[1]
+ self.debug_error_publisher.publish(error_pose)
+
+ # Publish reference position for debugging
+ try:
+ reference_navsat = NavSatFix()
+ lat, lon = utm.to_latlon(closest_position[0], closest_position[1], 17, "T")
+ reference_navsat.latitude = lat
+ reference_navsat.longitude = lon
+ self.debug_reference_pos_publisher.publish(reference_navsat)
+ except Exception as e:
+ self.node.get_logger().warn(
+ "[Stanley] Unable to convert closest track position lat lon; Error: " + str(e)
+ )
+
+ return steering_cmd
+
+
+
+
+
+
+
+
+
+
+
diff --git a/rb_ws/src/buggy/buggy/hello_world.py b/rb_ws/src/buggy/scripts/hello_world.py
old mode 100644
new mode 100755
similarity index 50%
rename from rb_ws/src/buggy/buggy/hello_world.py
rename to rb_ws/src/buggy/scripts/hello_world.py
index 0a119aa0..0a2d9ba7
--- a/rb_ws/src/buggy/buggy/hello_world.py
+++ b/rb_ws/src/buggy/scripts/hello_world.py
@@ -1,5 +1,7 @@
+#!/usr/bin/env python3
+
def main():
- print('Hi from buggy. Hello again')
+ print('Hi from buggy.')
if __name__ == '__main__':
diff --git a/rb_ws/src/buggy/scripts/path_planner/path_planner.py b/rb_ws/src/buggy/scripts/path_planner/path_planner.py
new file mode 100755
index 00000000..a42e6a1d
--- /dev/null
+++ b/rb_ws/src/buggy/scripts/path_planner/path_planner.py
@@ -0,0 +1,247 @@
+#!/usr/bin/env python3
+
+import sys
+from threading import Lock
+
+import numpy as np
+import rclpy
+from rclpy.node import Node
+
+from nav_msgs.msg import Odometry
+from std_msgs.msg import Float64
+from geometry_msgs.msg import Pose
+from buggy.msg import TrajectoryMsg
+
+sys.path.append("/rb_ws/src/buggy/scripts")
+from util.trajectory import Trajectory
+
+class PathPlanner(Node):
+ """
+ Class to generate new trajectory splices for SC autonomous system.
+
+ Takes in a default trajectory and an inner curb trajectory.
+
+ """
+
+ # move the curb towards the center of the course by CURB_MARGIN meters
+ # for a margin of safety
+ CURB_MARGIN = 1 #m
+
+ # the offset is calculated as a mirrored sigmoid function of distance
+ OFFSET_SCALE_CROSS_TRACK = 2 #m
+ OFFSET_SCALE_ALONG_TRACK = 0.2
+ ACTIVATE_OTHER_SCALE_ALONG_TRACK = 0.1
+ OFFSET_SHIFT_ALONG_TRACK = 4 #m
+
+ # number of meters ahead of the buggy to generate local trajectory for
+ LOCAL_TRAJ_LEN = 50#m
+
+ # start generating local trajectory this many meters ahead of current position
+ LOOKAHEAD = 2#m
+
+ # number of points to sample along the nominal trajectory
+ RESOLUTION = 150
+
+ # frequency to run in hz
+ FREQUENCY = 10
+
+ def __init__(self) -> None:
+ super().__init__('path_planner')
+ self.get_logger().info('INITIALIZED.')
+
+
+ #Parameters
+ self.declare_parameter("traj_name", "buggycourse_safe.json")
+ traj_name = self.get_parameter("traj_name").value
+ self.nominal_traj = Trajectory(json_filepath="/rb_ws/src/buggy/paths/" + traj_name) #TODO: Fixed filepath, not good
+
+ self.declare_parameter("curb_name", "")
+ curb_name = self.get_parameter("curb_name").value
+ curb_name = None if curb_name == "" else curb_name
+ if curb_name is None:
+ self.left_curb = None
+ else:
+ self.left_curb = Trajectory(json_filepath="/rb_ws/src/buggy/paths/" + curb_name) #TODO: Fixed filepath, not good
+
+ #Publishers
+ self.other_buggy_xtrack_publisher = self.create_publisher(Float64, "debug/other_buggy_xtrack", 10)
+ self.traj_publisher = self.create_publisher(TrajectoryMsg, "self/cur_traj", 10)
+
+ #Subscribers
+ self.self_pose_subscriber = self.create_subscription(Odometry, 'self/state', self.self_pose_callback, 1)
+ self.other_pose_subscriber = self.create_subscription(Odometry, 'other/state', self.other_pose_callback, 1)
+
+ self.timer = self.create_timer(1/self.FREQUENCY, self.timer_callback)
+
+ # set up subscriber and callback for pose variables
+ self.self_pose = None
+ self.other_pose = None
+
+ self.self_pose_lock = Lock()
+ self.other_pose_lock = Lock()
+
+ def self_pose_callback(self, msg):
+ with self.self_pose_lock:
+ self.self_pose = msg.pose.pose
+
+ def other_pose_callback(self, msg):
+ with self.other_pose_lock:
+ self.other_pose = msg.pose.pose
+ self.get_logger().debug("Received position of other buggy.")
+
+ def timer_callback(self):
+ with self.self_pose_lock and self.other_pose_lock:
+ if (self.self_pose is not None) and (self.other_pose is not None):
+ self.compute_traj(self.self_pose, self.other_pose)
+
+ def offset_func(self, dist):
+ """
+ Args:
+ dist: (N, ) numpy array, distances between ego-buggy and obstacle,
+ along the trajectory
+ Returns:
+ (N, ) numpy array, offsets from nominal trajectory required to overtake,
+ defined by a sigmoid function
+ """
+
+ return self.OFFSET_SCALE_CROSS_TRACK / \
+ (1 + np.exp(-(-self.OFFSET_SCALE_ALONG_TRACK * dist +
+ self.OFFSET_SHIFT_ALONG_TRACK)))
+
+ def activate_other_crosstrack_func(self, dist):
+ """
+ Args:
+ dist: (N, ) numpy array, distances between ego-buggy and obstacle,
+ along the trajectory
+ Returns:
+ (N, ) numpy array, multiplier used to weigh the cross-track distance of
+ the obstacle into the passing offset calculation.
+ """
+ return 1 / \
+ (1 + np.exp(-(-self.ACTIVATE_OTHER_SCALE_ALONG_TRACK * dist +
+ self.OFFSET_SHIFT_ALONG_TRACK)))
+
+
+ def compute_traj(
+ self,
+ self_pose: Pose,
+ other_pose: Pose,
+ ) -> None:
+ """
+ draw trajectory starting at the current pose and ending at a fixed distance
+ ahead. For each trajectory point, calculate the required offset perpendicular to the nominal
+ trajectory. A sigmoid function of the distance along track to the other buggy is used to
+ weigh the other buggy's cross-track distance. This calculation produces a line that
+ allows the ego-buggy's trajectory to go through the other buggy. Since we want to pass
+ the other buggy at some constant distance to the left, another sigmoid function is multiplied
+ by that constant distance to produce a smooth trajectory that passes the other buggy.
+
+ Finally, the trajectory is bounded to the left by the left curb (if it exists), and to the right
+ by the nominal trajectory. (we never pass on the right)
+
+ passing offsets =
+ activate_other_crosstrack_func(distance to other buggy along track) *
+ other buggy cross track distance +
+ offset_func(distance to other buggy along track)
+
+ trajectory = nominal trajectory +
+ left nominal trajectory unit normal vector *
+ clamp(passing offsets, 0, distance from nominal trajectory to left curb)
+
+ Args:
+ other_pose (Pose): Pose containing NAND's easting (x),
+ northing(y), and heading (theta), in UTM
+
+ other_speed (float): speed in m/s of NAND
+ Publishes:
+ Trajectory: list of x,y coords for ego-buggy to follow.
+ """
+ # grab slice of nominal trajectory
+ nominal_idx = self.nominal_traj.get_closest_index_on_path(self_pose.position.x, self_pose.position.y)
+ nominal_dist_along = self.nominal_traj.get_distance_from_index(nominal_idx)
+
+ nominal_slice = np.empty((self.RESOLUTION, 2))
+
+ # compute the distance along nominal trajectory between samples and the obstacle
+ nominal_slice_dists = np.linspace(
+ nominal_dist_along + self.LOOKAHEAD,
+ nominal_dist_along + self.LOOKAHEAD + self.LOCAL_TRAJ_LEN,
+ self.RESOLUTION)
+
+ for i in range(self.RESOLUTION):
+ nominal_slice[i, :] = np.array(self.nominal_traj.get_position_by_distance(
+ nominal_slice_dists[i]
+ ))
+
+ # get index of the other buggy along the trajetory and convert to distance
+ other_idx = self.nominal_traj.get_closest_index_on_path(other_pose.position.x, other_pose.position.y)
+ other_dist = self.nominal_traj.get_distance_from_index(other_idx)
+ nominal_slice_to_other_dist = np.abs(nominal_slice_dists - other_dist)
+
+ passing_offsets = self.offset_func(nominal_slice_to_other_dist)
+
+ # compute signed cross-track distance between NAND and nominal
+ nominal_to_other = np.array((other_pose.position.x, other_pose.position.y)) - \
+ np.array(self.nominal_traj.get_position_by_index(other_idx))
+
+ # dot product with the unit normal to produce left-positive signed distance
+ other_normal = self.nominal_traj.get_unit_normal_by_index(np.array(other_idx.ravel()))
+ other_cross_track_dist = np.sum(
+ nominal_to_other * other_normal, axis=1)
+
+ self.other_buggy_xtrack_publisher.publish(Float64(data=float(other_cross_track_dist.item())))
+
+ # here, use passing offsets to weight NAND's cross track signed distance:
+ # if the sample point is far from SC, the cross track distance doesn't matter
+ # if the sample point is near, we add cross track distance to the offset,
+ # such that the resulting offset is adjusted by position of NAND
+
+ passing_offsets = passing_offsets + \
+ self.activate_other_crosstrack_func(nominal_slice_to_other_dist) * other_cross_track_dist
+
+ # clamp passing offset distances to distance to the curb
+ if self.left_curb is not None:
+ # grab slice of curb correponding to slice of nominal trajectory.
+ curb_idx = self.left_curb.get_closest_index_on_path(self_pose.position.x, self_pose.position.y)
+ curb_dist_along = self.left_curb.get_distance_from_index(curb_idx)
+ curb_idx_end = self.left_curb.get_closest_index_on_path(nominal_slice[-1, 0], nominal_slice[-1, 1])
+ curb_dist_along_end = self.left_curb.get_distance_from_index(curb_idx_end)
+ curb_dists = np.linspace(curb_dist_along, curb_dist_along_end, self.RESOLUTION)
+
+ curb_slice = np.empty((self.RESOLUTION, 2))
+ for i in range(self.RESOLUTION):
+ curb_slice[i, :] = np.array(self.left_curb.get_position_by_distance(
+ curb_dists[i]
+ ))
+
+ # compute distances from the sample points to the curb
+ nominal_slice_to_curb_dist = np.linalg.norm(curb_slice - nominal_slice, axis=1)
+ passing_offsets = np.minimum(passing_offsets, nominal_slice_to_curb_dist - self.CURB_MARGIN)
+
+ # clamp negative passing offsets to zero, since we always pass on the left,
+ # the passing offsets should never pull SC to the right.
+ passing_offsets = np.maximum(0, passing_offsets)
+
+ # shift the nominal slice by passing offsets
+ nominal_normals = self.nominal_traj.get_unit_normal_by_index(
+ self.nominal_traj.get_index_from_distance(nominal_slice_dists)
+ )
+ positions = nominal_slice + (passing_offsets[:, None] * nominal_normals)
+
+ local_traj = Trajectory(json_filepath=None, positions=positions)
+ self.traj_publisher.publish(local_traj.pack(self_pose.position.x, self_pose.position.y ))
+
+
+def main(args=None):
+ rclpy.init(args=args)
+ # TODO: set file paths here
+ # At the time of writing the below snippet, config management is TBD
+
+ path_planner = PathPlanner()
+ rclpy.spin(path_planner)
+
+ path_planner.destroy_node()
+ rclpy.shutdown()
+
+if __name__ == '__main__':
+ main()
\ No newline at end of file
diff --git a/rb_ws/src/buggy/scripts/serial/host_comm.py b/rb_ws/src/buggy/scripts/serial/host_comm.py
new file mode 100755
index 00000000..766247e3
--- /dev/null
+++ b/rb_ws/src/buggy/scripts/serial/host_comm.py
@@ -0,0 +1,312 @@
+import struct
+import time
+from dataclasses import dataclass
+
+from serial import Serial
+
+class Crc16:
+ def __init__(self):
+ self.accum = 0
+
+ def update(self, b):
+ if isinstance(b, int):
+ assert(0 <= b < 256)
+
+ crc_table = [
+ 0x0000, 0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014, 0x8011,
+ 0x8033, 0x0036, 0x003C, 0x8039, 0x0028, 0x802D, 0x8027, 0x0022,
+ 0x8063, 0x0066, 0x006C, 0x8069, 0x0078, 0x807D, 0x8077, 0x0072,
+ 0x0050, 0x8055, 0x805F, 0x005A, 0x804B, 0x004E, 0x0044, 0x8041,
+ 0x80C3, 0x00C6, 0x00CC, 0x80C9, 0x00D8, 0x80DD, 0x80D7, 0x00D2,
+ 0x00F0, 0x80F5, 0x80FF, 0x00FA, 0x80EB, 0x00EE, 0x00E4, 0x80E1,
+ 0x00A0, 0x80A5, 0x80AF, 0x00AA, 0x80BB, 0x00BE, 0x00B4, 0x80B1,
+ 0x8093, 0x0096, 0x009C, 0x8099, 0x0088, 0x808D, 0x8087, 0x0082,
+ 0x8183, 0x0186, 0x018C, 0x8189, 0x0198, 0x819D, 0x8197, 0x0192,
+ 0x01B0, 0x81B5, 0x81BF, 0x01BA, 0x81AB, 0x01AE, 0x01A4, 0x81A1,
+ 0x01E0, 0x81E5, 0x81EF, 0x01EA, 0x81FB, 0x01FE, 0x01F4, 0x81F1,
+ 0x81D3, 0x01D6, 0x01DC, 0x81D9, 0x01C8, 0x81CD, 0x81C7, 0x01C2,
+ 0x0140, 0x8145, 0x814F, 0x014A, 0x815B, 0x015E, 0x0154, 0x8151,
+ 0x8173, 0x0176, 0x017C, 0x8179, 0x0168, 0x816D, 0x8167, 0x0162,
+ 0x8123, 0x0126, 0x012C, 0x8129, 0x0138, 0x813D, 0x8137, 0x0132,
+ 0x0110, 0x8115, 0x811F, 0x011A, 0x810B, 0x010E, 0x0104, 0x8101,
+ 0x8303, 0x0306, 0x030C, 0x8309, 0x0318, 0x831D, 0x8317, 0x0312,
+ 0x0330, 0x8335, 0x833F, 0x033A, 0x832B, 0x032E, 0x0324, 0x8321,
+ 0x0360, 0x8365, 0x836F, 0x036A, 0x837B, 0x037E, 0x0374, 0x8371,
+ 0x8353, 0x0356, 0x035C, 0x8359, 0x0348, 0x834D, 0x8347, 0x0342,
+ 0x03C0, 0x83C5, 0x83CF, 0x03CA, 0x83DB, 0x03DE, 0x03D4, 0x83D1,
+ 0x83F3, 0x03F6, 0x03FC, 0x83F9, 0x03E8, 0x83ED, 0x83E7, 0x03E2,
+ 0x83A3, 0x03A6, 0x03AC, 0x83A9, 0x03B8, 0x83BD, 0x83B7, 0x03B2,
+ 0x0390, 0x8395, 0x839F, 0x039A, 0x838B, 0x038E, 0x0384, 0x8381,
+ 0x0280, 0x8285, 0x828F, 0x028A, 0x829B, 0x029E, 0x0294, 0x8291,
+ 0x82B3, 0x02B6, 0x02BC, 0x82B9, 0x02A8, 0x82AD, 0x82A7, 0x02A2,
+ 0x82E3, 0x02E6, 0x02EC, 0x82E9, 0x02F8, 0x82FD, 0x82F7, 0x02F2,
+ 0x02D0, 0x82D5, 0x82DF, 0x02DA, 0x82CB, 0x02CE, 0x02C4, 0x82C1,
+ 0x8243, 0x0246, 0x024C, 0x8249, 0x0258, 0x825D, 0x8257, 0x0252,
+ 0x0270, 0x8275, 0x827F, 0x027A, 0x826B, 0x026E, 0x0264, 0x8261,
+ 0x0220, 0x8225, 0x822F, 0x022A, 0x823B, 0x023E, 0x0234, 0x8231,
+ 0x8213, 0x0216, 0x021C, 0x8219, 0x0208, 0x820D, 0x8207, 0x0202
+ ]
+
+ i = ((self.accum >> 8) ^ b) & 0xFF;
+ i %= 256
+ self.accum = ((self.accum << 8) & 0xFF00) ^ crc_table[i];
+ else:
+ for one_byte in b:
+ self.update(one_byte)
+
+SYNC_WORD = b'\xAA\xFF\x00\x55'
+
+MAX_PAYLOAD_LEN = 100
+
+
+# firmware --> software
+MSG_TYPE_NAND_DEBUG = b'ND'
+MSG_TYPE_NAND_UKF = b'NU'
+MSG_TYPE_NAND_GPS = b'NG'
+MSG_TYPE_RADIO = b'RA'
+MSG_TYPE_SC_DEBUG = b'SD'
+MSG_TYPE_SC_SENSORS = b'SS'
+MSG_TYPE_ROUNDTRIP_TIMESTAMP = b'RT'
+
+# software --> firmware
+MSG_TYPE_STEERING = b'ST'
+MSG_TYPE_ALARM = b'AL'
+MSG_TYPE_SOFTWARE_TIMESTAMP = b'TM'
+
+
+
+@dataclass
+class NANDDebugInfo:
+ # 64 bits
+ heading_rate: float # double
+ encoder_angle: float # double
+ # 32 bits
+ timestamp: int
+ rc_steering_angle: float
+ software_steering_angle: float
+ true_steering_angle: float
+ rfm69_timeout_num: int
+ # 8 bits
+ operator_ready: bool
+ brake_status: bool
+ auton_steer: bool
+ tx12_state: bool
+ stepper_alarm: int # unsigned char
+ rc_uplink_quality: int # uint8
+
+@dataclass
+class NANDUKF:
+ # 64 bits
+ easting: float # double
+ northing: float # double
+ theta: float # double
+ heading_rate: float # double
+ velocity: float # double
+ # 32 bits
+ timestamp: int
+
+@dataclass
+class NANDRawGPS:
+ # 64 bits
+ easting: float # double
+ northing: float # double
+ # this is a 2D accuracy value
+ accuracy: float # double
+ gps_time: int # uint64
+ # 32 bits
+ gps_seqnum: int
+ timestamp: int
+ # 8 bits
+ gps_fix: int # uint8
+
+@dataclass
+class Radio:
+ nand_east_gps: float
+ nand_north_gps: float
+ gps_seqnum: int
+ nand_gps_fix: int # uint8
+
+@dataclass
+class SCDebugInfo:
+ # 64 bits
+ encoder_angle: float # double
+ # 32 bits
+ rc_steering_angle: float
+ software_steering_angle: float
+ true_steering_angle: float
+ missed_packets: int
+ timestamp: int
+ # 8 bits
+ tx12_state: bool
+ operator_ready: bool
+ stepper_alarm: int # unsigned char
+ brake_status: bool
+ auton_steer: bool
+ rc_uplink_quality: int # uint8
+
+@dataclass
+class SCSensors:
+ # 64 bits
+ velocity: float # double
+ # 32 bits
+ steering_angle: float
+ timestamp: int
+
+@dataclass
+class RoundtripTimestamp:
+ returned_time: float # double
+
+
+class IncompletePacket(Exception):
+ pass
+
+class ChecksumMismatch(Exception):
+ pass
+
+class Comms:
+ def __init__(self, path_to_port):
+ self.port = Serial(path_to_port, 1_000_000)
+ self.rx_buffer = b''
+
+ def send_packet_raw(self, msg_type: bytes, payload: bytes):
+ assert(len(msg_type) == 2)
+ assert(len(payload) < MAX_PAYLOAD_LEN)
+
+ checksum = Crc16()
+ def write_and_checksum(data: bytes):
+ nonlocal checksum
+
+ self.port.write(data)
+ checksum.update(data)
+
+ self.port.write(SYNC_WORD)
+ write_and_checksum(msg_type)
+ write_and_checksum(len(payload).to_bytes(2, 'little'))
+ write_and_checksum(payload)
+ self.port.write(checksum.accum.to_bytes(2, 'little'))
+
+ def send_steering(self, angle: float):
+ self.send_packet_raw(MSG_TYPE_STEERING, struct.pack(' len(self.rx_buffer):
+ raise IncompletePacket()
+
+ data = self.rx_buffer[index:][:count]
+ index += count
+ return data
+
+ def read_and_checksum(count: int):
+ nonlocal checksum
+
+ data = read(count)
+ checksum.update(data)
+ return data
+
+ if read(1) != SYNC_WORD[0:1]:
+ self.rx_buffer = self.rx_buffer[1:]
+ continue
+ if read(1) != SYNC_WORD[1:2]:
+ self.rx_buffer = self.rx_buffer[1:]
+ continue
+ if read(1) != SYNC_WORD[2:3]:
+ self.rx_buffer = self.rx_buffer[1:]
+ continue
+ if read(1) != SYNC_WORD[3:4]:
+ self.rx_buffer = self.rx_buffer[1:]
+ continue
+
+ msg_type = read_and_checksum(2)
+ msg_len = int.from_bytes(read_and_checksum(2), 'little')
+
+ if msg_len > MAX_PAYLOAD_LEN:
+ self.rx_buffer = self.rx_buffer[1:]
+ continue
+
+ payload = read_and_checksum(msg_len)
+
+ rx_crc = int.from_bytes(read(2), 'little')
+
+ self.rx_buffer = self.rx_buffer[4 + 3 + 2 + msg_len + 2:]
+
+ if rx_crc != checksum.accum:
+ raise ChecksumMismatch()
+
+ return (msg_type, payload)
+
+ def read_packet(self):
+ packet = self.read_packet_raw()
+ if packet is None:
+ return None
+
+ msg_type, payload = packet
+ if msg_type == MSG_TYPE_NAND_DEBUG:
+ data = struct.unpack(' 0.01:
+ print(packet)
+ last_time = time.time()
+ comms.send_steering(1234.5)
+
+if __name__ == '__main__':
+ main()
\ No newline at end of file
diff --git a/rb_ws/src/buggy/scripts/serial/ros_to_bnyahaj.py b/rb_ws/src/buggy/scripts/serial/ros_to_bnyahaj.py
new file mode 100755
index 00000000..0efffb61
--- /dev/null
+++ b/rb_ws/src/buggy/scripts/serial/ros_to_bnyahaj.py
@@ -0,0 +1,311 @@
+#!/usr/bin/env python3
+
+import argparse
+from threading import Lock
+import threading
+import rclpy
+from host_comm import *
+from rclpy.node import Node
+
+from std_msgs.msg import Float64, Int8, Int32, UInt8, Bool, UInt64
+from nav_msgs.msg import Odometry
+
+class Translator(Node):
+ """
+ Translates the output from bnyahaj serial (interpreted from host_comm) to ros topics and vice versa.
+ Performs reading (from Bnya Serial) and writing (from Ros Topics) on different python threads, so
+ be careful of multithreading synchronizaiton issues.
+ """
+
+ def __init__(self, teensy_name):
+ """
+ teensy_name: required for communication, different for SC and NAND
+
+ Initializes the subscribers, rates, and ros topics (including debug topics)
+ """
+
+ super().__init__('ROS_serial_translator')
+
+ self.comms = Comms("/dev/" + teensy_name)
+ namespace = self.get_namespace()
+ if namespace == "/SC":
+ self.self_name = "SC"
+ else:
+ self.self_name = "NAND"
+
+ self.steer_angle = 0
+ self.alarm = 0
+ self.fresh_steer = False
+ self.lock = Lock()
+
+ self.create_subscription(
+ Float64, "/input/steering", self.set_steering, 1
+ )
+ self.create_subscription(Int8, "/input/sanity_warning", self.set_alarm, 1)
+
+ # upper bound of steering update rate, make sure auton sends slower than 500 Hz or update / 2ms
+ self.steer_send_rate = self.create_rate(500)
+
+ # upper bound of reading data from Bnyahaj Serial, at 1ms
+ self.read_rate = self.create_rate(1000)
+
+ # DEBUG MESSAGE PUBLISHERS:
+ self.heading_rate_publisher = self.create_publisher(
+ Float64, "/debug/heading_rate", 1
+ )
+ self.encoder_angle_publisher = self.create_publisher(
+ Float64, "/debug/encoder_angle", 1
+ )
+ self.rc_steering_angle_publisher = self.create_publisher(
+ Float64, "/debug/rc_steering_angle", 1
+ )
+ self.software_steering_angle_publisher = self.create_publisher(
+ Float64, "/debug/software_steering_angle", 1
+ )
+ self.true_steering_angle_publisher = self.create_publisher(
+ Float64, "/debug/true_steering_angle", 1
+ )
+ self.rfm69_timeout_num_publisher = self.create_publisher(
+ Int32, "/debug/rfm_timeout_num", 1
+ )
+ self.operator_ready_publisher = self.create_publisher(
+ Bool, "/debug/operator_ready", 1
+ )
+ self.brake_status_publisher = self.create_publisher(
+ Bool, "/debug/brake_status", 1
+ )
+ self.use_auton_steer_publisher = self.create_publisher(
+ Bool, "/debug/use_auton_steer", 1
+ )
+ self.tx12_state_publisher = self.create_publisher(
+ Bool, "/debug/tx12_connected", 1
+ )
+ self.stepper_alarm_publisher = self.create_publisher(
+ UInt8, "/debug/steering_alarm", 1
+ )
+ self.rc_uplink_qual_publisher = self.create_publisher(
+ UInt8, "/debug/rc_uplink_quality", 1
+ )
+ self.nand_gps_seqnum_publisher = self.create_publisher(
+ Int32, "/debug/NAND_gps_seqnum", 1
+ )
+
+ # SERIAL DEBUG PUBLISHERS
+ self.roundtrip_time_publisher = self.create_publisher(
+ Float64, "/debug/roundtrip_time", 1
+ )
+
+ if self.self_name == "NAND":
+ # NAND POSITION PUBLISHERS
+ self.nand_ukf_odom_publisher = self.create_publisher(
+ Odometry, "/raw_state", 1
+ )
+ self.nand_gps_odom_publisher = self.create_publisher(
+ Odometry, "/debug/gps_odom", 1
+ )
+
+ self.nand_gps_fix_publisher = self.create_publisher(
+ UInt8, "/debug/gps_fix", 1
+ )
+ self.nand_gps_acc_publisher = self.create_publisher(
+ Float64, "/debug/gps_accuracy", 1
+ )
+
+ self.nand_gps_time_publisher = self.create_publisher(
+ UInt64, "/debug/gps_time", 1
+ )
+
+ if self.self_name == "SC":
+
+ # SC SENSOR PUBLISHERS
+ self.sc_velocity_publisher = self.create_publisher(
+ Float64, "/sensors/velocity", 1
+ )
+ self.sc_steering_angle_publisher = self.create_publisher(
+ Float64, "/sensors/steering_angle", 1
+ )
+
+ # RADIO DATA PUBLISHER
+ self.observed_nand_odom_publisher = self.create_publisher(
+ Odometry, "/NAND_raw_state", 1
+ )
+
+
+ def set_alarm(self, msg):
+ """
+ alarm ros topic reader, locked so that only one of the setters runs at once
+ """
+ with self.lock:
+ self.get_logger().debug(f"Reading alarm of {msg.data}")
+ self.alarm = msg.data
+
+ def set_steering(self, msg):
+ """
+ Steering Angle Updater, updates the steering angle locally if updated on ros stopic
+ """
+ self.get_logger().debug(f"Read steering angle of: {msg.data}")
+ with self.lock:
+ self.steer_angle = msg.data
+ self.fresh_steer = True
+
+
+ def writer_thread(self):
+ """
+ Sends ROS Topics to bnayhaj serial, only sends a steering angle when we receive a fresh one
+ Will send steering and alarm node.
+ """
+ self.get_logger().info("Starting sending alarm and steering to teensy!")
+
+ while rclpy.ok():
+ if self.fresh_steer:
+ with self.lock:
+ self.comms.send_steering(self.steer_angle)
+ self.get_logger().debug(f"Sent steering angle of: {self.steer_angle}")
+ self.fresh_steer = False
+
+ with self.lock:
+ self.comms.send_alarm(self.alarm)
+ with self.lock:
+ self.comms.send_timestamp(time.time())
+
+ self.steer_send_rate.sleep()
+
+ def reader_thread(self):
+ self.get_logger().info("Starting reading odom from teensy!")
+ while rclpy.ok():
+ packet = self.comms.read_packet()
+ self.get_logger().debug("packet" + str(packet))
+
+ if isinstance(packet, NANDDebugInfo):
+ self.heading_rate_publisher.publish(packet.heading_rate)
+ self.encoder_angle_publisher.publish(packet.encoder_angle)
+ self.rc_steering_angle_publisher.publish(packet.rc_steering_angle)
+ self.software_steering_angle_publisher.publish(packet.software_steering_angle)
+ self.true_steering_angle_publisher.publish(packet.true_steering_angle)
+ self.rfm69_timeout_num_publisher.publish(packet.rfm69_timeout_num)
+ self.operator_ready_publisher.publish(packet.operator_ready)
+ self.brake_status_publisher.publish(packet.brake_status)
+ self.use_auton_steer_publisher.publish(packet.auton_steer)
+ self.tx12_state_publisher.publish(packet.tx12_state)
+ self.stepper_alarm_publisher.publish(packet.stepper_alarm)
+ self.rc_uplink_qual_publisher.publish(packet.rc_uplink_quality)
+ self.get_logger().debug(f'NAND Debug Timestamp: {packet.timestamp}')
+ elif isinstance(packet, NANDUKF):
+ odom = Odometry()
+ odom.pose.pose.position.x = packet.easting
+ odom.pose.pose.position.y = packet.northing
+ odom.pose.pose.orientation.z = packet.theta
+
+ odom.twist.twist.linear.x = packet.velocity
+ odom.twist.twist.angular.z = packet.heading_rate
+
+ self.nand_ukf_odom_publisher.publish(odom)
+ self.get_logger().debug(f'NAND UKF Timestamp: {packet.timestamp}')
+
+
+ elif isinstance(packet, NANDRawGPS):
+ odom = Odometry()
+ odom.pose.pose.position.x = packet.easting
+ odom.pose.pose.position.y = packet.northing
+ odom.pose.pose.orientation.z = 0
+ odom.twist.twist.linear.x = 0
+ odom.twist.twist.linear.y = 0
+ odom.twist.twist.angular.z = 0
+
+ self.nand_gps_odom_publisher.publish(odom)
+ self.nand_gps_fix_publisher.publish(packet.gps_fix)
+ self.nand_gps_acc_publisher.publish(packet.accuracy)
+ self.nand_gps_seqnum_publisher.publish(packet.gps_seqnum)
+ self.nand_gps_time_publisher.publish(packet.gps_time)
+ self.get_logger().debug(f'NAND Raw GPS Timestamp: {packet.timestamp}')
+
+
+ # this packet is received on Short Circuit
+ elif isinstance(packet, Radio):
+
+ # Publish to odom topic x and y coord
+ odom = Odometry()
+
+ odom.pose.pose.position.x = packet.nand_east_gps
+ odom.pose.pose.position.y = packet.nand_north_gps
+ self.observed_nand_odom_publisher.publish(odom)
+ self.nand_gps_seqnum_publisher.publish(packet.gps_seqnum)
+
+
+
+
+ elif isinstance(packet, SCDebugInfo):
+ self.encoder_angle_publisher.publish(packet.encoder_angle)
+ self.rc_steering_angle_publisher.publish(packet.rc_steering_angle)
+ self.software_steering_angle_publisher.publish(packet.software_steering_angle)
+ self.true_steering_angle_publisher.publish(packet.true_steering_angle)
+ self.operator_ready_publisher.publish(packet.operator_ready)
+ self.brake_status_publisher.publish(packet.brake_status)
+ self.use_auton_steer_publisher.publish(packet.auton_steer)
+ self.tx12_state_publisher.publish(packet.tx12_state)
+ self.stepper_alarm_publisher.publish(packet.stepper_alarm)
+ self.rc_uplink_qual_publisher.publish(packet.rc_uplink_quality)
+ self.get_logger().debug(f'SC Debug Timestamp: {packet.timestamp}')
+
+
+ elif isinstance(packet, SCSensors):
+ self.sc_velocity_publisher.publish(packet.velocity)
+ self.sc_steering_angle_publisher.publish(packet.steering_angle)
+ self.get_logger().debug(f'SC Sensors Timestamp: {packet.timestamp}')
+
+
+ elif isinstance(packet, RoundtripTimestamp):
+ self.roundtrip_time_publisher.publish(time.time() - packet.returned_time)
+
+ self.read_rate.sleep()
+
+ def loop(self):
+ """
+ Initialies the reader and writer thread, should theoretically never finish as there are while loops
+ """
+ p1 = threading.Thread(target=self.writer_thread)
+ p2 = threading.Thread(target=self.reader_thread)
+
+ p1.start()
+ p2.start()
+
+ p1.join()
+ p2.join()
+
+
+# Initializes ros nodes, using self and other name
+# other name is not requires, and if not submitted, use NONE
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--self_name", type=str, help="name of ego-buggy", required=True
+ )
+ parser.add_argument(
+ "--other_name",
+ type=str,
+ help="name of other buggy",
+ required=False,
+ default=None,
+ )
+ parser.add_argument(
+ "--teensy_name", type=str, help="name of teensy port", required=True
+ )
+ args, _ = parser.parse_known_args()
+ self_name = args.self_name
+ other_name = args.other_name
+ teensy_name = args.teensy_name
+
+ rclpy.init()
+
+ translate = Translator(self_name, other_name, teensy_name)
+
+ if self_name == "SC" and other_name is None:
+ translate.get_logger().warn(
+ "Not reading NAND Odometry messages, double check roslaunch files for ros_to_bnyahaj"
+ )
+ elif other_name is None:
+ translate.get_logger().info("No other name passed in, presuming that this is NAND ")
+
+ rclpy.spin(translate)
+
+ rclpy.shutdown()
diff --git a/rb_ws/src/buggy/buggy/simulator/engine.py b/rb_ws/src/buggy/scripts/simulator/engine.py
old mode 100644
new mode 100755
similarity index 70%
rename from rb_ws/src/buggy/buggy/simulator/engine.py
rename to rb_ws/src/buggy/scripts/simulator/engine.py
index 361ae4bf..af2bdc5b
--- a/rb_ws/src/buggy/buggy/simulator/engine.py
+++ b/rb_ws/src/buggy/scripts/simulator/engine.py
@@ -1,5 +1,7 @@
#! /usr/bin/env python3
import threading
+import sys
+import time
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Pose, Twist, PoseWithCovariance, TwistWithCovariance
@@ -9,15 +11,11 @@
import numpy as np
import utm
+sys.path.append("/rb_ws/src/buggy/scripts")
+from util.constants import Constants
+
class Simulator(Node):
- # simulator constants:
- UTM_EAST_ZERO = 589702.87
- UTM_NORTH_ZERO = 4477172.947
- UTM_ZONE_NUM = 17
- UTM_ZONE_LETTER = "T"
- #TODO: make these values accurate
- WHEELBASE_SC = 1.17
- WHEELBASE_NAND= 1.17
+
def __init__(self):
super().__init__('sim_single')
@@ -25,8 +23,8 @@ def __init__(self):
self.starting_poses = {
- "Hill1_NAND": (Simulator.UTM_EAST_ZERO + 0, Simulator.UTM_NORTH_ZERO + 0, -110),
- "Hill1_SC": (Simulator.UTM_EAST_ZERO + 20, Simulator.UTM_NORTH_ZERO + 30, -110),
+ "Hill1_NAND": (Constants.UTM_EAST_ZERO + 0, Constants.UTM_NORTH_ZERO + 0, -110),
+ "Hill1_SC": (Constants.UTM_EAST_ZERO + 20, Constants.UTM_NORTH_ZERO + 30, -110),
"WESTINGHOUSE": (589647, 4477143, -150),
"UC_TO_PURNELL": (589635, 4477468, 160),
"UC": (589681, 4477457, 160),
@@ -39,57 +37,53 @@ def __init__(self):
"RACELINE_PASS": (589468.02, 4476993.07, -160),
}
- self.number_publisher = self.create_publisher(Float64, 'numbers', 1)
- self.i = 0
-
- # for X11 matplotlib (direction included)
- self.plot_publisher = self.create_publisher(Pose, "sim_2d/utm", 1)
-
- # simulate the INS's outputs (noise included)
- self.pose_publisher = self.create_publisher(Odometry, "nav/odom", 1)
-
- self.steering_subscriber = self.create_subscription(
- Float64, "buggy/input/steering", self.update_steering_angle, 1
- )
- # To read from velocity
- self.velocity_subscriber = self.create_subscription(
- Float64, "velocity", self.update_velocity, 1
- )
- self.navsatfix_noisy_publisher = self.create_publisher(
- NavSatFix, "state/pose_navsat_noisy", 1
- )
-
-
-
-
-
self.declare_parameter('velocity', 12)
if (self.get_namespace() == "/SC"):
self.buggy_name = "SC"
self.declare_parameter('pose', "Hill1_SC")
- self.wheelbase = Simulator.WHEELBASE_SC
+ self.wheelbase = Constants.WHEELBASE_SC
if (self.get_namespace() == "/NAND"):
self.buggy_name = "NAND"
self.declare_parameter('pose', "Hill1_NAND")
- self.wheelbase = Simulator.WHEELBASE_NAND
+ self.wheelbase = Constants.WHEELBASE_NAND
self.velocity = self.get_parameter("velocity").value
init_pose_name = self.get_parameter("pose").value
+
self.init_pose = self.starting_poses[init_pose_name]
self.e_utm, self.n_utm, self.heading = self.init_pose
self.steering_angle = 0 # degrees
- self.rate = 200 # Hz
- self.pub_skip = 2 # publish every pub_skip ticks
- self.pub_tick_count = 0
+ self.rate = 100 # Hz
+ self.tick_count = 0
+ self.interval = 2 # how frequently to publish
self.lock = threading.Lock()
- freq = 10
- timer_period = 1/freq # seconds
+ timer_period = 1/self.rate # seconds
self.timer = self.create_timer(timer_period, self.loop)
+ self.steering_subscriber = self.create_subscription(
+ Float64, "input/steering", self.update_steering_angle, 1
+ )
+
+ # To read from velocity
+ self.velocity_subscriber = self.create_subscription(
+ Float64, "sim/velocity", self.update_velocity, 1
+ )
+
+ # for X11 matplotlib (direction included)
+ self.plot_publisher = self.create_publisher(Pose, "sim_2d/utm", 1)
+
+ # simulate the INS's outputs (noise included)
+ # this is published as a BuggyState (UTM and radians)
+ self.pose_publisher = self.create_publisher(Odometry, "self/state", 1)
+
+ self.navsatfix_noisy_publisher = self.create_publisher(
+ NavSatFix, "self/pose_navsat_noisy", 1
+ )
+
def update_steering_angle(self, data: Float64):
with self.lock:
self.steering_angle = data.data
@@ -98,14 +92,6 @@ def update_velocity(self, data: Float64):
with self.lock:
self.velocity = data.data
- def get_steering_arc(self):
- with self.lock:
- steering_angle = self.steering_angle
- if steering_angle == 0.0:
- return np.inf
-
- return Simulator.WHEELBASE / np.tan(np.deg2rad(steering_angle))
-
def dynamics(self, state, v):
l = self.wheelbase
_, _, theta, delta = state
@@ -146,7 +132,7 @@ def publish(self):
with self.lock:
p.position.x = self.e_utm
p.position.y = self.n_utm
- p.position.z = self.heading
+ p.position.z = float(self.heading)
velocity = self.velocity
self.plot_publisher.publish(p)
@@ -154,18 +140,13 @@ def publish(self):
(lat, long) = utm.to_latlon(
p.position.x,
p.position.y,
- Simulator.UTM_ZONE_NUM,
- Simulator.UTM_ZONE_LETTER,
+ Constants.UTM_ZONE_NUM,
+ Constants.UTM_ZONE_LETTER,
)
- nsf = NavSatFix()
- nsf.latitude = lat
- nsf.longitude = long
- nsf.altitude = float(260)
- nsf.header.stamp = time_stamp
-
lat_noisy = lat + np.random.normal(0, 1e-6)
long_noisy = long + np.random.normal(0, 1e-6)
+
nsf_noisy = NavSatFix()
nsf_noisy.latitude = lat_noisy
nsf_noisy.longitude = long_noisy
@@ -176,13 +157,14 @@ def publish(self):
odom.header.stamp = time_stamp
odom_pose = Pose()
- odom_pose.position.x = float(long)
- odom_pose.position.y = float(lat)
+ east, north, _, _ = utm.from_latlon(lat_noisy, long_noisy)
+ odom_pose.position.x = float(east)
+ odom_pose.position.y = float(north)
odom_pose.position.z = float(260)
- odom_pose.orientation.z = np.sin(np.deg2rad(self.heading) / 2)
- odom_pose.orientation.w = np.cos(np.deg2rad(self.heading) / 2)
+ odom_pose.orientation.z = np.deg2rad(self.heading)
+ # NOTE: autonsystem only cares about magnitude of velocity, so we don't need to split into components
odom_twist = Twist()
odom_twist.linear.x = float(velocity)
@@ -192,26 +174,25 @@ def publish(self):
self.pose_publisher.publish(odom)
def loop(self):
- msg = Float64()
- msg.data = float(self.i)
-
- self.number_publisher.publish(msg)
- self.i += 1
-
self.step()
- if self.pub_tick_count == self.pub_skip:
+ if self.tick_count % self.interval == 0:
self.publish()
- self.pub_tick_count = 0
- else:
- self.pub_tick_count += 1
-
-
+ self.tick_count += 1
+ self.get_logger().debug("SIMULATED UTM: ({}, {}), HEADING: {}".format(self.e_utm, self.n_utm, self.heading))
def main(args=None):
rclpy.init(args=args)
sim = Simulator()
+ for _ in range(500):
+ time.sleep(0.01)
+ sim.publish()
+
+
+ sim.get_logger().info("STARTED PUBLISHING")
rclpy.spin(sim)
+
+ sim.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
diff --git a/rb_ws/src/buggy/buggy/simulator/velocity_ui.py b/rb_ws/src/buggy/scripts/simulator/velocity_ui.py
similarity index 100%
rename from rb_ws/src/buggy/buggy/simulator/velocity_ui.py
rename to rb_ws/src/buggy/scripts/simulator/velocity_ui.py
diff --git a/rb_ws/src/buggy/buggy/simulator/velocity_updater.py b/rb_ws/src/buggy/scripts/simulator/velocity_updater.py
similarity index 100%
rename from rb_ws/src/buggy/buggy/simulator/velocity_updater.py
rename to rb_ws/src/buggy/scripts/simulator/velocity_updater.py
diff --git a/rb_ws/src/buggy/scripts/util/constants.py b/rb_ws/src/buggy/scripts/util/constants.py
new file mode 100755
index 00000000..0bdfc8e3
--- /dev/null
+++ b/rb_ws/src/buggy/scripts/util/constants.py
@@ -0,0 +1,8 @@
+class Constants:
+ UTM_EAST_ZERO = 589702.87
+ UTM_NORTH_ZERO = 4477172.947
+ UTM_ZONE_NUM = 17
+ UTM_ZONE_LETTER = "T"
+ #TODO: make these values accurate
+ WHEELBASE_SC = 1.17
+ WHEELBASE_NAND= 1.17
\ No newline at end of file
diff --git a/rb_ws/src/buggy/scripts/util/pose.py b/rb_ws/src/buggy/scripts/util/pose.py
new file mode 100755
index 00000000..f12a4ffd
--- /dev/null
+++ b/rb_ws/src/buggy/scripts/util/pose.py
@@ -0,0 +1,203 @@
+import numpy as np
+
+from geometry_msgs.msg import Pose as ROSPose
+from scipy.spatial.transform import Rotation
+
+
+class Pose:
+ """
+ A data structure for storing 2D poses, as well as a set of
+ convenience methods for transforming/manipulating poses
+
+ TODO: All Pose objects are with respect to World coordinates.
+ """
+
+ __x = None
+ __y = None
+ __theta = None
+
+ @staticmethod
+ def rospose_to_pose(rospose: ROSPose):
+ """
+ Converts a geometry_msgs/Pose to a pose3d/Pose
+
+ Args:
+ posestamped (geometry_msgs/Pose): pose to convert
+
+ Returns:
+ Pose: converted pose
+ """
+ (_, _, yaw) = Rotation.from_quat(
+ [
+ rospose.orientation.x,
+ rospose.orientation.y,
+ rospose.orientation.z,
+ rospose.orientation.w,
+ ]
+ ).as_euler('xyz')
+
+ p = Pose(rospose.position.x, rospose.position.y, yaw)
+ return p
+
+ def heading_to_quaternion(heading):
+ q = Rotation.from_euler([0, 0, heading]).as_quat()
+ return q
+
+ def __init__(self, x, y, theta):
+ self.x = x
+ self.y = y
+ self.theta = theta
+
+ def __repr__(self) -> str:
+ return f"Pose(x={self.x}, y={self.y}, theta={self.theta})"
+
+ def copy(self):
+ return Pose(self.x, self.y, self.theta)
+
+ @property
+ def x(self):
+ return self.__x
+
+ @x.setter
+ def x(self, x):
+ self.__x = x
+
+ @property
+ def y(self):
+ return self.__y
+
+ @y.setter
+ def y(self, y):
+ self.__y = y
+
+ @property
+ def theta(self):
+ if self.__theta > np.pi or self.__theta < -np.pi:
+ raise ValueError("Theta is not in [-pi, pi]")
+ return self.__theta
+
+ @theta.setter
+ def theta(self, theta):
+ # normalize theta to [-pi, pi]
+ self.__theta = np.arctan2(np.sin(theta), np.cos(theta))
+
+ def to_mat(self):
+ """
+ Returns the pose as a 3x3 homogeneous transformation matrix
+ """
+ return np.array(
+ [
+ [np.cos(self.theta), -np.sin(self.theta), self.x],
+ [np.sin(self.theta), np.cos(self.theta), self.y],
+ [0, 0, 1],
+ ]
+ )
+
+ @staticmethod
+ def from_mat(mat):
+ """
+ Creates a pose from a 3x3 homogeneous transformation matrix
+ """
+ return Pose(mat[0, 2], mat[1, 2], np.arctan2(mat[1, 0], mat[0, 0]))
+
+ def invert(self):
+ """
+ Inverts the pose
+ """
+ return Pose.from_mat(np.linalg.inv(self.to_mat()))
+
+ def convert_pose_from_global_to_local_frame(self, pose):
+ """
+ Converts the inputted pose from the global frame to the local frame
+ (relative to the current pose)
+ """
+ return pose / self
+
+ def convert_pose_from_local_to_global_frame(self, pose):
+ """
+ Converts the inputted pose from the local frame to the global frame
+ (relative to the current pose)
+ """
+ return pose * self
+
+ def convert_point_from_global_to_local_frame(self, point):
+ """
+ Converts the inputted point from the global frame to the local frame
+ (relative to the current pose)
+ """
+ point_hg = np.array([point[0], point[1], 1])
+ point_hg_local = np.linalg.inv(self.to_mat()) @ point_hg
+ return (
+ point_hg_local[0] / point_hg_local[2],
+ point_hg_local[1] / point_hg_local[2],
+ )
+
+ def convert_point_from_local_to_global_frame(self, point):
+ """
+ Converts the inputted point from the local frame to the global frame
+ (relative to the current pose)
+ """
+ point_hg = np.array([point[0], point[1], 1])
+ point_hg_global = self.to_mat() @ point_hg
+ return (
+ point_hg_global[0] / point_hg_global[2],
+ point_hg_global[1] / point_hg_global[2],
+ )
+
+ def convert_point_array_from_global_to_local_frame(self, points):
+ """
+ Converts the inputted point array from the global frame to the local frame
+ (relative to the current pose)
+
+ Args:
+ points (np.ndarray) [Size: (N,2)]: array of points to convert
+ """
+ points_hg = np.array([points[:, 0], points[:, 1], np.ones(len(points))])
+ points_hg_local = np.linalg.inv(self.to_mat()) @ points_hg.T
+ return (points_hg_local[:2, :] / points_hg_local[2, :]).T
+
+ def convert_point_array_from_local_to_global_frame(self, points):
+ """
+ Converts the inputted point array from the local frame to the global frame
+ (relative to the current pose)
+
+ Args:
+ points (np.ndarray) [Size: (N,2)]: array of points to convert
+ """
+ points_hg = np.array([points[:, 0], points[:, 1], np.ones(len(points))])
+ points_hg_global = self.to_mat() @ points_hg.T
+ return (points_hg_global[:2, :] / points_hg_global[2, :]).T
+
+ def rotate(self, angle):
+ """
+ Rotates the pose by the given angle
+ """
+ self.theta += angle
+
+ def translate(self, x, y):
+ """
+ Translates the pose by the given x and y distances
+ """
+ self.x += x
+ self.y += y
+
+ def __add__(self, other):
+ return Pose(self.x + other.x, self.y + other.y, self.theta + other.theta)
+
+ def __sub__(self, other):
+ return Pose(self.x - other.x, self.y - other.y, self.theta - other.theta)
+
+ def __neg__(self):
+ return Pose(-self.x, -self.y, -self.theta)
+
+ def __mul__(self, other):
+ p1_mat = self.to_mat()
+ p2_mat = other.to_mat()
+
+ return Pose.from_mat(p1_mat @ p2_mat)
+
+ def __truediv__(self, other):
+ p1_mat = self.to_mat()
+ p2_mat = other.to_mat()
+
+ return Pose.from_mat(np.linalg.inv(p2_mat) @ p1_mat)
diff --git a/rb_ws/src/buggy/scripts/util/trajectory.py b/rb_ws/src/buggy/scripts/util/trajectory.py
new file mode 100755
index 00000000..1bbdfb46
--- /dev/null
+++ b/rb_ws/src/buggy/scripts/util/trajectory.py
@@ -0,0 +1,363 @@
+import json
+import time
+
+# from buggy.msg import TrajectoryMsg
+
+import numpy as np
+from scipy.interpolate import Akima1DInterpolator, CubicSpline
+from buggy.msg import TrajectoryMsg
+
+import utm
+
+class Trajectory:
+ """A wrapper around a trajectory JSON file that does some under-the-hood math. Will
+ interpolate the data points and calculate other information such as distance along the trajectory
+ and instantaneous curvature.
+
+ Currently, the trajectory is assumed to be a straight line between each waypoint. This is
+ not a good assumption, but it's good enough for now. This means that the curvature is zero
+ anywhere between waypoints.
+
+ This class also has a method that will find the closest point on the trajectory to a given
+ point in the world. This is useful for finding where the buggy is on the trajectory.
+
+ Use https://rdudhagra.github.io/eracer-portal/ to make trajectories and save the JSON file
+ """
+
+ def __init__(self, json_filepath=None, positions = None, interpolator="CubicSpline") -> None:
+ """
+ Args:
+ json_filepath (String): file path to the path json file (begins at /rb_ws)
+ positions [[float, float]]: reference trajectory in world coordinates
+ current_speed (float): current speed of the buggy
+
+ Returns:
+ float (desired steering angle)
+ """
+ self.distances = np.zeros((0, 1)) # (N/dt x 1) [d, d, ...]
+ self.positions = np.zeros((0, 2)) # (N x 2) [(x,y), (x,y), ...]
+ self.indices = None # (N x 1) [0, 1, 2, ...]
+ self.interpolation = None # scipy.interpolate.PPoly
+
+ # read positions from file
+ if positions is None:
+ positions = []
+ # Load the json file
+ with open(json_filepath, "r") as f:
+ data = json.load(f)
+
+ # Iterate through the waypoints and extract the positions
+ num_waypoints = len(data)
+ for i in range(0, num_waypoints):
+
+ waypoint = data[i]
+
+ lat = waypoint["lat"]
+ lon = waypoint["lon"]
+
+ # Convert to world coordinates
+ x, y, _, _ = utm.from_latlon(lat, lon)
+ positions.append([x, y])
+
+ positions = np.array(positions)
+
+ num_indices = positions.shape[0]
+
+ if interpolator == "Akima":
+ self.positions = positions
+ self.indices = np.arange(num_indices)
+ self.interpolation = Akima1DInterpolator(self.indices, self.positions)
+ self.interpolation.extrapolate = True
+ elif interpolator == "CubicSpline":
+ temp_traj = Trajectory(positions=positions, interpolator="Akima")
+ tot_len = temp_traj.distances[-1]
+ interp_dists = np.linspace(0, tot_len, num_indices)
+
+ self.indices = np.arange(num_indices)
+ self.positions = [
+ temp_traj.get_position_by_distance(interp_dist)
+ for interp_dist in interp_dists
+ ]
+ self.positions = np.array(self.positions)
+
+ self.interpolation = CubicSpline(self.indices, self.positions)
+ self.interpolation.extrapolate = True
+
+ # TODO: check units
+ # Calculate the distances along the trajectory
+ dt = 0.01 #dt is time step (in seconds (?))
+ ts = np.arange(len(self.positions) - 1, step=dt) # times corresponding to each position (?)
+ drdt = self.interpolation(
+ ts, nu=1
+ ) # Calculate derivatives of interpolated path wrt indices
+ ds = np.sqrt(drdt[:, 0] ** 2 + drdt[:, 1] ** 2) * dt # distances to each interpolated point
+ s = np.cumsum(np.hstack([[0], ds[:-1]]))
+ self.distances = s
+ self.dt = dt
+
+ def get_num_points(self):
+ """Gets the number of points along the trajectory
+
+ Returns:
+ int: number of points
+ """
+ return len(self.positions)
+
+ def get_heading_by_index(self, index):
+ """Gets the heading at given index along trajectory,
+ interpolating if necessary
+
+ Args:
+ index (float): index along the trajectory
+
+ Returns:
+ float: (theta) in rads
+ """
+ # theta = np.interp(index, self.indices, self.positions[:, 2])
+ dxdt, dydt = self.interpolation(index, nu=1).reshape((-1, 2)).T
+ theta = np.arctan2(dydt, dxdt)
+
+ return theta
+
+ def get_heading_by_distance(self, distance):
+ """Gets the heading at given distance along trajectory,
+ interpolating if necessary
+
+ Args:
+ distance (float): distance along the trajectory in meters
+
+ Returns:
+ float: (theta) in rads
+ """
+ index = self.get_index_from_distance(distance)
+ return self.get_heading_by_index(index)
+
+ def get_position_by_index(self, index):
+ """Gets the position at a given index along the trajectory,
+ interpolating if necessary
+
+ Args:
+ index (float): index along the trajectory
+
+ Returns:
+ tuple: (x, y)
+ """
+ # Interpolate the position
+
+ return self.interpolation(index)
+
+ def get_position_by_distance(self, distance):
+ """Gets the position at a given distance along the trajectory,
+ interpolating if necessary
+
+ Args:
+ distance (float): distance along the trajectory in meters
+
+ Returns:
+ tuple: (x, y)
+ """
+ # Interpolate the position
+ index = self.get_index_from_distance(distance)
+ return self.get_position_by_index(index)
+
+ def get_steering_angle_by_index(self, index, wheelbase):
+ """Gets the bicycle-model steering angle at a given distance along the trajectory,
+ interpolating if necessary. Assumes that the origin point of the buggy is at the
+ center of the rear axle.
+
+ Args:
+ index (float): index along the trajectory
+ wheelbase (float): wheelbase of the buggy in meters
+
+ Returns:
+ float: steering angle in rads
+ """
+ curvature = self.get_curvature_by_index(index)
+ return np.arctan(wheelbase * curvature)
+
+ def get_steering_angle_by_distance(self, distance, wheelbase):
+ """Gets the bicycle-model steering angle at a given distance along the trajectory,
+ interpolating if necessary. Assumes that the origin point of the buggy is at the
+ center of the rear axle.
+
+ Args:
+ distance (float): distance along the trajectory in meters
+ wheelbase (float): wheelbase of the buggy in meters
+
+ Returns:
+ float: steering angle in rads
+ """
+ index = self.get_index_from_distance(distance)
+ return self.get_steering_angle_by_index(index, wheelbase)
+
+ def get_index_from_distance(self, distance):
+ """Gets the index at a given distance along the trajectory,
+ interpolating if necessary
+
+ Args:
+ distance (float): distance along the trajectory in meters
+
+ Returns:
+ int: index along the trajectory
+ """
+ # Interpolate the index
+ index = np.interp(
+ distance,
+ self.distances,
+ np.linspace(0, len(self.distances), len(self.distances)),
+ )
+
+ return index * self.dt
+
+ def get_distance_from_index(self, index):
+ """Gets the distance at a given index along the trajectory,
+ interpolating if necessary
+
+ Args:
+ index (float): index along the trajectory
+
+ Returns:
+ float: distance along the trajectory in meters
+ """
+ # Interpolate the distance
+ distance = np.interp(
+ index / self.dt, np.arange(len(self.distances)), self.distances
+ )
+
+ return distance
+
+ def get_curvature_by_index(self, index):
+ """Gets the curvature at a given index along the trajectory,
+ interpolating if necessary
+
+ Args:
+ index (float): index along the trajectory
+
+ Returns:
+ float: curvature
+ """
+ # Interpolate the curvature
+ dxdt, dydt = self.interpolation(index, nu=1).reshape((-1, 2)).T
+ ddxdtt, ddydtt = self.interpolation(index, nu=2).reshape((-1, 2)).T
+
+ curvature = (dxdt * ddydtt - dydt * ddxdtt) / (
+ (dxdt**2 + dydt**2) ** (3 / 2)
+ )
+
+ return curvature
+
+ def get_curvature_by_distance(self, distance):
+ """Gets the curvature at a given distance along the trajectory,
+ interpolating if necessary
+
+ Args:
+ distance (float): distance along the trajectory in meters
+
+ Returns:
+ float: curvature
+ """
+ # Interpolate the curvature
+ index = self.get_index_from_distance(distance)
+
+ return self.get_curvature_by_index(index)
+
+ def get_dynamics_by_index(self, index, wheelbase):
+ """Gets the dynamics at a given index along the trajectory,
+ interpolating if necessary. Convenience function that returns
+ all of the dynamics at once rather than requiring multiple
+ calls to other helper functions. In this way, we can reuse calls
+ to the interpolator, improving performance.
+
+ Args:
+ index (float): index along the trajectory
+
+ Returns:
+ tuple: (x, y, theta, steering_angle)
+ """
+ # Interpolate the dynamics
+ x, y = self.interpolation(index).reshape((-1, 2)).T
+ dxdt, dydt = self.interpolation(index, nu=1).reshape((-1, 2)).T
+ ddxdtt, ddydtt = self.interpolation(index, nu=2).reshape((-1, 2)).T
+
+ curvature = (dxdt * ddydtt - dydt * ddxdtt) / (
+ (dxdt**2 + dydt**2) ** (3 / 2)
+ )
+ theta = np.arctan2(dydt, dxdt)
+
+ return np.stack((x, y, theta, np.arctan(wheelbase * curvature)), axis=-1)
+
+ def get_unit_normal_by_index(self, index):
+ """Gets the index of the closest point on the trajectory to the given point
+
+ Args:
+ index: Nx1 numpy array: indexes along trajectory
+ Returns:
+ Nx2 numpy array: unit normal of the trajectory at index
+ """
+
+ derivative = self.interpolation(index, nu=1)
+ unit_derivative = derivative / np.linalg.norm(derivative, axis=1)[:, None]
+
+ # (x, y), rotated by 90 deg ccw = (-y, x)
+ unit_normal = np.vstack((-unit_derivative[:, 1], unit_derivative[:, 0])).T
+ return unit_normal
+
+ def get_closest_index_on_path(
+ self, x, y, start_index=0, end_index=None, subsample_resolution=1000
+ ):
+ """Gets the index of the closest point on the trajectory to the given point
+
+ Args:
+ x (float): x coordinate
+ y (float): y coordinate
+ start_index (int, optional): index to start searching from. Defaults to 0.
+ end_index (int, optional): index to end searching at. Defaults to None (disable).
+ subsample_resolution: resolution of the resulting interpolation
+ Returns:
+ float: index along the trajectory
+ """
+ # If end_index is not specified, use the length of the trajectory
+ if end_index is None:
+ end_index = len(self.positions) #sketch, 0-indexing where??
+
+ # Floor/ceil the start/end indices
+ start_index = max(0, int(np.floor(start_index)))
+ end_index = int(np.ceil(end_index))
+
+ # Calculate the distance from the point to each point on the trajectory
+ distances = (self.positions[start_index : end_index + 1, 0] - x) ** 2 + (
+ self.positions[start_index : end_index + 1, 1] - y
+ ) ** 2 #Don't need to squareroot as it is a relative distance
+
+ min_ind = np.argmin(distances) + start_index
+
+ start_index = max(0, min_ind - 1) #Protection in case min_ind is too low
+ end_index = min(len(self.positions), min_ind + 1) #Prtoecting in case min_ind too high
+ #Theoretically start_index and end_index are just two apart
+
+ # Now interpolate at a higher resolution to get a more accurate result
+ r_interp = self.interpolation(
+ np.linspace(start_index, end_index, subsample_resolution + 1))
+ x_interp, y_interp = r_interp[:, 0], r_interp[:, 1] #x_interp, y_interp are numpy column vectors
+
+ distances = (x_interp - x) ** 2 + (y_interp - y) ** 2 #Again distances are relative
+
+ # Return the rational index of the closest point
+ return (
+ np.argmin(distances) / subsample_resolution * (end_index - start_index)
+ + start_index
+ )
+
+ def pack(self, x, y) -> TrajectoryMsg:
+ traj = TrajectoryMsg()
+ traj.easting = list(self.positions[:, 0])
+ traj.northing = list(self.positions[:, 1])
+ traj.time = time.time()
+ traj.cur_idx = self.get_closest_index_on_path(x,y)
+ return traj
+
+ def unpack(trajMsg : TrajectoryMsg):
+ pos = np.array([trajMsg.easting, trajMsg.northing]).transpose(1, 0)
+ cur_idx = trajMsg.cur_idx
+ return Trajectory(positions=pos), cur_idx
+
diff --git a/rb_ws/src/buggy/buggy/watchdog/watchdog_node.py b/rb_ws/src/buggy/scripts/watchdog/watchdog_node.py
old mode 100644
new mode 100755
similarity index 98%
rename from rb_ws/src/buggy/buggy/watchdog/watchdog_node.py
rename to rb_ws/src/buggy/scripts/watchdog/watchdog_node.py
index b0338da8..7c155e32
--- a/rb_ws/src/buggy/buggy/watchdog/watchdog_node.py
+++ b/rb_ws/src/buggy/scripts/watchdog/watchdog_node.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
import rclpy
from rclpy.node import Node
diff --git a/rb_ws/src/buggy/setup.cfg b/rb_ws/src/buggy/setup.cfg
deleted file mode 100644
index c2b78e07..00000000
--- a/rb_ws/src/buggy/setup.cfg
+++ /dev/null
@@ -1,4 +0,0 @@
-[develop]
-script_dir=$base/lib/buggy
-[install]
-install_scripts=$base/lib/buggy
diff --git a/rb_ws/src/buggy/test/test_copyright.py b/rb_ws/src/buggy/test/test_copyright.py
deleted file mode 100644
index 97a39196..00000000
--- a/rb_ws/src/buggy/test/test_copyright.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2015 Open Source Robotics Foundation, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ament_copyright.main import main
-import pytest
-
-
-# Remove the `skip` decorator once the source file(s) have a copyright header
-@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
-@pytest.mark.copyright
-@pytest.mark.linter
-def test_copyright():
- rc = main(argv=['.', 'test'])
- assert rc == 0, 'Found errors'
diff --git a/rb_ws/src/buggy/test/test_flake8.py b/rb_ws/src/buggy/test/test_flake8.py
deleted file mode 100644
index 27ee1078..00000000
--- a/rb_ws/src/buggy/test/test_flake8.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2017 Open Source Robotics Foundation, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ament_flake8.main import main_with_errors
-import pytest
-
-
-@pytest.mark.flake8
-@pytest.mark.linter
-def test_flake8():
- rc, errors = main_with_errors(argv=[])
- assert rc == 0, \
- 'Found %d code style errors / warnings:\n' % len(errors) + \
- '\n'.join(errors)
diff --git a/rb_ws/src/buggy/test/test_pep257.py b/rb_ws/src/buggy/test/test_pep257.py
deleted file mode 100644
index b234a384..00000000
--- a/rb_ws/src/buggy/test/test_pep257.py
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright 2015 Open Source Robotics Foundation, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ament_pep257.main import main
-import pytest
-
-
-@pytest.mark.linter
-@pytest.mark.pep257
-def test_pep257():
- rc = main(argv=['.', 'test'])
- assert rc == 0, 'Found code style errors / warnings'