-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
54 emulate remote controller #63
base: main
Are you sure you want to change the base?
Changes from all commits
d57829b
a3b52c8
4d26270
ebb8b61
f9a639f
b02c79c
5782aa7
d9598c4
f1f51c1
8919031
44fe84c
6df6399
ce1cc62
3edad15
b20fcac
326bb00
a8eec70
65e5098
4039f1e
d03b85b
dcc4558
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
import sys | ||
import select | ||
import termios | ||
import tty | ||
|
||
import rclpy | ||
from rclpy.node import Node | ||
|
||
from reseq_interfaces.msg import Remote | ||
|
||
# retrieve settings from file descriptor (stdin) | ||
try: | ||
sys.stdin.fileno() | ||
boolStdin = True | ||
settings = termios.tcgetattr(sys.stdin) | ||
except Exception: | ||
boolStdin = False | ||
settings = None | ||
|
||
|
||
class EmulatorRemoteController(Node): | ||
def __init__(self): | ||
super().__init__('emulator_remote_controller') | ||
# create teleop_twist_keyboard node | ||
self.emulator = rclpy.create_node('teleop_twist_keyboard') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you create a second node here? You already have self |
||
self.publisher = self.create_publisher(Remote, '/remote', 10) | ||
self.get_logger().info('EmulatorRemoteController node started') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe that it is not that sensible to provide here this logging message, when some operations are yet to be done during init |
||
# create timer to start the function | ||
self.timer = self.create_timer(0.08, self.readLoop) | ||
self.has_to_exit = False | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The exit is handled using an exception, this is not used. That is not to say that the exception is preferable (it is not) but inn any case this variable is unused |
||
# values should be already automatically be set to 0 | ||
self.previousKey = ' ' # initialise to a random value | ||
self.previousValue = 0.0 | ||
self.increment = 0.1 | ||
self.defaultMessage = Remote() | ||
self.defaultMessage.right.x = 0.0 | ||
self.defaultMessage.right.y = 0.0 | ||
self.defaultMessage.right.z = 0.0 | ||
self.defaultMessage.left.x = 0.0 | ||
self.defaultMessage.left.y = 0.0 | ||
self.defaultMessage.left.z = 0.0 | ||
Comment on lines
+36
to
+41
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you sure this is needed? (especially given your comment). Why are you inserting un-needed lines of code, do you believe that these improve readability? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moreover I don't think this would ever be needed (you can simply call |
||
|
||
def readKey(self): | ||
tty.setraw(sys.stdin.fileno()) | ||
# add timeout, so select.select() will wait for that time and then it will return | ||
# it returns three lists one for each of the three parameters | ||
readyToRead, _, _ = select.select([sys.stdin], [], [], 0.08) | ||
if readyToRead: | ||
key = sys.stdin.read(1) | ||
else: | ||
# use default key | ||
key = ' ' | ||
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings) | ||
return key | ||
Comment on lines
+43
to
+54
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given the specificity of this code, and that it is copied from another repo, I would insert a comment with a permalink to the current implementation in the other package for future reference |
||
|
||
def handleKey(self, key, message=Remote()): | ||
# make lower case | ||
key = key.lower() | ||
if key == 'w': | ||
# print("forward") | ||
if (key == self.previousKey): | ||
message.left.y = min(1.0, self.previousValue * (self.increment+1)) | ||
self.previousValue = min(1.0, self.previousValue * (self.increment+1)) | ||
else: | ||
message.left.y = self.increment | ||
self.previousValue = self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 's': | ||
# print("backward") | ||
if (key == self.previousKey): | ||
message.left.y = -min(1.0, -self.previousValue * (self.increment+1)) | ||
self.previousValue = -min(1.0, -self.previousValue * (self.increment+1)) | ||
else: | ||
message.left.y = -self.increment | ||
self.previousValue = -self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 'a': | ||
# print("left") | ||
if (key == self.previousKey): | ||
message.left.x = -min(1.0, -self.previousValue * (self.increment+1)) | ||
self.previousValue = -min(1.0, -self.previousValue * (self.increment+1)) | ||
else: | ||
message.left.x = -self.increment | ||
self.previousValue = -self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 'd': | ||
# print("right") | ||
if (key == self.previousKey): | ||
message.left.x = min(1.0, self.previousValue * (self.increment+1)) | ||
self.previousValue = min(1.0, self.previousValue * (self.increment+1)) | ||
else: | ||
message.left.x = self.increment | ||
self.previousValue = self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 'q': | ||
# print("CCW") | ||
if (key == self.previousKey): | ||
message.left.z = -min(1.0, -self.previousValue * (self.increment+1)) | ||
self.previousValue = -min(1.0, -self.previousValue * (self.increment+1)) | ||
else: | ||
message.left.z = -self.increment | ||
self.previousValue = -self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 'e': | ||
# print("CW") | ||
if (key == self.previousKey): | ||
message.left.z = min(1.0, self.previousValue * (self.increment+1)) | ||
self.previousValue = min(1.0, self.previousValue * (self.increment+1)) | ||
else: | ||
message.left.z = self.increment | ||
self.previousValue = self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
# Start cmd_vel commands | ||
elif key == 'i': | ||
# print("forward") | ||
if (key == self.previousKey): | ||
message.right.y = min(1.0, self.previousValue * (self.increment+1)) | ||
self.previousValue = min(1.0, self.previousValue * (self.increment+1)) | ||
else: | ||
message.right.y = self.increment | ||
self.previousValue = self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 'k': | ||
# print("backward") | ||
if (key == self.previousKey): | ||
message.right.y = -min(1.0, -self.previousValue * (self.increment+1)) | ||
self.previousValue = -min(1.0, -self.previousValue * (self.increment+1)) | ||
else: | ||
message.right.y = -self.increment | ||
self.previousValue = -self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 'l': | ||
# print("right") | ||
if (key == self.previousKey): | ||
message.right.x = min(1.0, self.previousValue * (self.increment+1)) | ||
self.previousValue = min(1.0, self.previousValue * (self.increment+1)) | ||
else: | ||
message.right.x = self.increment | ||
self.previousValue = self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
elif key == 'j': | ||
# print("left") | ||
if (key == self.previousKey): | ||
message.right.x = -min(1.0, -self.previousValue * (self.increment+1)) | ||
self.previousValue = -min(1.0, -self.previousValue * (self.increment+1)) | ||
else: | ||
message.right.x = -self.increment | ||
self.previousValue = -self.increment | ||
self.previousKey = key | ||
self.publisher.publish(message) | ||
# if b pressed, make cmd_vel movements faster | ||
elif key == 'b': | ||
self.increment /= 2 | ||
elif key == 'h': | ||
self.increment *= 2 | ||
elif key == 'z': | ||
raise Exception('Exit requested') | ||
else: | ||
# pass to default | ||
self.publisher.publish(self.defaultMessage) | ||
return self.has_to_exit | ||
Comment on lines
+56
to
+170
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not get the approach of this code. Why are you creating a new empty message and treating differently the case of it being the same command repeated? Isn't this equal to a simple update of the previous message? def handle_key(self, k):
if k == '...':
self.prevousMessage. ... += inc # constained
SEND self.previous ? |
||
|
||
def readLoop(self): | ||
message = Remote() | ||
key = self.readKey() | ||
if self.handleKey(key, message): | ||
# destroy node | ||
self.emulator.destroy_node() | ||
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings) | ||
Comment on lines
+175
to
+178
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as above |
||
|
||
|
||
def main(args=None): | ||
rclpy.init(args=args) | ||
try: | ||
emulator_remote_controller = EmulatorRemoteController() | ||
except Exception as err: | ||
print('Error while starting EmulatorRemoteController node: ' + str(err)) | ||
raise err | ||
else: | ||
rclpy.spin(emulator_remote_controller) | ||
emulator_remote_controller.emulator.destroy_node() | ||
emulator_remote_controller.destroy_node() | ||
rclpy.shutdown() | ||
Comment on lines
+181
to
+192
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The execution point should be uniformed to the most recent ones (which provide the correct error handling and logging) |
||
|
||
|
||
if __name__ == '__main__': | ||
main() |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just a fixed trajectory, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is not ideal. At this point, if we have a fixed trajectory, let's just use a pre-recorded rosbag. What do you think? |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
import rclpy | ||
from rclpy.node import Node | ||
from reseq_interfaces.msg import Remote | ||
|
||
|
||
class EmulatorTrajectory(Node): | ||
def __init__(self): | ||
super().__init__('emulator_trajectory') | ||
self.publisher = self.create_publisher(Remote, '/remote', 10) | ||
self.get_logger().info('EmulatorTrajectory node started') | ||
self.timer = self.create_timer(0.08, self.readLoop) | ||
self.circleMessage = Remote() | ||
self.circleMessage.right.x = 0.1 | ||
self.circleMessage.right.y = 0.1 | ||
self.circleMessage.right.z = 0.1 | ||
self.circleMessage.left.x = 0.1 | ||
self.circleMessage.left.y = 0.1 | ||
self.circleMessage.left.z = 0.1 | ||
self.stop_emulator_timer = self.create_timer(60.0, self.stopEmulator) | ||
self.has_to_exit = False | ||
|
||
def readLoop(self): | ||
self.publisher.publish(self.circleMessage) | ||
|
||
def stopEmulator(self): | ||
raise Exception('Exit requested') | ||
|
||
|
||
def main(args=None): | ||
rclpy.init(args=args) | ||
try: | ||
emulator_trajectory = EmulatorTrajectory() | ||
except Exception as err: | ||
print('Error while starting EmulatorTrajectory node: ' + str(err)) | ||
raise err | ||
else: | ||
rclpy.spin(emulator_trajectory) | ||
emulator_trajectory.emulator.destroy_node() | ||
emulator_trajectory.destroy_node() | ||
rclpy.shutdown() | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this? This seems an anti-pattern. Why do you check for an unexpected exception? Is this because of the tests? In that case I would strongly suggest against modifying for worse the production code to have the tests working.