|
| 1 | +"""This is a total example node to show off some simple node_helpers features. It's not |
| 2 | +meant to be a comprehensive example, but rather a simple one to show off some of the |
| 3 | +features of the node_helpers package. |
| 4 | +""" |
| 5 | + |
| 6 | +from typing import Any |
| 7 | + |
| 8 | +from pydantic import BaseModel |
| 9 | +from rclpy.qos import qos_profile_services_default |
| 10 | +from sensor_msgs.msg import JointState |
| 11 | + |
| 12 | +from node_helpers.example_urdf import ForkliftURDF |
| 13 | +from node_helpers.nodes import HelpfulNode |
| 14 | +from node_helpers.spinning import create_spin_function |
| 15 | + |
| 16 | + |
| 17 | +class ExampleNode(HelpfulNode): |
| 18 | + class Parameters(BaseModel): |
| 19 | + # Define your ROS parameters here |
| 20 | + forklift_speed: float # m/s |
| 21 | + forklift_max_extent: float |
| 22 | + |
| 23 | + def __init__(self, **kwargs: Any): |
| 24 | + super().__init__("ExampleNode", **kwargs) |
| 25 | + # Load parameters from the ROS parameter server |
| 26 | + self.params = self.declare_from_pydantic_model(self.Parameters, "root_config") |
| 27 | + self.urdf = ForkliftURDF.with_namespace(self.get_namespace()) |
| 28 | + |
| 29 | + # Track the forks position and direction, so we can move them up and down |
| 30 | + self.forklift_position = 0.0 |
| 31 | + self.forklift_direction = 1 |
| 32 | + |
| 33 | + # Create publishers |
| 34 | + self.joint_state_publisher = self.create_publisher( |
| 35 | + JointState, "desired_joint_states", qos_profile_services_default |
| 36 | + ) |
| 37 | + |
| 38 | + # Create a timer to publish joint values |
| 39 | + self._publish_hz = 20 |
| 40 | + self.create_timer(1 / self._publish_hz, self.on_publish_joints) |
| 41 | + |
| 42 | + def on_publish_joints(self) -> None: |
| 43 | + if self.forklift_position >= self.params.forklift_max_extent: |
| 44 | + self.forklift_direction = -1 |
| 45 | + elif self.forklift_position <= 0: |
| 46 | + self.forklift_direction = 1 |
| 47 | + |
| 48 | + self.forklift_position += ( |
| 49 | + self.forklift_direction * self.params.forklift_speed / self._publish_hz |
| 50 | + ) |
| 51 | + |
| 52 | + joint_positions = {self.urdf.joints.FORKS: self.forklift_position} |
| 53 | + |
| 54 | + self.joint_state_publisher.publish( |
| 55 | + JointState( |
| 56 | + name=list(joint_positions.keys()), |
| 57 | + position=list(joint_positions.values()), |
| 58 | + ) |
| 59 | + ) |
| 60 | + |
| 61 | + |
| 62 | +main = create_spin_function(ExampleNode) |
0 commit comments