diff --git a/rb_ws/src/buggy/buggy/simulator/velocity_ui.py b/rb_ws/src/buggy/buggy/simulator/velocity_ui.py new file mode 100644 index 0000000..db7a855 --- /dev/null +++ b/rb_ws/src/buggy/buggy/simulator/velocity_ui.py @@ -0,0 +1,63 @@ +#! /usr/bin/env python3 +import sys +import threading +import tkinter as tk +from controller_2d import Controller +import rclpy +from rclpy.node import Node + +class VelocityUI(Node): + def __init__(self, init_vel: float, buggy_name: str): + super().__init__('velocity_ui') + + # So the buggy doesn't start moving without user input + self.buggy_vel = 0 + self.controller = Controller(buggy_name) + self.lock = threading.Lock() + + self.root = tk.Tk() + + self.root.title(buggy_name + ' Manual Velocity: scale = 0.1m/s') + self.root.geometry('600x100') + self.root.configure(background='#342d66') + + self.scale = tk.Scale(self.root, from_=0, to=300, orient=tk.HORIZONTAL, length=500, width=30) + self.scale.pack() + + self.root.bind("", lambda i: self.scale.set(self.scale.get() + 2)) + self.root.bind("", lambda d: self.scale.set(self.scale.get() - 2)) + + # ROS2 timer for stepping + self.create_timer(0.01, self.step) # equivalent to 100Hz (100 times per second) + + def step(self): + # Sets the velocity of the buggy to the current scale value + # called every tick + self.root.update_idletasks() + self.root.update() + # Update velocity of the buggy + self.buggy_vel = self.scale.get() / 10 # set velocity with 0.1 precision + self.controller.set_velocity(self.buggy_vel) + +def main(args=None): + rclpy.init(args=args) + + if len(sys.argv) < 3: + print("Usage: velocity_ui ") + sys.exit(1) + + init_vel = float(sys.argv[1]) + buggy_name = sys.argv[2] + + vel_ui = VelocityUI(init_vel, buggy_name) + + try: + rclpy.spin(vel_ui) + except KeyboardInterrupt: + print("Shutting down velocity UI") + finally: + vel_ui.destroy_node() + rclpy.shutdown() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/rb_ws/src/buggy/buggy/simulator/velocity_updater.py b/rb_ws/src/buggy/buggy/simulator/velocity_updater.py new file mode 100644 index 0000000..5c3b6ae --- /dev/null +++ b/rb_ws/src/buggy/buggy/simulator/velocity_updater.py @@ -0,0 +1,88 @@ +#! /usr/bin/env python3 +import rclpy +from rclpy.node import Node +import sys +from controller_2d import Controller +from geometry_msgs.msg import Pose +from geometry_msgs.msg import Point +import threading +import math + +class VelocityUpdater(Node): + RATE = 100 + # Bubbles for updating acceleration based on position + # represented as 4-tuples: (x-pos, y-pos, radius, acceleration) + CHECKPOINTS = [ + (589701, 4477160, 20, 0.5) + ] + + def __init__(self, init_vel: float, buggy_name: str): + super().__init__('vel_updater') + + self.buggy_vel = init_vel + self.accel = 0.0 + self.position = Point() + self.controller = Controller(buggy_name) + + self.lock = threading.Lock() + + # ROS2 subscription + self.pose_subscriber = self.create_subscription( + Pose, + f"{buggy_name}/sim_2d/utm", + self.update_position, + 10 # QoS profile + ) + + # ROS2 timer for stepping + self.timer = self.create_timer(1.0 / self.RATE, self.step) + + def update_position(self, new_pose: Pose): + '''Callback function to update internal position variable when new + buggy position is published + + Args: + new_pose (Pose): Pose object from topic + ''' + with self.lock: + self.position = new_pose.position + + def calculate_accel(self): + '''Check if the position of the buggy is in any of the checkpoints set + in self.CHECKPOINTS, and update acceleration of buggy accordingly + ''' + for (x, y, r, a) in self.CHECKPOINTS: + dist = math.sqrt((x - self.position.x)**2 + (y - self.position.y)**2) + if dist < r: + self.accel = a + break + + def step(self): + '''Update acceleration and velocity of buggy for one timestep''' + self.calculate_accel() + new_velocity = self.buggy_vel + self.accel / self.RATE + self.buggy_vel = new_velocity + self.controller.set_velocity(new_velocity) + +def main(args=None): + rclpy.init(args=args) + + if len(sys.argv) < 3: + print("Usage: vel_updater ") + sys.exit(1) + + init_vel = float(sys.argv[1]) + buggy_name = sys.argv[2] + + vel_updater = VelocityUpdater(init_vel, buggy_name) + + try: + rclpy.spin(vel_updater) + except KeyboardInterrupt: + print("Shutting down velocity updater") + finally: + vel_updater.destroy_node() + rclpy.shutdown() + +if __name__ == "__main__": + main() \ No newline at end of file