diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index f447e2df71..fcf866f124 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -25,6 +25,7 @@ from ros2cli.node import NODE_NAME_PREFIX from rosbag2_py import Player from rosbag2_py import PlayOptions +from rosbag2_py import ServiceRequestsSource from rosbag2_py import StorageOptions import yaml @@ -152,6 +153,15 @@ def add_arguments(self, parser, cli_name): # noqa: D102 'By default, if loaned message can be used, messages are published as loaned ' 'message. It can help to reduce the number of data copies, so there is a greater ' 'benefit for sending big data.') + parser.add_argument( + '--publish-service-requests', action='store_true', default=False, + help='Publish recorded service requests instead of recorded service events') + parser.add_argument( + '--service-requests-source', default='service_introspection', + choices=['service_introspection', 'client_introspection'], + help='Determine the source of the service requests to be replayed. This option only ' + 'makes sense if the "--publish-service-requests" option is set. By default,' + ' the service requests replaying from recorded service introspection message.') def get_playback_until_from_arg_group(self, playback_until_sec, playback_until_nsec) -> int: nano_scale = 1000 * 1000 * 1000 @@ -219,6 +229,12 @@ def main(self, *, args): # noqa: D102 play_options.start_offset = args.start_offset play_options.wait_acked_timeout = args.wait_for_all_acked play_options.disable_loan_message = args.disable_loan_message + play_options.publish_service_requests = args.publish_service_requests + if not args.service_requests_source or \ + args.service_requests_source == 'service_introspection': + play_options.service_requests_source = ServiceRequestsSource.SERVICE_INTROSPECTION + else: + play_options.service_requests_source = ServiceRequestsSource.CLIENT_INTROSPECTION player = Player() try: diff --git a/rosbag2_cpp/include/rosbag2_cpp/service_utils.hpp b/rosbag2_cpp/include/rosbag2_cpp/service_utils.hpp index d68148d37b..11b9f593a3 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/service_utils.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/service_utils.hpp @@ -26,7 +26,7 @@ namespace rosbag2_cpp { ROSBAG2_CPP_PUBLIC bool -is_service_event_topic(const std::string & topic, const std::string & topic_type); +is_service_event_topic(const std::string & topic_name, const std::string & topic_type); // Call this function after is_service_event_topic() return true ROSBAG2_CPP_PUBLIC @@ -38,18 +38,10 @@ ROSBAG2_CPP_PUBLIC std::string service_event_topic_type_to_service_type(const std::string & topic_type); -ROSBAG2_CPP_PUBLIC -size_t -get_serialization_size_for_service_metadata_event(); - ROSBAG2_CPP_PUBLIC std::string service_name_to_service_event_topic_name(const std::string & service_name); -ROSBAG2_CPP_PUBLIC -bool -service_event_include_metadata_and_contents(size_t message_size); - ROSBAG2_CPP_PUBLIC std::string client_id_to_string(std::array & client_id); diff --git a/rosbag2_cpp/src/rosbag2_cpp/service_utils.cpp b/rosbag2_cpp/src/rosbag2_cpp/service_utils.cpp index 62ef91872f..07df54a884 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/service_utils.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/service_utils.cpp @@ -17,128 +17,85 @@ #include "rcl/service_introspection.h" #include "rosbag2_cpp/service_utils.hpp" -#include "rosidl_typesupport_cpp/message_type_support.hpp" -#include "rosidl_typesupport_cpp/message_type_support_dispatch.hpp" -#include "rosidl_typesupport_introspection_cpp/identifier.hpp" -#include "rosidl_typesupport_introspection_cpp/message_introspection.hpp" - #include "service_msgs/msg/service_event_info.hpp" namespace rosbag2_cpp { -const char * service_event_topic_type_postfix = "_Event"; -const char * service_event_topic_type_middle = "/srv/"; +const char * kServiceEventTopicTypePostfix = "_Event"; +const char * kServiceEventTopicTypeMiddle = "/srv/"; +const size_t kServiceEventTopicPostfixLen = strlen(RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX); +const size_t kServiceEventTypePostfixLen = strlen(kServiceEventTopicTypePostfix); -bool is_service_event_topic(const std::string & topic, const std::string & topic_type) +bool is_service_event_topic(const std::string & topic_name, const std::string & topic_type) { - if (topic.length() <= strlen(RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX)) { + if (topic_name.length() <= kServiceEventTopicPostfixLen) { return false; + } else { + // The end of the topic name should be "/_service_event" + if (topic_name.substr(topic_name.length() - kServiceEventTopicPostfixLen) != + RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX) + { + return false; + } } - std::string end_topic_name = topic.substr( - topic.length() - strlen(RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX)); - - // Should be "/_service_event" - if (end_topic_name != RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX) { + if (topic_type.length() <= kServiceEventTypePostfixLen) { return false; - } + } else { + // Should include '/srv/' in type + if (topic_type.find(kServiceEventTopicTypeMiddle) == std::string::npos) { + return false; + } - if (topic_type.length() <= std::strlen(service_event_topic_type_postfix)) { - return false; - } - - // Should include '/srv/' in type - if (topic_type.find(service_event_topic_type_middle) == std::string::npos) { - return false; - } + if (topic_type.length() <= kServiceEventTypePostfixLen) { + return false; + } - if (topic_type.length() <= std::strlen(service_event_topic_type_postfix)) { - return false; + return topic_type.compare( + topic_type.length() - kServiceEventTypePostfixLen, + kServiceEventTypePostfixLen, + kServiceEventTopicTypePostfix) == 0; } - - return topic_type.compare( - topic_type.length() - std::strlen(service_event_topic_type_postfix), - std::strlen(service_event_topic_type_postfix), - service_event_topic_type_postfix) == 0; } std::string service_event_topic_name_to_service_name(const std::string & topic_name) { std::string service_name; - if (topic_name.length() <= strlen(RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX)) { + if (topic_name.length() <= kServiceEventTopicPostfixLen) { return service_name; - } - - if (topic_name.substr( - topic_name.length() - - strlen(RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX)) != - RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX) - { + } else { + // The end of the topic name should be "/_service_event" + if (topic_name.substr(topic_name.length() - kServiceEventTopicPostfixLen) != + RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX) + { + return service_name; + } + service_name = topic_name.substr(0, topic_name.length() - kServiceEventTopicPostfixLen); return service_name; } - - service_name = topic_name.substr( - 0, topic_name.length() - strlen(RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX)); - - return service_name; } std::string service_event_topic_type_to_service_type(const std::string & topic_type) { std::string service_type; - if (topic_type.length() <= std::strlen(service_event_topic_type_postfix)) { + if (topic_type.length() <= kServiceEventTypePostfixLen) { return service_type; } - // Should include '/srv/' in type - if (topic_type.find(service_event_topic_type_middle) == std::string::npos) { + if (topic_type.find(kServiceEventTopicTypeMiddle) == std::string::npos) { return service_type; } - if (topic_type.substr(topic_type.length() - std::strlen(service_event_topic_type_postfix)) != - service_event_topic_type_postfix) + if (topic_type.substr(topic_type.length() - kServiceEventTypePostfixLen) != + kServiceEventTopicTypePostfix) { return service_type; } - - service_type = topic_type.substr( - 0, topic_type.length() - strlen(service_event_topic_type_postfix)); + service_type = topic_type.substr(0, topic_type.length() - kServiceEventTypePostfixLen); return service_type; } -size_t get_serialization_size_for_service_metadata_event() -{ - // Since the size is fixed, it only needs to be calculated once. - static size_t size = 0; - - if (size != 0) { - return size; - } - - const rosidl_message_type_support_t * type_support_info = - rosidl_typesupport_cpp::get_message_type_support_handle(); - - // Get the serialized size of service event info - const rosidl_message_type_support_t * type_support_handle = - rosidl_typesupport_cpp::get_message_typesupport_handle_function( - type_support_info, - rosidl_typesupport_introspection_cpp::typesupport_identifier); - if (type_support_handle == nullptr) { - throw std::runtime_error("Cannot get ServiceEventInfo typesupport handle !"); - } - - auto service_event_info = - static_cast( - type_support_handle->data); - - // endian type (4 size) + service event info size + empty request (4 bytes) - // + emtpy response (4 bytes) - size = 4 + service_event_info->size_of_ + 4 + 4; - - return size; -} - std::string service_name_to_service_event_topic_name(const std::string & service_name) { if (service_name.empty()) { @@ -146,24 +103,15 @@ std::string service_name_to_service_event_topic_name(const std::string & service } // If the end of string is RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX, do nothing - if ((service_name.length() > strlen(RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX)) && - (service_name.substr(service_name.length() - strlen(RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX)) == + if ((service_name.length() > kServiceEventTopicPostfixLen) && + (service_name.substr(service_name.length() - kServiceEventTopicPostfixLen) == RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX)) { return service_name; } - return service_name + RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX; } -bool service_event_include_metadata_and_contents(size_t message_size) -{ - if (message_size <= rosbag2_cpp::get_serialization_size_for_service_metadata_event()) { - return false; - } - return true; -} - std::string client_id_to_string(std::array & client_id) { // output format: diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_service_utils.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_service_utils.cpp index b034d8b629..6434a5ed57 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_service_utils.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_service_utils.cpp @@ -97,21 +97,6 @@ TEST_F(ServiceUtilsTest, check_service_name_to_service_event_topic_name) } } -TEST_F(ServiceUtilsTest, check_service_event_include_metadata_and_contents) -{ - auto msg = std::make_shared(); - msg->info.event_type = service_msgs::msg::ServiceEventInfo::REQUEST_SENT; - auto serialized_service_event = memory_management_.serialize_message(msg); - - size_t metadata_event_size = rosbag2_cpp::get_serialization_size_for_service_metadata_event(); - - EXPECT_EQ(serialized_service_event->buffer_length, metadata_event_size); - - EXPECT_FALSE(rosbag2_cpp::service_event_include_metadata_and_contents(metadata_event_size - 1)); - EXPECT_FALSE(rosbag2_cpp::service_event_include_metadata_and_contents(metadata_event_size)); - EXPECT_TRUE(rosbag2_cpp::service_event_include_metadata_and_contents(metadata_event_size + 1)); -} - TEST_F(ServiceUtilsTest, check_client_id_to_string) { service_msgs::msg::ServiceEventInfo::_client_gid_type client_id = { diff --git a/rosbag2_py/rosbag2_py/__init__.py b/rosbag2_py/rosbag2_py/__init__.py index f848f3404b..8338fd1047 100644 --- a/rosbag2_py/rosbag2_py/__init__.py +++ b/rosbag2_py/rosbag2_py/__init__.py @@ -56,6 +56,7 @@ from rosbag2_py._transport import ( Player, PlayOptions, + ServiceRequestsSource, Recorder, RecordOptions, bag_rewrite, @@ -94,6 +95,7 @@ 'Info', 'Player', 'PlayOptions', + 'ServiceRequestsSource', 'Recorder', 'RecordOptions', ] diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 6f0a205ace..68e6ca2680 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -366,6 +366,13 @@ PYBIND11_MODULE(_transport, m) { &PlayOptions::setPlaybackUntilTimestamp) .def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout) .def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message) + .def_readwrite("publish_service_requests", &PlayOptions::publish_service_requests) + .def_readwrite("service_requests_source", &PlayOptions::service_requests_source) + ; + + py::enum_(m, "ServiceRequestsSource") + .value("SERVICE_INTROSPECTION", rosbag2_transport::ServiceRequestsSource::SERVICE_INTROSPECTION) + .value("CLIENT_INTROSPECTION", rosbag2_transport::ServiceRequestsSource::CLIENT_INTROSPECTION) ; py::class_(m, "RecordOptions") diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 18863c6852..e5029c923a 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -28,6 +28,11 @@ namespace rosbag2_transport { +enum class ServiceRequestsSource : int8_t +{ + SERVICE_INTROSPECTION = 0, + CLIENT_INTROSPECTION = 1 +}; struct PlayOptions { @@ -112,6 +117,12 @@ struct PlayOptions // Disable to publish as loaned message bool disable_loan_message = false; + + // Publish service requests instead of service events + bool publish_service_requests = false; + + // The source of the service request + ServiceRequestsSource service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index be623a9749..d83bb69d23 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -254,6 +254,19 @@ class Player : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC void delete_on_play_message_callback(const callback_handle_t & handle); + /// Wait until sent service requests will receive responses from service servers. + /// \note The player node shall be spun in the executor in a parallel thread to be able to wait + /// for responses. + /// \param service_name - Name of the service or service event from what to wait responses. + /// \note is service_name is empty the function will wait until all service requests sent to all + /// service servers will finish. Timeout in this cases will be used for each service name. + /// \param timeout - Timeout in fraction of seconds to wait for. + /// \return true if service requests successfully finished, otherwise false. + ROSBAG2_TRANSPORT_PUBLIC + bool wait_for_sent_service_requests_to_finish( + const std::string & service_name, + std::chrono::duration timeout = std::chrono::seconds(5)); + protected: /// \brief Getter for publishers corresponding to each topic /// \return Hashtable representing topic to publisher map excluding inner clock_publisher @@ -263,7 +276,7 @@ class Player : public rclcpp::Node /// \brief Getter for clients corresponding to each service name /// \return Hashtable representing service name to client ROSBAG2_TRANSPORT_PUBLIC - std::unordered_map> get_services_clients(); + std::unordered_map> get_service_clients(); /// \brief Getter for inner clock_publisher /// \return Shared pointer to the inner clock_publisher diff --git a/rosbag2_transport/include/rosbag2_transport/player_service_client.hpp b/rosbag2_transport/include/rosbag2_transport/player_service_client.hpp index 96476e3c7b..aead1e268c 100644 --- a/rosbag2_transport/include/rosbag2_transport/player_service_client.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player_service_client.hpp @@ -19,7 +19,6 @@ #include #include #include -#include #include #include #include @@ -38,6 +37,9 @@ class PlayerServiceClientManager; class PlayerServiceClient final { public: + using ServiceEventType = service_msgs::msg::ServiceEventInfo::_event_type_type; + using ClientGidType = service_msgs::msg::ServiceEventInfo::_client_gid_type; + explicit PlayerServiceClient( std::shared_ptr generic_client, @@ -46,41 +48,47 @@ class PlayerServiceClient final rclcpp::Logger logger, std::shared_ptr player_service_client_manager); - // Note: Call this function only if is_include_request_message() return true - void async_send_request(const rcl_serialized_message_t & message); + const std::string & get_service_name(); + + /// \brief Deserialize message to the type erased service event + /// \param message - Serialized message + /// \return Shared pointer to the byte array with deserialized service event if success, + /// otherwise nullptr + std::shared_ptr deserialize_service_event(const rcl_serialized_message_t & message); + + std::tuple + get_service_event_type_and_client_gid(const std::shared_ptr type_erased_service_event); + + bool is_service_event_include_request_message( + const std::shared_ptr type_erased_service_event); + + void async_send_request(const std::shared_ptr type_erased_service_event); + + /// Wait until sent service requests will receive responses from service servers. + /// \param service_name - Name of the service or service event from what to wait responses. + /// \param timeout - Timeout in fraction of seconds to wait for. + /// \return true if service requests successfully finished, otherwise false. + bool wait_for_sent_requests_to_finish( + std::chrono::duration timeout = std::chrono::seconds(5)); std::shared_ptr generic_client() { return client_; } - // Check if message can be unpacked to get request message - bool is_include_request_message(const rcl_serialized_message_t & message); - private: std::shared_ptr client_; std::string service_name_; const rclcpp::Logger logger_; std::shared_ptr player_service_client_manager_; - enum class request_info_from - { - SERVICE = 0, - CLIENT, - NO_CONTENT // Only have META info. Not send request. - }; - bool service_set_introspection_content_ = false; - - using client_id = service_msgs::msg::ServiceEventInfo::_client_gid_type; - // Info on request data from service or client - std::unordered_map request_info_; + // Note: The service_event_ts_lib_ shall be a member variable to make sure that library loaded + // during the liveliness of the instance of this class, since we have raw pointers to its members. + std::shared_ptr service_event_ts_lib_; const rosidl_message_type_support_t * service_event_type_ts_; const rosidl_typesupport_introspection_cpp::MessageMembers * service_event_members_; rcutils_allocator_t allocator_ = rcutils_get_default_allocator(); - - std::tuple - get_msg_event_type(const rcl_serialized_message_t & message); }; class PlayerServiceClientManager final @@ -94,15 +102,17 @@ class PlayerServiceClientManager final bool request_future_queue_is_full(); bool register_request_future( - rclcpp::GenericClient::FutureAndRequestId & request_future, + rclcpp::GenericClient::FutureAndRequestId && request_future, std::weak_ptr client); + bool wait_for_all_futures(std::chrono::duration timeout = std::chrono::seconds(5)); + private: using time_point = std::chrono::steady_clock::time_point; using ptr_future_and_request_id = std::unique_ptr; - using request_id_and_client_t = + using future_request_id_and_client_t = std::pair>; - std::map request_futures_list_; + std::map request_futures_list_; std::mutex request_futures_list_mutex_; std::chrono::seconds request_future_timeout_; diff --git a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp index 2454e22c9c..ea4329c678 100644 --- a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp +++ b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp @@ -218,6 +218,25 @@ PlayOptions get_play_options_from_node_params(rclcpp::Node & node) play_options.disable_loan_message = node.declare_parameter("play.disable_loan_message", false); + // "SERVICE_INTROSPECTION" or "CLIENT_INTROSPECTION" + auto service_requests_source = + node.declare_parameter("play.service_requests_source", "SERVICE_INTROSPECTION"); + if (service_requests_source == "SERVICE_INTROSPECTION") { + play_options.service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION; + } else if (service_requests_source == "CLIENT_INTROSPECTION") { + play_options.service_requests_source = ServiceRequestsSource::CLIENT_INTROSPECTION; + } else { + play_options.service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION; + RCLCPP_ERROR( + node.get_logger(), + "play.service_requests_source doesn't support %s. It must be one of SERVICE_INTROSPECTION" + " and CLIENT_INTROSPECTION. Changed it to default value SERVICE_INTROSPECTION.", + service_requests_source.c_str()); + } + + play_options.publish_service_requests = + node.declare_parameter("play.publish_service_request", false); + return play_options; } diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 09d1e23763..56ed518520 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -21,7 +21,6 @@ #include #include #include -#include #include #include @@ -36,7 +35,6 @@ #include "rosbag2_cpp/clocks/time_controller_clock.hpp" #include "rosbag2_cpp/reader.hpp" #include "rosbag2_cpp/service_utils.hpp" -#include "rosbag2_cpp/typesupport_helpers.hpp" #include "rosbag2_storage/storage_filter.hpp" #include "rosbag2_storage/qos.hpp" @@ -173,13 +171,21 @@ class PlayerImpl /// #add_on_play_message_post_callback void delete_on_play_message_callback(const callback_handle_t & handle); + /// TBD + /// \param service_name + /// \param timeout + /// \return + bool wait_for_sent_service_requests_to_finish( + const std::string & service_name, + std::chrono::duration timeout = std::chrono::seconds(5)); + /// \brief Getter for publishers corresponding to each topic /// \return Hashtable representing topic to publisher map excluding inner clock_publisher std::unordered_map> get_publishers(); /// \brief Getter for clients corresponding to services /// \return Hashtable representing service name to client map - std::unordered_map> get_services_clients(); + std::unordered_map> get_service_clients(); /// \brief Getter for inner clock_publisher /// \return Shared pointer to the inner clock_publisher @@ -789,6 +795,29 @@ void PlayerImpl::delete_on_play_message_callback(const callback_handle_t & handl }); } +bool PlayerImpl::wait_for_sent_service_requests_to_finish( + const std::string & service_name, + std::chrono::duration timeout) +{ + bool is_requests_complete = true; + if (!service_name.empty()) { + auto service_event_name = rosbag2_cpp::service_name_to_service_event_topic_name(service_name); + auto client_iter = service_clients_.find(service_event_name); + if (client_iter != service_clients_.end()) { + is_requests_complete = client_iter->second->wait_for_sent_requests_to_finish(timeout); + } else { + is_requests_complete = false; + } + } else { + for (const auto & [service_event_name, client] : service_clients_) { + if (!client->wait_for_sent_requests_to_finish(timeout)) { + return false; + } + } + } + return is_requests_complete; +} + std::unordered_map> PlayerImpl::get_publishers() { @@ -800,7 +829,7 @@ std::unordered_map> PlayerImpl::get_services_clients() + std::shared_ptr> PlayerImpl::get_service_clients() { std::unordered_map> topic_to_client_map; for (const auto & [service_name, client] : service_clients_) { @@ -936,7 +965,7 @@ namespace { bool allow_topic( bool is_service, - const std::string topic_name, + const std::string & topic_name, const rosbag2_storage::StorageFilter & storage_filter) { auto & include_topics = storage_filter.topics; @@ -1060,7 +1089,8 @@ void PlayerImpl::prepare_publishers() auto topics = reader_->get_all_topics_and_types(); std::string topic_without_support_acked; for (const auto & topic : topics) { - if (rosbag2_cpp::is_service_event_topic(topic.name, topic.type)) { + const bool is_service_event_topic = rosbag2_cpp::is_service_event_topic(topic.name, topic.type); + if (play_options_.publish_service_requests && is_service_event_topic) { // Check if sender was created if (service_clients_.find(topic.name) != service_clients_.end()) { continue; @@ -1074,11 +1104,11 @@ void PlayerImpl::prepare_publishers() auto service_name = rosbag2_cpp::service_event_topic_name_to_service_name(topic.name); auto service_type = rosbag2_cpp::service_event_topic_type_to_service_type(topic.type); try { - auto cli = owner_->create_generic_client(service_name, service_type); - auto player_cli = std::make_shared( - std::move(cli), service_name, topic.type, owner_->get_logger(), + auto generic_client = owner_->create_generic_client(service_name, service_type); + auto player_client = std::make_shared( + std::move(generic_client), service_name, topic.type, owner_->get_logger(), player_service_client_manager_); - service_clients_.insert(std::make_pair(topic.name, player_cli)); + service_clients_.insert(std::make_pair(topic.name, player_client)); } catch (const std::runtime_error & e) { RCLCPP_WARN( owner_->get_logger(), @@ -1091,7 +1121,7 @@ void PlayerImpl::prepare_publishers() } // filter topics to add publishers if necessary - if (!allow_topic(false, topic.name, storage_filter)) { + if (!allow_topic(is_service_event_topic, topic.name, storage_filter)) { continue; } @@ -1170,13 +1200,11 @@ void PlayerImpl::run_play_msg_post_callbacks( bool PlayerImpl::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message) { - bool message_published = false; - auto pub_iter = publishers_.find(message->topic_name); if (pub_iter != publishers_.end()) { // Calling on play message pre-callbacks run_play_msg_pre_callbacks(message); - + bool message_published = false; try { pub_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data)); message_published = true; @@ -1191,17 +1219,78 @@ bool PlayerImpl::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr return message_published; } + // Try to publish message as service request auto client_iter = service_clients_.find(message->topic_name); - if (client_iter != service_clients_.end()) { - if (!client_iter->second->is_include_request_message(*message->serialized_data)) { - return message_published; + if (play_options_.publish_service_requests && client_iter != service_clients_.end()) { + const auto & service_client = client_iter->second; + auto service_event = service_client->deserialize_service_event(*message->serialized_data); + if (!service_event) { + RCLCPP_ERROR_STREAM( + owner_->get_logger(), "Failed to deserialize service event message for '" << + service_client->get_service_name() << "' service!\n"); + return false; + } + + try { + auto [service_event_type, client_gid] = + service_client->get_service_event_type_and_client_gid(service_event); + // Ignore response message + if (service_event_type == service_msgs::msg::ServiceEventInfo::RESPONSE_SENT || + service_event_type == service_msgs::msg::ServiceEventInfo::RESPONSE_RECEIVED) + { + return false; + } + + if (play_options_.service_requests_source == ServiceRequestsSource::SERVICE_INTROSPECTION && + service_event_type != service_msgs::msg::ServiceEventInfo::REQUEST_RECEIVED) + { + return false; + } + + if (play_options_.service_requests_source == ServiceRequestsSource::CLIENT_INTROSPECTION && + service_event_type != service_msgs::msg::ServiceEventInfo::REQUEST_SENT) + { + return false; + } + + if (!service_client->generic_client()->service_is_ready()) { + RCLCPP_ERROR( + owner_->get_logger(), "Service request hasn't been sent. The '%s' service isn't ready !", + service_client->get_service_name().c_str()); + return false; + } + + if (!service_client->is_service_event_include_request_message(service_event)) { + if (service_event_type == service_msgs::msg::ServiceEventInfo::REQUEST_RECEIVED) { + RCUTILS_LOG_WARN_ONCE_NAMED( + ROSBAG2_TRANSPORT_PACKAGE_NAME, + "Can't send service request. " + "The configuration of introspection for '%s' was metadata only on service side!", + service_client->get_service_name().c_str()); + } else if (service_event_type == service_msgs::msg::ServiceEventInfo::REQUEST_SENT) { + RCUTILS_LOG_WARN_ONCE_NAMED( + ROSBAG2_TRANSPORT_PACKAGE_NAME, + "Can't send service request. " + "The configuration of introspection for '%s' client [ID: %s]` was metadata only!", + service_client->get_service_name().c_str(), + rosbag2_cpp::client_id_to_string(client_gid).c_str()); + } + return false; + } + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + owner_->get_logger(), "Failed to send request on '" << + rosbag2_cpp::service_event_topic_name_to_service_name(message->topic_name) << + "' service. \nError: " << e.what()); + return false; } // Calling on play message pre-callbacks run_play_msg_pre_callbacks(message); + bool message_published = false; try { - client_iter->second->async_send_request(*message->serialized_data); + service_client->async_send_request(service_event); message_published = true; } catch (const std::exception & e) { RCLCPP_ERROR_STREAM( @@ -1215,9 +1304,11 @@ bool PlayerImpl::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr return message_published; } - RCLCPP_WARN_STREAM( - owner_->get_logger(), "Not find sender for topic '" << message->topic_name << "' topic."); - return message_published; + RCUTILS_LOG_WARN_ONCE_NAMED( + ROSBAG2_TRANSPORT_PACKAGE_NAME, + "Publisher for topic '%s' not found", message->topic_name.c_str()); + + return false; } void PlayerImpl::add_key_callback( @@ -1576,15 +1667,21 @@ void Player::delete_on_play_message_callback(const Player::callback_handle_t & h pimpl_->delete_on_play_message_callback(handle); } +bool Player::wait_for_sent_service_requests_to_finish( + const std::string & service_name, std::chrono::duration timeout) +{ + return pimpl_->wait_for_sent_service_requests_to_finish(service_name, timeout); +} + std::unordered_map> Player::get_publishers() { return pimpl_->get_publishers(); } std::unordered_map> Player::get_services_clients() + std::shared_ptr> Player::get_service_clients() { - return pimpl_->get_services_clients(); + return pimpl_->get_service_clients(); } rclcpp::Publisher::SharedPtr Player::get_clock_publisher() diff --git a/rosbag2_transport/src/rosbag2_transport/player_service_client.cpp b/rosbag2_transport/src/rosbag2_transport/player_service_client.cpp index d54ff93cea..fa82c4a977 100644 --- a/rosbag2_transport/src/rosbag2_transport/player_service_client.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player_service_client.cpp @@ -24,6 +24,8 @@ #include "logging.hpp" +using namespace std::chrono_literals; + namespace rosbag2_transport { @@ -38,11 +40,11 @@ PlayerServiceClient::PlayerServiceClient( logger_(std::move(logger)), player_service_client_manager_(std::move(player_service_client_manager)) { - auto service_event_ts_lib = rclcpp::get_typesupport_library( - service_event_type, "rosidl_typesupport_cpp"); + service_event_ts_lib_ = + rclcpp::get_typesupport_library(service_event_type, "rosidl_typesupport_cpp"); service_event_type_ts_ = rclcpp::get_message_typesupport_handle( - service_event_type, "rosidl_typesupport_cpp", *service_event_ts_lib); + service_event_type, "rosidl_typesupport_cpp", *service_event_ts_lib_); auto service_event_ts_introspection = get_message_typesupport_handle( service_event_type_ts_, rosidl_typesupport_introspection_cpp::typesupport_identifier); @@ -50,172 +52,128 @@ PlayerServiceClient::PlayerServiceClient( service_event_members_ = reinterpret_cast( service_event_ts_introspection->data); -} - -bool PlayerServiceClient::is_include_request_message(const rcl_serialized_message_t & message) -{ - auto [type, client_id, sequence_number] = get_msg_event_type(message); - // Ignore response message - if (type == service_msgs::msg::ServiceEventInfo::RESPONSE_SENT || - type == service_msgs::msg::ServiceEventInfo::RESPONSE_RECEIVED) - { - return false; + // Sanity checks for service_event_members_ + if (service_event_members_ == nullptr) { + throw std::invalid_argument("service_event_members_ for `" + service_name_ + "` is nullptr"); + } + if (service_event_members_->member_count_ != 3) { + // members_[0]: service_info, members_[1]: request[<=1], members_[2]: response[<=1] + std::stringstream ss; + ss << "Expected 3 fields in the service introspection message, but got " << + service_event_members_->member_count_; + throw std::invalid_argument(ss.str()); } - bool ret = false; - - // For each Client, decide which request data to use based on the first message related to - // the request that is obtained from the record data. - // e.g. - - auto iter = request_info_.find(client_id); - if (type == service_msgs::msg::ServiceEventInfo::REQUEST_RECEIVED) { - if (!service_set_introspection_content_) { - if (rosbag2_cpp::service_event_include_metadata_and_contents(message.buffer_length)) { - service_set_introspection_content_ = true; - } - } + if (!service_event_members_->members_[1].is_array_) { + std::stringstream ss; + ss << "The service request for '" << service_name_ << "' is not array.\n"; + throw std::invalid_argument(ss.str()); + } - if (iter != request_info_.end()) { - switch (iter->second) { - case request_info_from::CLIENT: - { - // Already decide using request data from client. - break; - } - case request_info_from::NO_CONTENT: - { - if (service_set_introspection_content_) { - // introspection type is changed from metadata to metadata + contents - request_info_[client_id] = request_info_from::SERVICE; - ret = true; - } else { - RCUTILS_LOG_WARN_ONCE_NAMED( - ROSBAG2_TRANSPORT_PACKAGE_NAME, - "The configuration of introspection for '%s' is metadata on service side !", - service_name_.c_str()); - } - break; - } - default: // request_info_from::SERVICE: - { - // Already decide using request data from service. - ret = true; - } - } - } else { - if (service_set_introspection_content_) { - request_info_[client_id] = request_info_from::SERVICE; - ret = true; - } else { - request_info_[client_id] = request_info_from::NO_CONTENT; // Only have metadata - } - } + if (service_event_members_->members_[1].size_function == nullptr) { + std::stringstream ss; + ss << "size_function() for service request '" << service_name_ << "' is not set.\n"; + throw std::invalid_argument(ss.str()); + } - return ret; + if (service_event_members_->members_[1].get_function == nullptr) { + std::stringstream ss; + ss << "get_function() for service request '" << service_name_ << "' is not set.\n"; + throw std::invalid_argument(ss.str()); } - // type is service_msgs::msg::ServiceEventInfo::REQUEST_SENT - if (iter != request_info_.end()) { - switch (iter->second) { - case request_info_from::CLIENT: - { - // Already decide using request data from client. - ret = true; - break; - } - case request_info_from::NO_CONTENT: - { - if (rosbag2_cpp::service_event_include_metadata_and_contents(message.buffer_length)) { - // introspection type is changed from metadata to metadata + contents - request_info_[client_id] = request_info_from::CLIENT; - ret = true; - } else { - RCUTILS_LOG_WARN_ONCE_NAMED( - ROSBAG2_TRANSPORT_PACKAGE_NAME, - "The configuration of introspection for '%s' client [ID: %s]` is metadata !", - rosbag2_cpp::client_id_to_string(client_id).c_str(), - service_name_.c_str()); - } - break; - } - default: // request_info_from::SERVICE: - { - // Already decide using request data from service. - ret = false; - } - } - } else { - if (rosbag2_cpp::service_event_include_metadata_and_contents(message.buffer_length)) { - request_info_[client_id] = request_info_from::CLIENT; - ret = true; - } else { - request_info_[client_id] = request_info_from::NO_CONTENT; - } + if (service_event_members_->init_function == nullptr) { + std::stringstream ss; + ss << "service_event_members_->init_function for '" << service_name_ << "' is not set.\n"; + throw std::invalid_argument(ss.str()); } - return ret; + if (service_event_members_->fini_function == nullptr) { + std::stringstream ss; + ss << "service_event_members_->fini_function for '" << service_name_ << "' is not set.\n"; + throw std::invalid_argument(ss.str()); + } } -void PlayerServiceClient::async_send_request(const rcl_serialized_message_t & message) +const std::string & PlayerServiceClient::get_service_name() { - int ret = RMW_RET_OK; - - { - auto ros_message = std::make_unique(service_event_members_->size_of_); - - service_event_members_->init_function( - ros_message.get(), rosidl_runtime_cpp::MessageInitialization::ZERO); - - ret = rmw_deserialize( - &message, service_event_type_ts_, ros_message.get()); - if (ret == RMW_RET_OK) { - if (client_->service_is_ready()) { - // members_[0]: info, members_[1]: request, members_[2]: response - auto request_offset = service_event_members_->members_[1].offset_; - auto request_addr = reinterpret_cast(ros_message.get()) + request_offset; - auto future_and_request_id = client_->async_send_request( - reinterpret_cast(*reinterpret_cast(request_addr))); - player_service_client_manager_->register_request_future(future_and_request_id, client_); - } else { - RCLCPP_ERROR( - logger_, "Service request hasn't been sent. The '%s' service isn't ready !", - service_name_.c_str()); - } - } + return service_name_; +} - service_event_members_->fini_function(ros_message.get()); +std::shared_ptr +PlayerServiceClient::deserialize_service_event(const rcl_serialized_message_t & message) +{ + auto type_erased_service_event = std::shared_ptr( + new uint8_t[service_event_members_->size_of_], + [fini_function = this->service_event_members_->fini_function](uint8_t * msg) { + fini_function(msg); + delete[] msg; + }); + + service_event_members_->init_function( + type_erased_service_event.get(), rosidl_runtime_cpp::MessageInitialization::ZERO); + + rmw_ret_t ret = + rmw_deserialize(&message, service_event_type_ts_, type_erased_service_event.get()); + if (ret != RMW_RET_OK) { // Failed to deserialize service event message + type_erased_service_event.reset(); } + return type_erased_service_event; +} - if (ret != RMW_RET_OK) { - throw std::runtime_error( - "Failed to deserialize service event message for " + service_name_ + " !"); +std::tuple +PlayerServiceClient::get_service_event_type_and_client_gid( + const std::shared_ptr type_erased_service_event) +{ + if (type_erased_service_event) { + // members_[0]: service_info, members_[1]: request[<=1], members_[2]: response[<=1] + const auto & info_member = service_event_members_->members_[0]; + + auto service_event_info_ptr = reinterpret_cast( + type_erased_service_event.get() + info_member.offset_); + if (service_event_info_ptr == nullptr) { + throw std::runtime_error("Error: The service_event_info_ptr is nullptr"); + } + return {service_event_info_ptr->event_type, service_event_info_ptr->client_gid}; + } else { + throw std::invalid_argument("Error: The type_erased_service_event is nullptr"); } } -std::tuple -PlayerServiceClient::get_msg_event_type(const rcl_serialized_message_t & message) +bool PlayerServiceClient::is_service_event_include_request_message( + const std::shared_ptr type_erased_service_event) { - auto msg = service_msgs::msg::ServiceEventInfo(); - - const rosidl_message_type_support_t * type_support_info = - rosidl_typesupport_cpp:: - get_message_type_support_handle(); - if (type_support_info == nullptr) { - throw std::runtime_error( - "Failed to get message type support handle of service event info !"); + if (type_erased_service_event) { + // members_[0]: service_info, members_[1]: request[<=1], members_[2]: response[<=1] + const auto & request_member = service_event_members_->members_[1]; + void * request_sequence_ptr = type_erased_service_event.get() + request_member.offset_; + if (request_member.size_function(request_sequence_ptr) > 0) { + return true; + } // else { /* No service request */ } + } else { + throw std::invalid_argument("Error: The type_erased_service_event is nullptr"); } + return false; +} - auto ret = rmw_deserialize( - &message, - type_support_info, - reinterpret_cast(&msg)); - if (ret != RMW_RET_OK) { - throw std::runtime_error("Failed to deserialize message !"); - } +void PlayerServiceClient::async_send_request( + const std::shared_ptr type_erased_service_event) +{ + // members_[0]: service_info, members_[1]: request[<=1], members_[2]: response[<=1] + const auto & request_member = service_event_members_->members_[1]; + void * request_sequence_ptr = type_erased_service_event.get() + request_member.offset_; + if (request_member.size_function(request_sequence_ptr) > 0) { + void * request_ptr = request_member.get_function(request_sequence_ptr, 0); + auto future_and_request_id = client_->async_send_request(request_ptr); + player_service_client_manager_->register_request_future( + std::move(future_and_request_id), client_); + } // else { /* No service request in the service event. Do nothing, just skip it. */ } +} - return {msg.event_type, msg.client_gid, msg.sequence_number}; +bool PlayerServiceClient::wait_for_sent_requests_to_finish(std::chrono::duration timeout) +{ + return player_service_client_manager_->wait_for_all_futures(timeout); } PlayerServiceClientManager::PlayerServiceClientManager( @@ -236,10 +194,7 @@ bool PlayerServiceClientManager::request_future_queue_is_full() return false; } - // Remove complete request future remove_complete_request_future(); - - // Remove all timeout request future remove_all_timeout_request_future(); if (request_futures_list_.size() == maximum_request_future_queue_) { @@ -250,7 +205,7 @@ bool PlayerServiceClientManager::request_future_queue_is_full() } bool PlayerServiceClientManager::register_request_future( - rclcpp::GenericClient::FutureAndRequestId & request_future, + rclcpp::GenericClient::FutureAndRequestId && request_future, std::weak_ptr client) { auto future_and_request_id = @@ -258,28 +213,59 @@ bool PlayerServiceClientManager::register_request_future( if (!request_future_queue_is_full()) { std::lock_guard lock(request_futures_list_mutex_); - request_futures_list_[std::chrono::steady_clock::now()] = - {std::move(future_and_request_id), client}; - return true; + auto emplace_result = request_futures_list_.emplace( + std::chrono::steady_clock::now(), std::make_pair(std::move(future_and_request_id), client)); + return emplace_result.second; } else { ROSBAG2_TRANSPORT_LOG_WARN( "Client request queue is full. " "Please consider increasing the length of the queue."); } - return false; } +bool PlayerServiceClientManager::wait_for_all_futures(std::chrono::duration timeout) +{ + auto is_all_futures_ready = [&]() { + bool is_ready = true; + for (auto & [timestamp, future_request_id_and_client] : request_futures_list_) { + if (!future_request_id_and_client.first->future.valid()) { + std::stringstream ss; + ss << "request's " << future_request_id_and_client.first->request_id << + " future is not valid!\n"; + throw std::runtime_error(ss.str()); + } + if (future_request_id_and_client.first->wait_for(0s) != std::future_status::ready) { + return false; + } + } + return is_ready; + }; + + auto sleep_time = std::chrono::milliseconds(10); + if (timeout < std::chrono::seconds(1)) { + sleep_time = std::chrono::duration_cast(timeout); + } + using clock = std::chrono::system_clock; + auto start = clock::now(); + + std::lock_guard lock(request_futures_list_mutex_); + while (!is_all_futures_ready() && (clock::now() - start) < timeout) { + std::this_thread::sleep_for(sleep_time); + } + + return is_all_futures_ready(); +} + void PlayerServiceClientManager::remove_complete_request_future() { std::vector remove_keys; - for (auto & [timestamp, request_id_and_client] : request_futures_list_) { - if (request_id_and_client.first->wait_for(std::chrono::seconds(0)) == - std::future_status::ready) - { - auto client = request_id_and_client.second.lock(); + std::lock_guard lock(request_futures_list_mutex_); + for (auto & [timestamp, future_request_id_and_client] : request_futures_list_) { + if (future_request_id_and_client.first->wait_for(0s) == std::future_status::ready) { + auto client = future_request_id_and_client.second.lock(); if (client) { - client->remove_pending_request(request_id_and_client.first->request_id); + client->remove_pending_request(future_request_id_and_client.first->request_id); } remove_keys.emplace_back(timestamp); } @@ -291,6 +277,7 @@ void PlayerServiceClientManager::remove_complete_request_future() void PlayerServiceClientManager::remove_all_timeout_request_future() { + std::lock_guard lock(request_futures_list_mutex_); auto current_time = std::chrono::steady_clock::now(); auto first_iter_without_timeout = request_futures_list_.lower_bound(current_time - request_future_timeout_); diff --git a/rosbag2_transport/test/resources/player_node_params.yaml b/rosbag2_transport/test/resources/player_node_params.yaml index bbef93ec4f..79e1b47a93 100644 --- a/rosbag2_transport/test/resources/player_node_params.yaml +++ b/rosbag2_transport/test/resources/player_node_params.yaml @@ -37,6 +37,8 @@ player_params_node: sec: 0 nsec: -999999999 disable_loan_message: false + publish_service_requests: false + service_requests_source: "CLIENT_INTROSPECTION" # SERVICE_INTROSPECTION or CLIENT_INTROSPECTION storage: uri: "path/to/some_bag" diff --git a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp index bf575cace8..9220679b0b 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp @@ -59,7 +59,7 @@ class MockPlayer : public rosbag2_transport::Player std::vector get_list_of_clients() { std::vector cli_list; - for (const auto & client : this->get_services_clients()) { + for (const auto & client : this->get_service_clients()) { cli_list.push_back( static_cast( client.second.get())); diff --git a/rosbag2_transport/test/rosbag2_transport/test_burst.cpp b/rosbag2_transport/test/rosbag2_transport/test_burst.cpp index b518b787f1..8de44f02da 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_burst.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_burst.cpp @@ -41,7 +41,7 @@ get_service_event_message_basic_types() { auto msg = std::make_shared(); - msg->info.event_type = service_msgs::msg::ServiceEventInfo::REQUEST_SENT; + msg->info.event_type = service_msgs::msg::ServiceEventInfo::REQUEST_RECEIVED; test_msgs::srv::BasicTypes_Request request; request.int32_value = 123; request.int64_value = 456; @@ -51,7 +51,7 @@ get_service_event_message_basic_types() { auto msg = std::make_shared(); - msg->info.event_type = service_msgs::msg::ServiceEventInfo::REQUEST_SENT; + msg->info.event_type = service_msgs::msg::ServiceEventInfo::REQUEST_RECEIVED; test_msgs::srv::BasicTypes_Request request; request.int32_value = 456; request.int64_value = 789; @@ -401,32 +401,38 @@ TEST_F(RosBag2PlayTestFixture, burst_bursting_only_filtered_topics) { } TEST_F(RosBag2PlayTestFixture, burst_bursting_only_filtered_services) { + const std::string service_name1 = "/test_service1"; + const std::string service_name2 = "/test_service2"; + const std::string service_event_name1 = service_name1 + "/_service_event"; + const std::string service_event_name2 = service_name2 + "/_service_event"; + auto services_types = std::vector{ - {1u, "/test_service1/_service_event", "test_msgs/srv/BasicTypes_Event", "", {}, ""}, - {2u, "/test_service2/_service_event", "test_msgs/srv/BasicTypes_Event", "", {}, ""}, + {1u, service_event_name1, "test_msgs/srv/BasicTypes_Event", "", {}, ""}, + {2u, service_event_name2, "test_msgs/srv/BasicTypes_Event", "", {}, ""}, }; std::vector> messages = { serialize_test_message( - "/test_service1/_service_event", 500, get_service_event_message_basic_types()[0]), + service_event_name1, 500, get_service_event_message_basic_types()[0]), serialize_test_message( - "/test_service2/_service_event", 600, get_service_event_message_basic_types()[0]), + service_event_name2, 600, get_service_event_message_basic_types()[0]), serialize_test_message( - "/test_service1/_service_event", 400, get_service_event_message_basic_types()[1]), + service_event_name1, 400, get_service_event_message_basic_types()[1]), serialize_test_message( - "/test_service2/_service_event", 500, get_service_event_message_basic_types()[1]) + service_event_name2, 500, get_service_event_message_basic_types()[1]) }; std::vector> service1_receive_requests; std::vector> service2_receive_requests; - srv_->setup_service("test_service1", service1_receive_requests); - srv_->setup_service("test_service2", service2_receive_requests); + srv_->setup_service(service_name1, service1_receive_requests); + srv_->setup_service(service_name2, service2_receive_requests); srv_->run_services(); // Filter allows test_service2, blocks test_service1 - play_options_.services_to_filter = {"test_service2/_service_event"}; + play_options_.services_to_filter = {service_event_name2}; + play_options_.publish_service_requests = true; auto prepared_mock_reader = std::make_unique(); prepared_mock_reader->prepare(messages, services_types); auto reader = std::make_unique(std::move(prepared_mock_reader)); @@ -448,7 +454,7 @@ TEST_F(RosBag2PlayTestFixture, burst_bursting_only_filtered_services) { player->resume(); player_future.get(); - std::this_thread::sleep_for(200ms); + player->wait_for_playback_to_finish(); EXPECT_EQ(service1_receive_requests.size(), 0); EXPECT_EQ(service2_receive_requests.size(), 2); diff --git a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp index c239b4d2e6..d400d1fed3 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp @@ -170,6 +170,10 @@ TEST_P(ComposablePlayerTests, player_can_parse_parameters_from_file) { EXPECT_EQ(play_options.start_offset, 999999999); EXPECT_EQ(play_options.wait_acked_timeout, -999999999); EXPECT_EQ(play_options.disable_loan_message, false); + EXPECT_EQ(play_options.publish_service_requests, false); + EXPECT_EQ( + play_options.service_requests_source, + rosbag2_transport::ServiceRequestsSource::CLIENT_INTROSPECTION); EXPECT_EQ(storage_options.uri, uri_str); EXPECT_EQ(storage_options.storage_id, GetParam()); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 824fc95934..26bb25c4bb 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -54,26 +54,59 @@ get_service_event_message_basic_types() { auto msg = std::make_shared(); - msg->info.event_type = service_msgs::msg::ServiceEventInfo::REQUEST_SENT; + msg->info.event_type = service_msgs::msg::ServiceEventInfo::REQUEST_RECEIVED; test_msgs::srv::BasicTypes_Request request; request.int32_value = 123; request.int64_value = 456; + request.string_value = "event_type=REQUEST_RECEIVED"; msg->request.emplace_back(request); messages.push_back(msg); } { auto msg = std::make_shared(); - msg->info.event_type = service_msgs::msg::ServiceEventInfo::REQUEST_SENT; + msg->info.event_type = service_msgs::msg::ServiceEventInfo::REQUEST_RECEIVED; test_msgs::srv::BasicTypes_Request request; request.int32_value = 456; request.int64_value = 789; + request.string_value = "event_type=REQUEST_RECEIVED"; + msg->request.emplace_back(request); + messages.push_back(msg); + } + + { + auto msg = std::make_shared(); + msg->info.event_type = service_msgs::msg::ServiceEventInfo::REQUEST_SENT; + test_msgs::srv::BasicTypes_Request request; + request.int32_value = 789; + request.int64_value = 123; + request.string_value = "event_type=REQUEST_SENT"; msg->request.emplace_back(request); messages.push_back(msg); } return messages; } + +void spin_thread_and_wait_for_sent_service_requests_to_finish( + std::shared_ptr player, + std::vector service_name_list) +{ + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(player); + auto spin_thread = std::thread( + [&exec]() { + exec.spin(); + }); + player->play(); + player->wait_for_playback_to_finish(200ms); + + for (auto & service_name : service_name_list) { + EXPECT_TRUE(player->wait_for_sent_service_requests_to_finish(service_name, 100ms)); + } + exec.cancel(); + spin_thread.join(); +} } // namespace TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) @@ -143,20 +176,25 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_services) { + const std::string service_name1 = "/test_service1"; + const std::string service_event_name1 = service_name1 + "/_service_event"; + const std::string service_name2 = "/test_service2"; + const std::string service_event_name2 = service_name2 + "/_service_event"; + auto services_types = std::vector{ - {1u, "test_service1/_service_event", "test_msgs/srv/BasicTypes_Event", "", {}, ""}, - {2u, "test_service2/_service_event", "test_msgs/srv/BasicTypes_Event", "", {}, ""}, + {1u, service_event_name1, "test_msgs/srv/BasicTypes_Event", "", {}, ""}, + {2u, service_event_name2, "test_msgs/srv/BasicTypes_Event", "", {}, ""}, }; std::vector> messages = { serialize_test_message( - "test_service1/_service_event", 500, get_service_event_message_basic_types()[0]), + service_event_name1, 500, get_service_event_message_basic_types()[0]), serialize_test_message( - "test_service2/_service_event", 600, get_service_event_message_basic_types()[0]), + service_event_name2, 600, get_service_event_message_basic_types()[0]), serialize_test_message( - "test_service1/_service_event", 400, get_service_event_message_basic_types()[1]), + service_event_name1, 400, get_service_event_message_basic_types()[1]), serialize_test_message( - "test_service2/_service_event", 500, get_service_event_message_basic_types()[1]) + service_event_name2, 500, get_service_event_message_basic_types()[1]) }; auto prepared_mock_reader = std::make_unique(); @@ -166,19 +204,20 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_services) std::vector> service1_receive_requests; std::vector> service2_receive_requests; - srv_->setup_service("test_service1", service1_receive_requests); - srv_->setup_service("test_service2", service2_receive_requests); + srv_->setup_service(service_name1, service1_receive_requests); + srv_->setup_service(service_name2, service2_receive_requests); srv_->run_services(); ASSERT_TRUE(srv_->all_services_ready()); + play_options_.publish_service_requests = true; auto player = std::make_shared( std::move( reader), storage_options_, play_options_); - player->play(); - std::this_thread::sleep_for(300ms); + spin_thread_and_wait_for_sent_service_requests_to_finish(player, {service_name1, service_name2}); + EXPECT_EQ(service1_receive_requests.size(), 2); EXPECT_EQ(service2_receive_requests.size(), 2); } @@ -188,20 +227,24 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_topics_and_servi auto topic_msg = get_messages_basic_types()[0]; topic_msg->int64_value = 1111; + const std::string topic_name = "/topic1"; + const std::string service_name = "/test_service1"; + const std::string service_event_name = service_name + "/_service_event"; + auto services_types = std::vector{ - {1u, "topic1", "test_msgs/BasicTypes", "", {}, ""}, - {2u, "test_service1/_service_event", "test_msgs/srv/BasicTypes_Event", "", {}, ""}, + {1u, topic_name, "test_msgs/BasicTypes", "", {}, ""}, + {2u, service_event_name, "test_msgs/srv/BasicTypes_Event", "", {}, ""}, }; std::vector> messages = { serialize_test_message( - "test_service1/_service_event", 500, get_service_event_message_basic_types()[0]), + service_event_name, 500, get_service_event_message_basic_types()[0]), serialize_test_message( - "topic1", 600, topic_msg), + topic_name, 600, topic_msg), serialize_test_message( - "test_service1/_service_event", 550, get_service_event_message_basic_types()[1]), + service_event_name, 550, get_service_event_message_basic_types()[1]), serialize_test_message( - "topic1", 400, topic_msg), + topic_name, 400, topic_msg), }; auto prepared_mock_reader = std::make_unique(); @@ -209,28 +252,40 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_topics_and_servi auto reader = std::make_unique(std::move(prepared_mock_reader)); std::vector> service_receive_requests; - srv_->setup_service("test_service1", service_receive_requests); + srv_->setup_service(service_name, service_receive_requests); srv_->run_services(); ASSERT_TRUE(srv_->all_services_ready()); - sub_->add_subscription("/topic1", 2); + sub_->add_subscription(topic_name, 2); auto await_received_messages = sub_->spin_subscriptions(); + play_options_.publish_service_requests = true; auto player = std::make_shared( std::move( reader), storage_options_, play_options_); + + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(player); + auto spin_thread = std::thread( + [&exec]() { + exec.spin(); + }); + player->play(); await_received_messages.get(); auto replayed_topic_msg = sub_->get_received_messages( - "/topic1"); + topic_name); EXPECT_THAT(replayed_topic_msg, SizeIs(Ge(2u))); EXPECT_THAT( replayed_topic_msg, Each(Pointee(Field(&test_msgs::msg::BasicTypes::int64_value, 1111)))); - std::this_thread::sleep_for(200ms); + player->wait_for_playback_to_finish(200ms); + EXPECT_TRUE(player->wait_for_sent_service_requests_to_finish(service_name, 100ms)); + exec.cancel(); + spin_thread.join(); ASSERT_EQ(service_receive_requests.size(), 2); for (size_t i = 0; i < 2; i++) { EXPECT_EQ( @@ -433,37 +488,44 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics) TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_services) { + const std::string service_name1 = "/test_service1"; + const std::string service_event_name1 = service_name1 + "/_service_event"; + const std::string service_name2 = "/test_service2"; + const std::string service_event_name2 = service_name2 + "/_service_event"; + auto services_types = std::vector{ - {1u, "/test_service1/_service_event", "test_msgs/srv/BasicTypes_Event", "", {}, ""}, - {2u, "/test_service2/_service_event", "test_msgs/srv/BasicTypes_Event", "", {}, ""}, + {1u, service_event_name1, "test_msgs/srv/BasicTypes_Event", "", {}, ""}, + {2u, service_event_name2, "test_msgs/srv/BasicTypes_Event", "", {}, ""}, }; std::vector> messages = { serialize_test_message( - "/test_service1/_service_event", 500, get_service_event_message_basic_types()[0]), + service_event_name1, 500, get_service_event_message_basic_types()[0]), serialize_test_message( - "/test_service2/_service_event", 600, get_service_event_message_basic_types()[0]), + service_event_name2, 600, get_service_event_message_basic_types()[0]), serialize_test_message( - "/test_service1/_service_event", 400, get_service_event_message_basic_types()[1]), + service_event_name1, 400, get_service_event_message_basic_types()[1]), serialize_test_message( - "/test_service2/_service_event", 500, get_service_event_message_basic_types()[1]) + service_event_name2, 500, get_service_event_message_basic_types()[1]) }; auto prepared_mock_reader = std::make_unique(); prepared_mock_reader->prepare(messages, services_types); auto reader = std::make_unique(std::move(prepared_mock_reader)); + play_options_.publish_service_requests = true; + // Filter allows /test_service2, blocks /test_service1 { - play_options_.services_to_filter = {"test_service2/_service_event"}; + play_options_.services_to_filter = {service_event_name2}; srv_.reset(); srv_ = std::make_shared(); std::vector> service1_receive_requests; std::vector> service2_receive_requests; - srv_->setup_service("test_service1", service1_receive_requests); - srv_->setup_service("test_service2", service2_receive_requests); + srv_->setup_service(service_name1, service1_receive_requests); + srv_->setup_service(service_name2, service2_receive_requests); srv_->run_services(); ASSERT_TRUE(srv_->all_services_ready()); @@ -475,23 +537,25 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_service auto player = std::make_shared( std::move( reader), storage_options_, play_options_); - player->play(); - std::this_thread::sleep_for(250ms); + // Only need to wait for sent service 2 request to finish + spin_thread_and_wait_for_sent_service_requests_to_finish( + player, {service_name2}); + EXPECT_EQ(service1_receive_requests.size(), 0); EXPECT_EQ(service2_receive_requests.size(), 2); } // Filter allows /test_service1, blocks /test_service2 { - play_options_.services_to_filter = {"test_service1/_service_event"}; + play_options_.services_to_filter = {service_event_name1}; srv_.reset(); srv_ = std::make_shared(); std::vector> service1_receive_requests; std::vector> service2_receive_requests; - srv_->setup_service("test_service1", service1_receive_requests); - srv_->setup_service("test_service2", service2_receive_requests); + srv_->setup_service(service_name1, service1_receive_requests); + srv_->setup_service(service_name2, service2_receive_requests); srv_->run_services(); ASSERT_TRUE(srv_->all_services_ready()); @@ -503,9 +567,10 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_service auto player = std::make_shared( std::move( reader), storage_options_, play_options_); - player->play(); + // Only need to wait for sent service 1 request to finish + spin_thread_and_wait_for_sent_service_requests_to_finish( + player, {service_name1}); - std::this_thread::sleep_for(250ms); EXPECT_EQ(service1_receive_requests.size(), 2); EXPECT_EQ(service2_receive_requests.size(), 0); } @@ -518,8 +583,8 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_service srv_ = std::make_shared(); std::vector> service1_receive_requests; std::vector> service2_receive_requests; - srv_->setup_service("test_service1", service1_receive_requests); - srv_->setup_service("test_service2", service2_receive_requests); + srv_->setup_service(service_name1, service1_receive_requests); + srv_->setup_service(service_name2, service2_receive_requests); srv_->run_services(); ASSERT_TRUE(srv_->all_services_ready()); @@ -531,9 +596,9 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_service auto player = std::make_shared( std::move( reader), storage_options_, play_options_); - player->play(); + spin_thread_and_wait_for_sent_service_requests_to_finish( + player, {service_name1, service_name2}); - std::this_thread::sleep_for(250ms); EXPECT_EQ(service1_receive_requests.size(), 2); EXPECT_EQ(service2_receive_requests.size(), 2); } @@ -541,43 +606,45 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_service TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_and_services) { + const std::string topic_name1 = "/topic1"; + const std::string topic_name2 = "/topic2"; + const std::string service_name1 = "/test_service1"; + const std::string service_event_name1 = service_name1 + "/_service_event"; + const std::string service_name2 = "/test_service2"; + const std::string service_event_name2 = service_name2 + "/_service_event"; + auto all_types = std::vector{ - {1u, "/topic1", "test_msgs/BasicTypes", "", {}, ""}, - {2u, "/topic2", "test_msgs/BasicTypes", "", {}, ""}, - {3u, "/test_service1/_service_event", "test_msgs/srv/BasicTypes_Event", "", {}, ""}, - {4u, "/test_service2/_service_event", "test_msgs/srv/BasicTypes_Event", "", {}, ""}, + {1u, topic_name1, "test_msgs/BasicTypes", "", {}, ""}, + {2u, topic_name2, "test_msgs/BasicTypes", "", {}, ""}, + {3u, service_event_name1, "test_msgs/srv/BasicTypes_Event", "", {}, ""}, + {4u, service_event_name2, "test_msgs/srv/BasicTypes_Event", "", {}, ""}, }; std::vector> messages = { - serialize_test_message("/topic1", 500, get_messages_basic_types()[0]), - serialize_test_message( - "/test_service1/_service_event", - 520, - get_service_event_message_basic_types()[0]), - serialize_test_message("/topic2", 520, get_messages_basic_types()[0]), - serialize_test_message( - "/test_service2/_service_event", - 550, - get_service_event_message_basic_types()[0]), + serialize_test_message(topic_name1, 500, get_messages_basic_types()[0]), + serialize_test_message(service_event_name1, 520, get_service_event_message_basic_types()[0]), + serialize_test_message(topic_name2, 520, get_messages_basic_types()[0]), + serialize_test_message(service_event_name2, 550, get_service_event_message_basic_types()[0]), }; + play_options_.publish_service_requests = true; // Filter allows all topics, blocks service test_service2 { - play_options_.topics_to_filter = {"topic1", "topic2"}; - play_options_.services_to_filter = {"test_service1/_service_event"}; + play_options_.topics_to_filter = {topic_name1, topic_name2}; + play_options_.services_to_filter = {service_event_name1}; sub_.reset(); sub_ = std::make_shared(); - sub_->add_subscription("/topic1", 1); - sub_->add_subscription("/topic2", 1); + sub_->add_subscription(topic_name1, 1); + sub_->add_subscription(topic_name2, 1); srv_.reset(); srv_ = std::make_shared(); std::vector> service1_receive_requests; std::vector> service2_receive_requests; - srv_->setup_service("test_service1", service1_receive_requests); - srv_->setup_service("test_service2", service2_receive_requests); + srv_->setup_service(service_name1, service1_receive_requests); + srv_->setup_service(service_name2, service2_receive_requests); srv_->run_services(); ASSERT_TRUE(srv_->all_services_ready()); @@ -590,15 +657,26 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_ auto player = std::make_shared( std::move( reader), storage_options_, play_options_); - player->play(); + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(player); + auto spin_thread = std::thread( + [&exec]() { + exec.spin(); + }); + + player->play(); await_received_messages.get(); - std::this_thread::sleep_for(300ms); + player->wait_for_playback_to_finish(200ms); + + EXPECT_TRUE(player->wait_for_sent_service_requests_to_finish(service_name1, 100ms)); + exec.cancel(); + spin_thread.join(); // Filter allow all topics - auto replayed_topic1 = sub_->get_received_messages("/topic1"); + auto replayed_topic1 = sub_->get_received_messages(topic_name1); EXPECT_THAT(replayed_topic1, SizeIs(1u)); - auto replayed_topic2 = sub_->get_received_messages("/topic2"); + auto replayed_topic2 = sub_->get_received_messages(topic_name2); EXPECT_THAT(replayed_topic2, SizeIs(1u)); // Filter allow test_service1, block test_service2 @@ -608,21 +686,21 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_ // Filter allows all services, blocks topic2 { - play_options_.topics_to_filter = {"topic1"}; + play_options_.topics_to_filter = {topic_name1}; play_options_.services_to_filter = { - "test_service1/_service_event", "test_service2/_service_event"}; + service_event_name1, service_event_name2}; sub_.reset(); sub_ = std::make_shared(); - sub_->add_subscription("/topic1", 1); - sub_->add_subscription("/topic2", 0); + sub_->add_subscription(topic_name1, 1); + sub_->add_subscription(topic_name2, 0); srv_.reset(); srv_ = std::make_shared(); std::vector> service1_receive_requests; std::vector> service2_receive_requests; - srv_->setup_service("test_service1", service1_receive_requests); - srv_->setup_service("test_service2", service2_receive_requests); + srv_->setup_service(service_name1, service1_receive_requests); + srv_->setup_service(service_name2, service2_receive_requests); srv_->run_services(); ASSERT_TRUE(srv_->all_services_ready()); @@ -635,15 +713,27 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_ auto player = std::make_shared( std::move( reader), storage_options_, play_options_); - player->play(); + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(player); + auto spin_thread = std::thread( + [&exec]() { + exec.spin(); + }); + + player->play(); await_received_messages.get(); - std::this_thread::sleep_for(200ms); + player->wait_for_playback_to_finish(200ms); + + EXPECT_TRUE(player->wait_for_sent_service_requests_to_finish(service_name1, 100ms)); + EXPECT_TRUE(player->wait_for_sent_service_requests_to_finish(service_name2, 100ms)); + exec.cancel(); + spin_thread.join(); // Filter allow topic2, block topic1 - auto replayed_topic1 = sub_->get_received_messages("/topic1"); + auto replayed_topic1 = sub_->get_received_messages(topic_name1); EXPECT_THAT(replayed_topic1, SizeIs(1u)); - auto replayed_topic2 = sub_->get_received_messages("/topic2"); + auto replayed_topic2 = sub_->get_received_messages(topic_name2); EXPECT_THAT(replayed_topic2, SizeIs(0u)); // Filter allow all services @@ -653,21 +743,21 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_ // Filter allows all services and topics { - play_options_.topics_to_filter = {"topic1", "topic2"}; + play_options_.topics_to_filter = {topic_name1, topic_name2}; play_options_.services_to_filter = { - "test_service1/_service_event", "test_service2/_service_event"}; + service_event_name1, service_event_name2}; sub_.reset(); sub_ = std::make_shared(); - sub_->add_subscription("/topic1", 1); - sub_->add_subscription("/topic2", 1); + sub_->add_subscription(topic_name1, 1); + sub_->add_subscription(topic_name2, 1); srv_.reset(); srv_ = std::make_shared(); std::vector> service1_receive_requests; std::vector> service2_receive_requests; - srv_->setup_service("test_service1", service1_receive_requests); - srv_->setup_service("test_service2", service2_receive_requests); + srv_->setup_service(service_name1, service1_receive_requests); + srv_->setup_service(service_name2, service2_receive_requests); srv_->run_services(); ASSERT_TRUE(srv_->all_services_ready()); @@ -680,10 +770,22 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_ auto player = std::make_shared( std::move( reader), storage_options_, play_options_); - player->play(); + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(player); + auto spin_thread = std::thread( + [&exec]() { + exec.spin(); + }); + + player->play(); await_received_messages.get(); - std::this_thread::sleep_for(200ms); + player->wait_for_playback_to_finish(200ms); + + EXPECT_TRUE(player->wait_for_sent_service_requests_to_finish(service_name1, 100ms)); + EXPECT_TRUE(player->wait_for_sent_service_requests_to_finish(service_name2, 100ms)); + exec.cancel(); + spin_thread.join(); // Filter allow all topics auto replayed_topic1 = sub_->get_received_messages("/topic1"); @@ -839,6 +941,155 @@ TEST_F(RosBag2PlayTestFixture, player_gracefully_exit_by_rclcpp_shutdown_in_paus player->wait_for_playback_to_finish(); } +TEST_F(RosBag2PlayTestFixture, play_service_requests_from_service_introspection_messages) +{ + const std::string service_name = "/test_service1"; + const std::string service_event_name = service_name + "/_service_event"; + auto services_types = std::vector{ + {1u, service_event_name, "test_msgs/srv/BasicTypes_Event", "", {}, ""}, + }; + std::vector> messages = + { + serialize_test_message(service_event_name, 5, get_service_event_message_basic_types()[2]), + serialize_test_message(service_event_name, 10, get_service_event_message_basic_types()[0]), + serialize_test_message(service_event_name, 20, get_service_event_message_basic_types()[2]), + serialize_test_message(service_event_name, 25, get_service_event_message_basic_types()[0]), + serialize_test_message(service_event_name, 30, get_service_event_message_basic_types()[2]), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, services_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + std::vector> received_service_requests; + + srv_->setup_service(service_name, received_service_requests); + srv_->run_services(); + ASSERT_TRUE(srv_->all_services_ready()); + + play_options_.publish_service_requests = true; + play_options_.service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION; + + auto player = + std::make_shared(std::move(reader), storage_options_, play_options_); + + spin_thread_and_wait_for_sent_service_requests_to_finish(player, {service_name}); + + EXPECT_EQ(received_service_requests.size(), 2); + // expected_request is ServiceEventInfo::REQUEST_RECEIVED + const auto expected_request = get_service_event_message_basic_types()[0]->request[0]; + for (const auto & service_request : received_service_requests) { + EXPECT_EQ(service_request->int32_value, expected_request.int32_value) << + service_request->string_value; + EXPECT_EQ(service_request->int64_value, expected_request.int64_value) << + service_request->string_value; + } +} + +TEST_F(RosBag2PlayTestFixture, play_service_requests_from_client_introspection_messages) +{ + const std::string service_name = "/test_service1"; + const std::string service_event_name = service_name + "/_service_event"; + auto services_types = std::vector{ + {1u, service_event_name, "test_msgs/srv/BasicTypes_Event", "", {}, ""}, + }; + std::vector> messages = + { + serialize_test_message(service_event_name, 5, get_service_event_message_basic_types()[0]), + serialize_test_message(service_event_name, 10, get_service_event_message_basic_types()[1]), + serialize_test_message(service_event_name, 20, get_service_event_message_basic_types()[2]), + serialize_test_message(service_event_name, 25, get_service_event_message_basic_types()[2]), + serialize_test_message(service_event_name, 30, get_service_event_message_basic_types()[1]), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, services_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + std::vector> received_service_requests; + + srv_->setup_service(service_name, received_service_requests); + srv_->run_services(); + ASSERT_TRUE(srv_->all_services_ready()); + + play_options_.publish_service_requests = true; + play_options_.service_requests_source = ServiceRequestsSource::CLIENT_INTROSPECTION; + + auto player = + std::make_shared(std::move(reader), storage_options_, play_options_); + + spin_thread_and_wait_for_sent_service_requests_to_finish(player, {service_name}); + + EXPECT_EQ(received_service_requests.size(), 2); + // expected_request is ServiceEventInfo::REQUEST_SENT + const auto expected_request = get_service_event_message_basic_types()[2]->request[0]; + for (const auto & service_request : received_service_requests) { + EXPECT_EQ(service_request->int32_value, expected_request.int32_value) << + service_request->string_value; + EXPECT_EQ(service_request->int64_value, expected_request.int64_value) << + service_request->string_value; + } +} + +TEST_F(RosBag2PlayTestFixture, play_service_events_and_topics) +{ + const std::string topic_1_name = "/topic1"; + const std::string topic_2_name = "/topic2"; + const std::string service_event_1_name = "/test_service1/_service_event"; + const std::string service_event_2_name = "/test_service2/_service_event"; + + auto all_types = std::vector{ + {1u, topic_1_name, "test_msgs/BasicTypes", "", {}, ""}, + {2u, topic_2_name, "test_msgs/BasicTypes", "", {}, ""}, + {3u, service_event_1_name, "test_msgs/srv/BasicTypes_Event", "", {}, ""}, + {4u, service_event_2_name, "test_msgs/srv/BasicTypes_Event", "", {}, ""}, + }; + + auto request_received_service_event = get_service_event_message_basic_types()[0]; + std::vector> messages = + { + serialize_test_message(topic_1_name, 10, get_messages_basic_types()[0]), + serialize_test_message(service_event_1_name, 20, request_received_service_event), + serialize_test_message(topic_2_name, 30, get_messages_basic_types()[0]), + serialize_test_message(service_event_2_name, 40, request_received_service_event), + }; + + play_options_.publish_service_requests = false; + + sub_ = std::make_shared(); + sub_->add_subscription(topic_1_name, 1); + sub_->add_subscription(topic_2_name, 1); + sub_->add_subscription(service_event_1_name, 1); + sub_->add_subscription(service_event_2_name, 1); + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, all_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player->play(); + player->wait_for_playback_to_finish(); + + await_received_messages.get(); + + auto replayed_topic1 = sub_->get_received_messages(topic_1_name); + EXPECT_THAT(replayed_topic1, SizeIs(1u)); + auto replayed_topic2 = sub_->get_received_messages(topic_2_name); + EXPECT_THAT(replayed_topic2, SizeIs(1u)); + auto replayed_service_event_1 = + sub_->get_received_messages(service_event_1_name); + EXPECT_THAT(replayed_service_event_1, SizeIs(1u)); + auto replayed_service_event_2 = + sub_->get_received_messages(service_event_2_name); + EXPECT_THAT(replayed_service_event_2, SizeIs(1u)); +} + class RosBag2PlayQosOverrideTestFixture : public RosBag2PlayTestFixture { public: