forked from CMU-Robotics-Club/RoboBuggy2
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
move files from simulation-velocity-port
- Loading branch information
1 parent
07d062c
commit fc57679
Showing
2 changed files
with
176 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
#! /usr/bin/env python3 | ||
import sys | ||
import threading | ||
import tkinter as tk | ||
# from controller_2d import Controller | ||
import rclpy | ||
from rclpy.node import Node | ||
from std_msgs.msg import Float64 | ||
|
||
class VelocityUI(Node): | ||
def __init__(self): | ||
super().__init__('velocity_ui') | ||
self.get_logger().info('INITIALIZED.') | ||
|
||
# Declare parameters with default values | ||
self.declare_parameter('init_vel', 12) | ||
self.declare_parameter('buggy_name', 'SC') | ||
# Get the parameter values | ||
self.init_vel = self.get_parameter("init_vel").value | ||
self.buggy_name = self.get_parameter("buggy_name").value | ||
|
||
# initialize variables | ||
# TODO: uncomment after controller is implemented | ||
# # So the buggy doesn't start moving without user input | ||
# self.buggy_vel = 0 | ||
# # self.controller = Controller(buggy_name) | ||
# self.lock = threading.Lock() | ||
|
||
self.buggy_vel = self.init_vel | ||
|
||
# TODO: remove after controller is implemented | ||
# publish velocity | ||
self.velocity_publisher = self.create_publisher(Float64, "velocity", 1) | ||
|
||
|
||
# TODO: tk is not displaying rn | ||
self.root = tk.Tk() | ||
|
||
self.root.title(self.buggy_name + ' Manual Velocity: scale = 0.1m/s') | ||
self.root.geometry('600x100') | ||
self.root.configure(background='#342d66') | ||
|
||
self.scale = tk.Scale(self.root, from_=0, to=300, orient=tk.HORIZONTAL, | ||
length=500, width=30) | ||
self.scale.pack() | ||
|
||
self.root.bind("<Up>", lambda i: self.scale.set(self.scale.get() + 2)) | ||
self.root.bind("<Down>", lambda d: self.scale.set(self.scale.get() - 2)) | ||
|
||
# ROS2 timer for stepping | ||
# 0.01 is equivalent to 100Hz (100 times per second) | ||
# self.create_timer(0.01, self.step) | ||
|
||
def step(self): | ||
# Sets the velocity of the buggy to the current scale value | ||
# called every tick | ||
self.root.update_idletasks() | ||
self.root.update() | ||
# Update velocity of the buggy | ||
# '/10' set velocity with 0.1 precision | ||
self.buggy_vel = self.scale.get() / 10 | ||
|
||
# TODO: uncomment after controller is implemented | ||
# self.controller.set_velocity(self.buggy_vel) | ||
|
||
# TODO: remove after controller is implemented | ||
float_64_velocity = Float64() | ||
float_64_velocity.data = float(self.buggy_vel) | ||
self.velocity_publisher .publish(float_64_velocity) | ||
|
||
def main(args=None): | ||
rclpy.init(args=args) | ||
vel_ui = VelocityUI() | ||
rclpy.spin(vel_ui) | ||
rclpy.shutdown() | ||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
#! /usr/bin/env python3 | ||
import sys | ||
import math | ||
import threading | ||
import rclpy | ||
from rclpy.node import Node | ||
# from controller_2d import Controller | ||
from geometry_msgs.msg import Pose | ||
from geometry_msgs.msg import Point | ||
from std_msgs.msg import Float64 | ||
|
||
class VelocityUpdater(Node): | ||
RATE = 100 | ||
# Bubbles for updating acceleration based on position | ||
# represented as 4-tuples: (x-pos, y-pos, radius, acceleration) | ||
# 'list[tuple[float,float,float,float]]' | ||
# need further update such as more data or import data from certain files | ||
CHECKPOINTS = [ | ||
# (589701, 4477160, 20, 0.5) | ||
(589701, 4477160, 10000000000, 100) # for testing | ||
] | ||
|
||
def __init__(self): | ||
super().__init__('velocity_updater') | ||
self.get_logger().info('INITIALIZED.') | ||
|
||
# Declare parameters with default values | ||
self.declare_parameter('init_vel', 12) | ||
self.declare_parameter('buggy_name', 'SC') | ||
# Get the parameter values | ||
self.init_vel = self.get_parameter("init_vel").value | ||
self.buggy_name = self.get_parameter("buggy_name").value | ||
|
||
# initialize variables | ||
self.buggy_vel = self.init_vel | ||
self.accel = 0.0 | ||
self.position = Point() | ||
# TODO: uncomment after controller is implemented | ||
# self.controller = Controller(self.buggy_name) | ||
# self.lock = threading.Lock() | ||
|
||
# subscribe pose | ||
self.pose_subscriber = self.create_subscription( | ||
Pose, | ||
f"{self.buggy_name}/sim_2d/utm", | ||
self.update_position, | ||
10 # QoS profile | ||
) | ||
|
||
# TODO: remove after controller is implemented | ||
# publish velocity | ||
self.velocity_publisher = self.create_publisher(Float64, "velocity", 1) | ||
|
||
# ROS2 timer for stepping | ||
self.timer = self.create_timer(1.0 / self.RATE, self.step) | ||
|
||
def update_position(self, new_pose: Pose): | ||
'''Callback function to update internal position variable when new | ||
buggy position is published | ||
Args: | ||
new_pose (Pose): Pose object from topic | ||
''' | ||
with self.lock: | ||
self.position = new_pose.position | ||
|
||
def calculate_accel(self): | ||
'''Check if the position of the buggy is in any of the checkpoints set | ||
in self.CHECKPOINTS, and update acceleration of buggy accordingly | ||
''' | ||
for (x, y, r, a) in self.CHECKPOINTS: | ||
dist = math.sqrt((x-self.position.x)**2 + (y-self.position.y)**2) | ||
if dist < r: | ||
self.accel = a | ||
break | ||
|
||
def step(self): | ||
'''Update acceleration and velocity of buggy for one timestep''' | ||
self.calculate_accel() | ||
new_velocity = self.buggy_vel + self.accel / self.RATE | ||
self.buggy_vel = new_velocity | ||
|
||
# TODO: uncomment after controller is implemented | ||
# self.controller.set_velocity(new_velocity) | ||
|
||
# TODO: remove after controller is implemented | ||
float_64_velocity = Float64() | ||
float_64_velocity.data = float(new_velocity) | ||
self.velocity_publisher.publish(float_64_velocity) | ||
|
||
def main(args=None): | ||
rclpy.init(args=args) | ||
vel_updater = VelocityUpdater() | ||
rclpy.spin(vel_updater) | ||
rclpy.shutdown() | ||
|
||
if __name__ == "__main__": | ||
main() |