forked from CMU-Robotics-Club/RoboBuggy2
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
edit the original velocity_ui.py and velocity_updater.py to ROS2, not…
… tested yet
- Loading branch information
Showing
2 changed files
with
151 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
#! /usr/bin/env python3 | ||
import sys | ||
import threading | ||
import tkinter as tk | ||
from controller_2d import Controller | ||
import rclpy | ||
from rclpy.node import Node | ||
|
||
class VelocityUI(Node): | ||
def __init__(self, init_vel: float, buggy_name: str): | ||
super().__init__('velocity_ui') | ||
|
||
# So the buggy doesn't start moving without user input | ||
self.buggy_vel = 0 | ||
self.controller = Controller(buggy_name) | ||
self.lock = threading.Lock() | ||
|
||
self.root = tk.Tk() | ||
|
||
self.root.title(buggy_name + ' Manual Velocity: scale = 0.1m/s') | ||
self.root.geometry('600x100') | ||
self.root.configure(background='#342d66') | ||
|
||
self.scale = tk.Scale(self.root, from_=0, to=300, orient=tk.HORIZONTAL, length=500, width=30) | ||
self.scale.pack() | ||
|
||
self.root.bind("<Up>", lambda i: self.scale.set(self.scale.get() + 2)) | ||
self.root.bind("<Down>", lambda d: self.scale.set(self.scale.get() - 2)) | ||
|
||
# ROS2 timer for stepping | ||
self.create_timer(0.01, self.step) # equivalent to 100Hz (100 times per second) | ||
|
||
def step(self): | ||
# Sets the velocity of the buggy to the current scale value | ||
# called every tick | ||
self.root.update_idletasks() | ||
self.root.update() | ||
# Update velocity of the buggy | ||
self.buggy_vel = self.scale.get() / 10 # set velocity with 0.1 precision | ||
self.controller.set_velocity(self.buggy_vel) | ||
|
||
def main(args=None): | ||
rclpy.init(args=args) | ||
|
||
if len(sys.argv) < 3: | ||
print("Usage: velocity_ui <initial_velocity> <buggy_name>") | ||
sys.exit(1) | ||
|
||
init_vel = float(sys.argv[1]) | ||
buggy_name = sys.argv[2] | ||
|
||
vel_ui = VelocityUI(init_vel, buggy_name) | ||
|
||
try: | ||
rclpy.spin(vel_ui) | ||
except KeyboardInterrupt: | ||
print("Shutting down velocity UI") | ||
finally: | ||
vel_ui.destroy_node() | ||
rclpy.shutdown() | ||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
#! /usr/bin/env python3 | ||
import rclpy | ||
from rclpy.node import Node | ||
import sys | ||
from controller_2d import Controller | ||
from geometry_msgs.msg import Pose | ||
from geometry_msgs.msg import Point | ||
import threading | ||
import math | ||
|
||
class VelocityUpdater(Node): | ||
RATE = 100 | ||
# Bubbles for updating acceleration based on position | ||
# represented as 4-tuples: (x-pos, y-pos, radius, acceleration) | ||
CHECKPOINTS = [ | ||
(589701, 4477160, 20, 0.5) | ||
] | ||
|
||
def __init__(self, init_vel: float, buggy_name: str): | ||
super().__init__('vel_updater') | ||
|
||
self.buggy_vel = init_vel | ||
self.accel = 0.0 | ||
self.position = Point() | ||
self.controller = Controller(buggy_name) | ||
|
||
self.lock = threading.Lock() | ||
|
||
# ROS2 subscription | ||
self.pose_subscriber = self.create_subscription( | ||
Pose, | ||
f"{buggy_name}/sim_2d/utm", | ||
self.update_position, | ||
10 # QoS profile | ||
) | ||
|
||
# ROS2 timer for stepping | ||
self.timer = self.create_timer(1.0 / self.RATE, self.step) | ||
|
||
def update_position(self, new_pose: Pose): | ||
'''Callback function to update internal position variable when new | ||
buggy position is published | ||
Args: | ||
new_pose (Pose): Pose object from topic | ||
''' | ||
with self.lock: | ||
self.position = new_pose.position | ||
|
||
def calculate_accel(self): | ||
'''Check if the position of the buggy is in any of the checkpoints set | ||
in self.CHECKPOINTS, and update acceleration of buggy accordingly | ||
''' | ||
for (x, y, r, a) in self.CHECKPOINTS: | ||
dist = math.sqrt((x - self.position.x)**2 + (y - self.position.y)**2) | ||
if dist < r: | ||
self.accel = a | ||
break | ||
|
||
def step(self): | ||
'''Update acceleration and velocity of buggy for one timestep''' | ||
self.calculate_accel() | ||
new_velocity = self.buggy_vel + self.accel / self.RATE | ||
self.buggy_vel = new_velocity | ||
self.controller.set_velocity(new_velocity) | ||
|
||
def main(args=None): | ||
rclpy.init(args=args) | ||
|
||
if len(sys.argv) < 3: | ||
print("Usage: vel_updater <initial_velocity> <buggy_name>") | ||
sys.exit(1) | ||
|
||
init_vel = float(sys.argv[1]) | ||
buggy_name = sys.argv[2] | ||
|
||
vel_updater = VelocityUpdater(init_vel, buggy_name) | ||
|
||
try: | ||
rclpy.spin(vel_updater) | ||
except KeyboardInterrupt: | ||
print("Shutting down velocity updater") | ||
finally: | ||
vel_updater.destroy_node() | ||
rclpy.shutdown() | ||
|
||
if __name__ == "__main__": | ||
main() |