Skip to content

Commit

Permalink
add test_pub_yaml for ros2topic.
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoya Fujita <[email protected]>
  • Loading branch information
fujitatomoya committed Aug 30, 2024
1 parent c9d666f commit 4adef79
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
10 changes: 10 additions & 0 deletions ros2topic/test/resources/chatter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
data: 'Hello ROS Users'
---
---
data: Hello ROS Developers
---
data: Hello ROS Developers
---
---
data: 'Hello ROS Users'
39 changes: 39 additions & 0 deletions ros2topic/test/test_echo_pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import functools
import pathlib
import sys
import unittest

Expand Down Expand Up @@ -49,6 +50,8 @@
TEST_NODE = 'cli_echo_pub_test_node'
TEST_NAMESPACE = 'cli_echo_pub'

TEST_RESOURCES_DIR = pathlib.Path(__file__).resolve().parent / 'resources'


@pytest.mark.rostest
@launch_testing.markers.keep_alive
Expand Down Expand Up @@ -294,6 +297,42 @@ def test_pub_times(self, launch_service, proc_info, proc_output):
strict=True
)

@launch_testing.markers.retry_on_failure(times=5)
def test_pub_yaml(self, launch_service, proc_info, proc_output):
command_action = ExecuteProcess(
# yaml file prevails to the values 'data: hello'
cmd=(['ros2', 'topic', 'pub', '/clitest/topic/chatter',
'std_msgs/String', 'data: hello', '--yaml-file',
str(TEST_RESOURCES_DIR / 'chatter.yaml')]),
additional_env={
'PYTHONUNBUFFERED': '1'
},
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=get_rmw_implementation_identifier()
)
) as command:
assert command.wait_for_shutdown(timeout=10)
assert command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'publisher: beginning loop',
"publishing #1: std_msgs.msg.String(data='Hello ROS Users')",
'',
"publishing #2: std_msgs.msg.String(data='Hello ROS Developers')",
'',
"publishing #3: std_msgs.msg.String(data='Hello ROS Developers')",
'',
"publishing #4: std_msgs.msg.String(data='Hello ROS Users')",
'',
],
text=command.output,
strict=True
)

@launch_testing.markers.retry_on_failure(times=5)
def test_echo_basic(self, launch_service, proc_info, proc_output):
params = [
Expand Down

0 comments on commit 4adef79

Please sign in to comment.