forked from CMU-Robotics-Club/RoboBuggy2
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge remote-tracking branch 'origin/main' into serial-port
- Loading branch information
Showing
12 changed files
with
1,016 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
import threading | ||
import sys | ||
|
||
import numpy as np | ||
|
||
import rclpy | ||
from rclpy.node import Node | ||
|
||
from std_msgs.msg import Float32, Float64, Bool | ||
from nav_msgs.msg import Odometry | ||
|
||
sys.path.append("/rb_ws/src/buggy/buggy") | ||
from util.trajectory import Trajectory | ||
from controller.stanley_controller import StanleyController | ||
|
||
class Controller(Node): | ||
|
||
def __init__(self): | ||
""" | ||
Constructor for Controller class. | ||
Creates a ROS node with a publisher that periodically sends a message | ||
indicating whether the node is still alive. | ||
""" | ||
super().__init__('controller') | ||
self.get_logger().info('INITIALIZED.') | ||
|
||
|
||
#Parameters | ||
self.declare_parameter("dist", 0.0) #Starting Distance along path | ||
start_dist = self.get_parameter("dist").value | ||
|
||
self.declare_parameter("traj_name", "buggycourse_safe.json") | ||
traj_name = self.get_parameter("traj_name").value | ||
self.cur_traj = Trajectory(json_filepath="/rb_ws/src/buggy/paths/" + traj_name) #TODO: Fixed filepath, not good | ||
|
||
start_index = self.cur_traj.get_index_from_distance(start_dist) | ||
|
||
self.declare_parameter("controller_name", "stanley") | ||
|
||
controller_name = self.get_parameter("controller_name").value | ||
print(controller_name.lower) | ||
if (controller_name.lower() == "stanley"): | ||
self.controller = StanleyController(start_index = start_index, namespace = self.get_namespace(), node=self) #IMPORT STANLEY | ||
else: | ||
self.get_logger().error("Invalid Controller Name: " + controller_name.lower()) | ||
raise Exception("Invalid Controller Argument") | ||
|
||
# Publishers | ||
self.init_check_publisher = self.create_publisher(Bool, | ||
"debug/init_safety_check", 1 | ||
) | ||
self.steer_publisher = self.create_publisher( | ||
Float64, "input/steering", 1 | ||
) | ||
self.heading_publisher = self.create_publisher( | ||
Float32, "auton/debug/heading", 1 | ||
) | ||
|
||
# Subscribers | ||
self.odom_subscriber = self.create_subscription(Odometry, 'self/state', self.odom_listener, 1) | ||
self.traj_subscriber = self.create_subscription(Odometry, 'self/cur_traj', self.traj_listener, 1) | ||
|
||
self.lock = threading.Lock() | ||
|
||
self.odom = None | ||
self.passed_init = False | ||
|
||
timer_period = 0.01 # seconds (100 Hz) | ||
self.timer = self.create_timer(timer_period, self.loop) | ||
|
||
def odom_listener(self, msg : Odometry): | ||
''' | ||
This is the subscriber that updates the buggies state for navigation | ||
msg, should be a CLEAN state as defined in the wiki | ||
''' | ||
with self.lock: | ||
self.odom = msg | ||
|
||
def traj_listener(self, msg): | ||
''' | ||
This is the subscriber that updates the buggies trajectory for navigation | ||
''' | ||
with self.lock: | ||
self.cur_traj, self.controller.current_traj_index = Trajectory.unpack(msg) | ||
|
||
def init_check(self): | ||
""" | ||
Checks if it's safe to switch the buggy into autonomous driving mode. | ||
Specifically, it checks: | ||
if we can recieve odometry messages from the buggy | ||
if the covariance is acceptable (less than 1 meter) | ||
if the buggy thinks it is facing in the correct direction wrt the local trajectory (not 180 degrees flipped) | ||
Returns: | ||
A boolean describing the status of the buggy (safe for auton or unsafe for auton) | ||
""" | ||
if (self.odom == None): | ||
self.get_logger().warn("WARNING: no available position estimate") | ||
return False | ||
|
||
elif (self.odom.pose.covariance[0] ** 2 + self.odom.pose.covariance[7] ** 2 > 1**2): | ||
self.get_logger().warn("checking position estimate certainty") | ||
return False | ||
|
||
#Originally under a lock, doesn't seem necessary? | ||
current_heading = self.odom.pose.pose.orientation.z | ||
closest_heading = self.cur_traj.get_heading_by_index(self.cur_traj.get_closest_index_on_path(self.odom.pose.pose.position.x, self.odom.pose.pose.position.y)) | ||
|
||
self.get_logger().info("current heading: " + str(np.rad2deg(current_heading))) | ||
msg = Float32() | ||
msg.data = np.rad2deg(current_heading) | ||
self.heading_publisher.publish(msg) | ||
|
||
#Converting headings from [-pi, pi] to [0, 2pi] | ||
if (current_heading < 0): | ||
current_heading = 2 * np.pi + current_heading | ||
if (closest_heading < 0): | ||
closest_heading = 2 * np.pi + closest_heading | ||
|
||
if (abs(current_heading - closest_heading) >= np.pi/2): | ||
self.get_logger().warn("WARNING: INCORRECT HEADING! restart stack. Current heading [-180, 180]: " + str(np.rad2deg(current_heading))) | ||
return False | ||
|
||
return True | ||
|
||
def loop(self): | ||
if not self.passed_init: | ||
self.passed_init = self.init_check() | ||
msg = Bool() | ||
msg.data = self.passed_init | ||
self.init_check_publisher.publish(msg) | ||
if self.passed_init: | ||
self.get_logger().info("Passed Initialization Check") | ||
else: | ||
return | ||
|
||
self.heading_publisher.publish(Float32(data=np.rad2deg(self.odom.pose.pose.orientation.z))) | ||
steering_angle = self.controller.compute_control(self.odom, self.cur_traj) | ||
steering_angle_deg = np.rad2deg(steering_angle) | ||
self.steer_publisher.publish(Float64(data=float(steering_angle_deg))) | ||
|
||
|
||
|
||
def main(args=None): | ||
rclpy.init(args=args) | ||
|
||
controller = Controller() | ||
|
||
rclpy.spin(controller) | ||
|
||
# Destroy the node explicitly | ||
# (optional - otherwise it will be done automatically | ||
# when the garbage collector destroys the node object) | ||
controller.destroy_node() | ||
rclpy.shutdown() | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
from abc import ABC, abstractmethod | ||
import sys | ||
|
||
from nav_msgs.msg import Odometry | ||
|
||
|
||
sys.path.append("/rb_ws/src/buggy/buggy") | ||
from util.trajectory import Trajectory | ||
|
||
class Controller(ABC): | ||
""" | ||
Base class for all controllers. | ||
The controller takes in the current state of the buggy and a reference | ||
trajectory. It must then compute the desired control output. | ||
The method that it does this by is up to the implementation, of course. | ||
Example schemes include Pure Pursuit, Stanley, and LQR. | ||
""" | ||
|
||
# TODO: move this to a constants class | ||
NAND_WHEELBASE = 1.3 | ||
SC_WHEELBASE = 1.104 | ||
|
||
def __init__(self, start_index: int, namespace : str, node) -> None: | ||
self.namespace = namespace | ||
if namespace.upper() == '/NAND': | ||
Controller.WHEELBASE = self.NAND_WHEELBASE | ||
else: | ||
Controller.WHEELBASE = self.SC_WHEELBASE | ||
|
||
self.current_traj_index = start_index | ||
self.node = node | ||
|
||
@abstractmethod | ||
def compute_control( | ||
self, state_msg: Odometry, trajectory: Trajectory, | ||
) -> float: | ||
""" | ||
Computes the desired control output given the current state and reference trajectory | ||
Args: | ||
state: (Odometry): state of the buggy, including position, attitude and associated twists | ||
trajectory (Trajectory): reference trajectory | ||
Returns: | ||
float (desired steering angle) | ||
""" | ||
raise NotImplementedError |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
import sys | ||
import numpy as np | ||
|
||
from sensor_msgs.msg import NavSatFix | ||
from geometry_msgs.msg import Pose as ROSPose | ||
from nav_msgs.msg import Odometry | ||
|
||
sys.path.append("/rb_ws/src/buggy/buggy") | ||
from util.trajectory import Trajectory | ||
from controller.controller_superclass import Controller | ||
from util.pose import Pose | ||
|
||
import utm | ||
|
||
|
||
class StanleyController(Controller): | ||
""" | ||
Stanley Controller (front axle used as reference point) | ||
Referenced from this paper: https://ai.stanford.edu/~gabeh/papers/hoffmann_stanley_control07.pdf | ||
""" | ||
|
||
LOOK_AHEAD_DIST_CONST = 0.05 # s | ||
MIN_LOOK_AHEAD_DIST = 0.1 #m | ||
MAX_LOOK_AHEAD_DIST = 2.0 #m | ||
|
||
CROSS_TRACK_GAIN = 1.3 | ||
K_SOFT = 1.0 # m/s | ||
K_D_YAW = 0.012 # rad / (rad/s) | ||
|
||
def __init__(self, start_index, namespace, node): | ||
super(StanleyController, self).__init__(start_index, namespace, node) | ||
self.debug_reference_pos_publisher = self.node.create_publisher( | ||
NavSatFix, "auton/debug/reference_navsat", 1 | ||
) | ||
self.debug_error_publisher = self.node.create_publisher( | ||
ROSPose, "auton/debug/error", 1 | ||
) | ||
|
||
def compute_control(self, state_msg : Odometry, trajectory : Trajectory): | ||
"""Computes the steering angle determined by Stanley controller. | ||
Does this by looking at the crosstrack error + heading error | ||
Args: | ||
state_msg: ros Odometry message | ||
trajectory (Trajectory): reference trajectory | ||
Returns: | ||
float (desired steering angle) | ||
""" | ||
if self.current_traj_index >= trajectory.get_num_points() - 1: | ||
self.node.get_logger.error("[Stanley]: Ran out of path to follow!") | ||
raise Exception("[Stanley]: Ran out of path to follow!") | ||
|
||
current_rospose = state_msg.pose.pose | ||
current_speed = np.sqrt( | ||
state_msg.twist.twist.linear.x**2 + state_msg.twist.twist.linear.y**2 | ||
) | ||
yaw_rate = state_msg.twist.twist.angular.z | ||
heading = current_rospose.orientation.z | ||
x, y = current_rospose.position.x, current_rospose.position.y #(Easting, Northing) | ||
|
||
front_x = x + StanleyController.WHEELBASE * np.cos(heading) | ||
front_y = y + StanleyController.WHEELBASE * np.sin(heading) | ||
|
||
# setting range of indices to search so we don't have to search the entire path | ||
traj_index = trajectory.get_closest_index_on_path( | ||
front_x, | ||
front_y, | ||
start_index=self.current_traj_index - 20, | ||
end_index=self.current_traj_index + 50, | ||
) | ||
self.current_traj_index = max(traj_index, self.current_traj_index) | ||
|
||
|
||
lookahead_dist = np.clip( | ||
self.LOOK_AHEAD_DIST_CONST * current_speed, | ||
self.MIN_LOOK_AHEAD_DIST, | ||
self.MAX_LOOK_AHEAD_DIST) | ||
|
||
traj_dist = trajectory.get_distance_from_index(self.current_traj_index) + lookahead_dist | ||
|
||
ref_heading = trajectory.get_heading_by_index( | ||
trajectory.get_index_from_distance(traj_dist)) | ||
|
||
error_heading = ref_heading - heading | ||
error_heading = np.arctan2(np.sin(error_heading), np.cos(error_heading)) #Bounds error_heading | ||
|
||
# Calculate cross track error by finding the distance from the buggy to the tangent line of | ||
# the reference trajectory | ||
closest_position = trajectory.get_position_by_index(self.current_traj_index) | ||
next_position = trajectory.get_position_by_index( | ||
self.current_traj_index + 0.0001 | ||
) | ||
x1 = closest_position[0] | ||
y1 = closest_position[1] | ||
x2 = next_position[0] | ||
y2 = next_position[1] | ||
error_dist = -((x - x1) * (y2 - y1) - (y - y1) * (x2 - x1)) / np.sqrt( | ||
(y2 - y1) ** 2 + (x2 - x1) ** 2 | ||
) | ||
|
||
|
||
cross_track_component = -np.arctan2( | ||
StanleyController.CROSS_TRACK_GAIN * error_dist, current_speed + StanleyController.K_SOFT | ||
) | ||
|
||
# Calculate yaw rate error | ||
r_meas = yaw_rate | ||
r_traj = current_speed * (trajectory.get_heading_by_index(trajectory.get_index_from_distance(traj_dist) + 0.05) | ||
- trajectory.get_heading_by_index(trajectory.get_index_from_distance(traj_dist))) / 0.05 | ||
|
||
#Determine steering_command | ||
steering_cmd = error_heading + cross_track_component + StanleyController.K_D_YAW * (r_traj - r_meas) | ||
steering_cmd = np.clip(steering_cmd, -np.pi / 9, np.pi / 9) | ||
|
||
|
||
#Calculate error, where x is in orientation of buggy, y is cross track error | ||
current_pose = Pose(current_rospose.position.x, current_rospose.position.y, heading) | ||
reference_error = current_pose.convert_point_from_global_to_local_frame(closest_position) | ||
reference_error -= np.array([StanleyController.WHEELBASE, 0]) #Translae back to back wheel to get accurate error | ||
|
||
error_pose = ROSPose() | ||
error_pose.position.x = reference_error[0] | ||
error_pose.position.y = reference_error[1] | ||
self.debug_error_publisher.publish(error_pose) | ||
|
||
# Publish reference position for debugging | ||
try: | ||
reference_navsat = NavSatFix() | ||
lat, lon = utm.to_latlon(closest_position[0], closest_position[1], 17, "T") | ||
reference_navsat.latitude = lat | ||
reference_navsat.longitude = lon | ||
self.debug_reference_pos_publisher.publish(reference_navsat) | ||
except Exception as e: | ||
self.node.get_logger().warn( | ||
"[Stanley] Unable to convert closest track position lat lon; Error: " + str(e) | ||
) | ||
|
||
return steering_cmd | ||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
Oops, something went wrong.