-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
54 emulate remote controller #63
base: main
Are you sure you want to change the base?
Changes from 16 commits
d57829b
a3b52c8
4d26270
ebb8b61
f9a639f
b02c79c
5782aa7
d9598c4
f1f51c1
8919031
44fe84c
6df6399
ce1cc62
3edad15
b20fcac
326bb00
a8eec70
65e5098
4039f1e
d03b85b
dcc4558
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import rclpy | ||
from rclpy.node import Node | ||
from reseq_interfaces.msg import Remote | ||
|
||
class EmulatorMonitor(Node): | ||
def __init__(self): | ||
super().__init__("emulator_monitor") | ||
self.subscription = self.create_subscription(Remote,'/remote',self.monitor_callback,10) | ||
self.get_logger().info("EmulatorMonitor node started") | ||
self.msg = None | ||
|
||
def monitor_callback(self, msg): | ||
# self.get_logger().info(f"Received: {msg}") | ||
self.msg = msg | ||
|
||
def main(args = None): | ||
rclpy.init(args = args) | ||
try: | ||
emulator_monitor = EmulatorMonitor() | ||
rclpy.spin(emulator_monitor) | ||
except Exception as err: | ||
print("Error while starting EmulatorMonitor node: " + str(err)) | ||
rclpy.shutdown() | ||
else: | ||
emulator_monitor.destroy_node() | ||
rclpy.shutdown() | ||
|
||
if __name__ == "__main__": | ||
main() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
import rclpy | ||
from reseq_interfaces.msg import Remote | ||
from rclpy.node import Node | ||
import sys, select, termios, tty | ||
|
||
# retrieve settings from file descriptor (stdin) | ||
settings = termios.tcgetattr(sys.stdin) | ||
|
||
class EmulatorRemoteController(Node): | ||
def __init__(self): | ||
super().__init__("emulator_remote_controller") | ||
# create teleop_twist_keyboard node | ||
self.emulator = rclpy.create_node("teleop_twist_keyboard") | ||
self.publisher = self.create_publisher(Remote, '/remote',10) | ||
self.get_logger().info("EmulatorRemoteController node started") | ||
# create timer to start the function | ||
self.timer = self.create_timer(0.08, self.readLoop) | ||
self.has_to_exit = False | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The exit is handled using an exception, this is not used. That is not to say that the exception is preferable (it is not) but inn any case this variable is unused |
||
# values should be already automatically be set to 0 | ||
self.previousKey = ' ' # initialise to a random value | ||
self.previousValue = 0.0 | ||
self.increment = 0.1 | ||
# self.states = {"Normal": self.normal_value, "Double": self.normal_value*2, "Half": self.normal_value/2} | ||
self.defaultMessage = Remote() | ||
self.defaultMessage.right.x = 0.0 | ||
self.defaultMessage.right.y = 0.0 | ||
self.defaultMessage.right.z = 0.0 | ||
self.defaultMessage.left.x = 0.0 | ||
self.defaultMessage.left.y = 0.0 | ||
self.defaultMessage.left.z = 0.0 | ||
Comment on lines
+36
to
+41
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you sure this is needed? (especially given your comment). Why are you inserting un-needed lines of code, do you believe that these improve readability? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moreover I don't think this would ever be needed (you can simply call |
||
|
||
def readKey(self): | ||
tty.setraw(sys.stdin.fileno()) | ||
# add timeout, so select.select() will wait for that time and then it will return | ||
# it returns three lists one for each of the three parameters | ||
readyToRead, _,_ = select.select([sys.stdin], [], [], 0.08) | ||
if readyToRead: | ||
key = sys.stdin.read(1) | ||
else: | ||
# use default key | ||
key = ' ' | ||
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings) | ||
return key | ||
|
||
def handleKey(self, key, message = Remote()): | ||
# make lower case | ||
key = key.lower() | ||
if key == 'w': | ||
# print("forward") | ||
if (key == self.previousKey): | ||
message.left.y = min(1.0,self.previousValue * (self.increment+1)) | ||
self.previousValue = min(1.0,self.previousValue * (self.increment+1)) | ||
else: | ||
message.left.y = self.increment | ||
self.previousValue = self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 's': | ||
# print("backward") | ||
if (key == self.previousKey): | ||
message.left.y = -min(1.0,-self.previousValue * (self.increment+1)) | ||
self.previousValue = -min(1.0,-self.previousValue * (self.increment+1)) | ||
else: | ||
message.left.y = -self.increment | ||
self.previousValue = -self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 'a': | ||
# print("left") | ||
if (key == self.previousKey): | ||
message.left.x = -min(1.0,-self.previousValue * (self.increment+1)) | ||
self.previousValue = -min(1.0,-self.previousValue * (self.increment+1)) | ||
else: | ||
message.left.x = -self.increment | ||
self.previousValue = -self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 'd': | ||
# print("right") | ||
if (key == self.previousKey): | ||
message.left.x = min(1.0,self.previousValue * (self.increment+1)) | ||
self.previousValue = min(1.0,self.previousValue * (self.increment+1)) | ||
else: | ||
message.left.x = self.increment | ||
self.previousValue = self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 'q': | ||
# print("CCW") | ||
if (key == self.previousKey): | ||
message.left.z = -min(1.0,-self.previousValue * (self.increment+1)) | ||
self.previousValue = -min(1.0,-self.previousValue * (self.increment+1)) | ||
else: | ||
message.left.z = -self.increment | ||
self.previousValue = -self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 'e': | ||
# print("CW") | ||
if (key == self.previousKey): | ||
message.left.z = min(1.0,self.previousValue * (self.increment+1)) | ||
self.previousValue = min(1.0,self.previousValue * (self.increment+1)) | ||
else: | ||
message.left.z = self.increment | ||
self.previousValue = self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
# Start cmd_vel commands | ||
elif key == 'i': | ||
# print("forward") | ||
if (key == self.previousKey): | ||
message.right.y = min(1.0,self.previousValue * (self.increment+1)) | ||
self.previousValue = min(1.0,self.previousValue * (self.increment+1)) | ||
else: | ||
message.right.y = self.increment | ||
self.previousValue = self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 'k': | ||
# print("backward") | ||
if (key == self.previousKey): | ||
message.right.y = -min(1.0,-self.previousValue * (self.increment+1)) | ||
self.previousValue = -min(1.0,-self.previousValue * (self.increment+1)) | ||
else: | ||
message.right.y = -self.increment | ||
self.previousValue = -self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 'l': | ||
# print("right") | ||
if (key == self.previousKey): | ||
message.right.x = min(1.0,self.previousValue * (self.increment+1)) | ||
self.previousValue = min(1.0,self.previousValue * (self.increment+1)) | ||
else: | ||
message.right.x = self.increment | ||
self.previousValue = self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 'j': | ||
# print("left") | ||
if (key == self.previousKey): | ||
message.right.x = -min(1.0,-self.previousValue * (self.increment+1)) | ||
self.previousValue = -min(1.0,-self.previousValue * (self.increment+1)) | ||
else: | ||
message.right.x = -self.increment | ||
self.previousValue = -self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
# if b pressed, make cmd_vel movements faster | ||
elif key == 'b': | ||
self.increment /= 2 | ||
elif key == 'h': | ||
self.increment *= 2 | ||
elif key == 'z': | ||
raise Exception("Exit requested") | ||
else: | ||
# pass to default | ||
self.publisher.publish(self.defaultMessage) | ||
return self.has_to_exit | ||
|
||
def readLoop(self): | ||
message = Remote() | ||
key = self.readKey() | ||
if self.handleKey(key, message): | ||
# destroy node | ||
self.emulator.destroy_node() | ||
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings) | ||
Comment on lines
+175
to
+178
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as above |
||
|
||
def main(args = None): | ||
rclpy.init(args = args) | ||
try: | ||
emulator_remote_controller = EmulatorRemoteController() | ||
except Exception as err: | ||
print("Error while starting EmulatorRemoteController node: " + str(err)) | ||
raise err | ||
else: | ||
rclpy.spin(emulator_remote_controller) | ||
emulator_remote_controller.emulator.destroy_node() | ||
emulator_remote_controller.destroy_node() | ||
rclpy.shutdown() | ||
|
||
if __name__ == "__main__": | ||
main() |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just a fixed trajectory, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is not ideal. At this point, if we have a fixed trajectory, let's just use a pre-recorded rosbag. What do you think? |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
import rclpy | ||
from reseq_interfaces.msg import Remote | ||
from rclpy.node import Node | ||
|
||
class EmulatorTrajectory(Node): | ||
def __init__(self): | ||
super().__init__("emulator_trajectory") | ||
self.publisher = self.create_publisher(Remote, '/remote',10) | ||
self.get_logger().info("EmulatorTrajectory node started") | ||
self.timer = self.create_timer(0.08, self.readLoop) | ||
self.circleMessage = Remote() | ||
self.circleMessage.right.x = 0.1 | ||
self.circleMessage.right.y = 0.1 | ||
self.circleMessage.right.z = 0.1 | ||
self.circleMessage.left.x = 0.1 | ||
self.circleMessage.left.y = 0.1 | ||
self.circleMessage.left.z = 0.1 | ||
self.stop_emulator_timer = self.create_timer(60.0, self.stopEmulator) | ||
self.has_to_exit = False | ||
|
||
def readLoop(self): | ||
self.publisher.publish(self.circleMessage) | ||
|
||
def stopEmulator(self): | ||
raise Exception("Exit requested") | ||
|
||
def main(args = None): | ||
rclpy.init(args = args) | ||
try: | ||
emulator_trajectory = EmulatorTrajectory() | ||
except Exception as err: | ||
print("Error while starting EmulatorTrajectory node: " + str(err)) | ||
raise err | ||
else: | ||
rclpy.spin(emulator_trajectory) | ||
emulator_trajectory.emulator.destroy_node() | ||
emulator_trajectory.destroy_node() | ||
rclpy.shutdown() | ||
|
||
if __name__ == "__main__": | ||
main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure that we need this kind of node?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deleted