Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for services playback per review 2 #6

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
bc4d118
Make service_event_ts_lib as private member again
MichaelOrlov Mar 31, 2024
5f324c1
Cleanup in PlayerServiceClient::async_send_request(ser_message)
MichaelOrlov Mar 31, 2024
6397df6
Refactoring. Do full deserialization and only once
MichaelOrlov Mar 31, 2024
94d418f
Specify service request from which introspection message and fix uncr…
Barry-Xu-2018 Apr 7, 2024
00bff3e
Revert uncrustify changes from previous commit.
MichaelOrlov Apr 9, 2024
05a3d1d
Rename service_request_from to the service_requests_source
MichaelOrlov Apr 9, 2024
9d98b04
Add Player::wait_for_sent_service_requests_to_finish() API
MichaelOrlov Apr 10, 2024
b7116b9
Mitigate potential issues related to the operations reordering on ARM
MichaelOrlov Apr 10, 2024
b87c944
Make tests play_service_requests_from_service(client) deterministic
MichaelOrlov Apr 10, 2024
bede854
Misc findings and improvements 1
MichaelOrlov Apr 10, 2024
c5f6c13
Rename get_services_clients() to the get_service_clients()
MichaelOrlov Apr 10, 2024
a623a23
Add a new CLI parameter "--publish-service-requests" for Player
Barry-Xu-2018 Apr 8, 2024
091620c
Fix an issue on filtering topic when prepare publishers
MichaelOrlov Apr 10, 2024
3c19873
Cleanup in play_without_publish_service_requests
MichaelOrlov Apr 10, 2024
98c14ef
Wrap code which can throw with try-catch in the publish_message(..)
MichaelOrlov Apr 11, 2024
f270ac1
Delete some part of the code which became absolute and shall not be used
MichaelOrlov Apr 11, 2024
974d687
Update test codes
Barry-Xu-2018 Apr 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ros2cli.node import NODE_NAME_PREFIX
from rosbag2_py import Player
from rosbag2_py import PlayOptions
from rosbag2_py import ServiceRequestFrom
from rosbag2_py import StorageOptions
import yaml

Expand Down Expand Up @@ -152,6 +153,11 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'By default, if loaned message can be used, messages are published as loaned '
'message. It can help to reduce the number of data copies, so there is a greater '
'benefit for sending big data.')
parser.add_argument(
'--service-request-from', default='service_introspection',
choices=['service_introspection', 'client_introspection'],
help='Determine the source of the service request to be replayed. '
'By default, the service request is from recorded service introspection message.')
MichaelOrlov marked this conversation as resolved.
Show resolved Hide resolved

def get_playback_until_from_arg_group(self, playback_until_sec, playback_until_nsec) -> int:
nano_scale = 1000 * 1000 * 1000
Expand Down Expand Up @@ -219,6 +225,10 @@ def main(self, *, args): # noqa: D102
play_options.start_offset = args.start_offset
play_options.wait_acked_timeout = args.wait_for_all_acked
play_options.disable_loan_message = args.disable_loan_message
if not args.service_request_from or args.service_request_from == 'service':
play_options.service_request_from = ServiceRequestFrom.SERVICE_INTROSPECTION
else:
play_options.service_request_from = ServiceRequestFrom.CLIENT_INTROSPECTION

player = Player()
try:
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/service_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ size_t get_serialization_size_for_service_metadata_event()
static_cast<const rosidl_typesupport_introspection_cpp::MessageMembers *>(
type_support_handle->data);

// TODO(morlov): We shall not rely on this arithmetic!!! It is up to the serialization
// implementation
MichaelOrlov marked this conversation as resolved.
Show resolved Hide resolved
// endian type (4 size) + service event info size + empty request (4 bytes)
// + emtpy response (4 bytes)
size = 4 + service_event_info->size_of_ + 4 + 4;
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_py/rosbag2_py/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from rosbag2_py._transport import (
Player,
PlayOptions,
ServiceRequestFrom,
Recorder,
RecordOptions,
bag_rewrite,
Expand Down Expand Up @@ -94,6 +95,7 @@
'Info',
'Player',
'PlayOptions',
'ServiceRequestFrom',
'Recorder',
'RecordOptions',
]
6 changes: 6 additions & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,12 @@ PYBIND11_MODULE(_transport, m) {
&PlayOptions::setPlaybackUntilTimestamp)
.def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout)
.def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message)
.def_readwrite("service_request_from", &PlayOptions::service_request_from)
;

py::enum_<rosbag2_transport::ServiceRequestFrom>(m, "ServiceRequestFrom")
.value("SERVICE_INTROSPECTION", rosbag2_transport::ServiceRequestFrom::SERVICE_INTROSPECTION)
.value("CLIENT_INTROSPECTION", rosbag2_transport::ServiceRequestFrom::CLIENT_INTROSPECTION)
;

py::class_<RecordOptions>(m, "RecordOptions")
Expand Down
8 changes: 8 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/play_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@

namespace rosbag2_transport
{
enum class ServiceRequestFrom : int
{
SERVICE_INTROSPECTION = 0,
CLIENT_INTROSPECTION = 1
};

struct PlayOptions
{
Expand Down Expand Up @@ -112,6 +117,9 @@ struct PlayOptions

// Disable to publish as loaned message
bool disable_loan_message = false;

// The source of the service request
ServiceRequestFrom service_request_from = ServiceRequestFrom::SERVICE_INTROSPECTION;
};

} // namespace rosbag2_transport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <map>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <string>
#include <tuple>
Expand All @@ -38,6 +37,9 @@ class PlayerServiceClientManager;
class PlayerServiceClient final
{
public:
using ServiceEventType = service_msgs::msg::ServiceEventInfo::_event_type_type;
using ClientGidType = service_msgs::msg::ServiceEventInfo::_client_gid_type;

explicit
PlayerServiceClient(
std::shared_ptr<rclcpp::GenericClient> generic_client,
Expand All @@ -46,40 +48,44 @@ class PlayerServiceClient final
rclcpp::Logger logger,
std::shared_ptr<PlayerServiceClientManager> player_service_client_manager);

// Note: Call this function only if is_include_request_message() return true
const std::string & get_service_name();

/// \brief Deserialize message to the type erased service event
/// \param message - Serialized message
/// \return Shared pointer to the byte array with deserialized service event if success,
/// otherwise nullptr
std::shared_ptr<uint8_t[]> deserialize_service_event(const rcl_serialized_message_t & message);

std::tuple<PlayerServiceClient::ServiceEventType, PlayerServiceClient::ClientGidType>
get_service_event_type_and_client_gid(const std::shared_ptr<uint8_t[]> type_erased_service_event);

bool is_service_event_include_request_message(
const std::shared_ptr<uint8_t[]> type_erased_service_event);

void async_send_request(const std::shared_ptr<uint8_t[]> type_erased_service_event);

void async_send_request(const rcl_serialized_message_t & message);

std::shared_ptr<rclcpp::GenericClient> generic_client()
{
return client_;
}

// Check if message can be unpacked to get request message
bool is_include_request_message(const rcl_serialized_message_t & message);

private:
std::shared_ptr<rclcpp::GenericClient> client_;
std::string service_name_;
const rclcpp::Logger logger_;
std::shared_ptr<PlayerServiceClientManager> player_service_client_manager_;
enum class request_info_from
{
SERVICE = 0,
CLIENT,
NO_CONTENT // Only have META info. Not send request.
};
bool service_set_introspection_content_ = false;

using client_id = service_msgs::msg::ServiceEventInfo::_client_gid_type;
// Info on request data from service or client
std::unordered_map<client_id, request_info_from, rosbag2_cpp::client_id_hash> request_info_;
// Note: The service_event_ts_lib_ shall be a member variable to make sure that library loaded
// during the liveliness of the instance of this class, since we have raw pointers to its members.
std::shared_ptr<rcpputils::SharedLibrary> service_event_ts_lib_;

const rosidl_message_type_support_t * service_event_type_ts_;
const rosidl_typesupport_introspection_cpp::MessageMembers * service_event_members_;

rcutils_allocator_t allocator_ = rcutils_get_default_allocator();

std::tuple<uint8_t, client_id, int64_t>
std::tuple<uint8_t, ClientGidType, int64_t>
get_msg_event_type(const rcl_serialized_message_t & message);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,21 @@ PlayOptions get_play_options_from_node_params(rclcpp::Node & node)
play_options.disable_loan_message =
node.declare_parameter<bool>("play.disable_loan_message", false);

// "SERVICE_INTROSPECTION" or "CLIENT_INTROSPECTION"
auto service_request_from =
node.declare_parameter<std::string>("play.service_request_from", "SERVICE_INTROSPECTION");
if (service_request_from == "SERVICE_INTROSPECTION") {
play_options.service_request_from = ServiceRequestFrom::SERVICE_INTROSPECTION;
} else if (service_request_from == "CLIENT_INTROSPECTION") {
play_options.service_request_from = ServiceRequestFrom::CLIENT_INTROSPECTION;
} else {
RCLCPP_ERROR(
node.get_logger(),
"play.service_request_from doesn't support %s. It must be one of SERVICE_INTROSPECTION"
" and CLIENT_INTROSPECTION. Change it by default value SERVICE_INTROSPECTION.",
service_request_from.c_str());
}

return play_options;
}

Expand Down
76 changes: 65 additions & 11 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <string>
#include <unordered_map>
#include <utility>
#include <variant>
#include <vector>
#include <thread>

Expand All @@ -36,7 +35,6 @@
#include "rosbag2_cpp/clocks/time_controller_clock.hpp"
#include "rosbag2_cpp/reader.hpp"
#include "rosbag2_cpp/service_utils.hpp"
#include "rosbag2_cpp/typesupport_helpers.hpp"

#include "rosbag2_storage/storage_filter.hpp"
#include "rosbag2_storage/qos.hpp"
Expand Down Expand Up @@ -1170,13 +1168,11 @@ void PlayerImpl::run_play_msg_post_callbacks(

bool PlayerImpl::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message)
{
bool message_published = false;

auto pub_iter = publishers_.find(message->topic_name);
if (pub_iter != publishers_.end()) {
// Calling on play message pre-callbacks
run_play_msg_pre_callbacks(message);

bool message_published = false;
try {
pub_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data));
message_published = true;
Expand All @@ -1191,17 +1187,73 @@ bool PlayerImpl::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr
return message_published;
}

// Try to publish message as service request
auto client_iter = service_clients_.find(message->topic_name);
if (client_iter != service_clients_.end()) {
if (!client_iter->second->is_include_request_message(*message->serialized_data)) {
return message_published;
const auto & service_client = client_iter->second;
// TODO(morlov):
// Wrap deserialize_service_event and get_service_event_type_and_client_gid in try-catch
auto service_event = service_client->deserialize_service_event(*message->serialized_data);
if (!service_event) {
RCLCPP_ERROR_STREAM(
owner_->get_logger(), "Failed to deserialize service event message for '" <<
service_client->get_service_name() << "' service!\n");
return false;
}

auto [service_event_type, client_gid] =
service_client->get_service_event_type_and_client_gid(service_event);
// Ignore response message
if (service_event_type == service_msgs::msg::ServiceEventInfo::RESPONSE_SENT ||
service_event_type == service_msgs::msg::ServiceEventInfo::RESPONSE_RECEIVED)
{
// TODO(morlov): Shall we ignore REQUEST_RECEIVED as well?
return false;
}

if (play_options_.service_request_from == ServiceRequestFrom::SERVICE_INTROSPECTION &&
service_event_type != service_msgs::msg::ServiceEventInfo::REQUEST_RECEIVED)
{
return false;
}

if (play_options_.service_request_from == ServiceRequestFrom::CLIENT_INTROSPECTION &&
service_event_type != service_msgs::msg::ServiceEventInfo::REQUEST_SENT)
{
return false;
}

if (!service_client->generic_client()->service_is_ready()) {
RCLCPP_ERROR(
owner_->get_logger(), "Service request hasn't been sent. The '%s' service isn't ready !",
service_client->get_service_name().c_str());
return false;
}

if (!service_client->is_service_event_include_request_message(service_event)) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears to overlook the scenario where both the service and the client enable introspection contents for the same client ID.

Old function is PlayerServiceClient::is_include_request_message().
Except checking if request is included, this function also includes the following logic

  1. Decide whether the request comes from the service or the client is used for one client ID. (Currently, it is decided by the first message including the request data. e.g. First message with request from client. We will use message from client to send request, even if message from service also provide request data.)

  2. Consider the case. Service/client switched introspection type from metadata to contents in recorded data.
    I don't consider the case. Service/Client switched introspection type from contents to metadata in record data, which leads that the decision in 1 maybe adjusted.

bool PlayerServiceClient::is_include_request_message(const rcl_serialized_message_t & message)
{
auto [type, client_id, sequence_number] = get_msg_event_type(message);
// Ignore response message
if (type == service_msgs::msg::ServiceEventInfo::RESPONSE_SENT ||
type == service_msgs::msg::ServiceEventInfo::RESPONSE_RECEIVED)
{
return false;
}
bool ret = false;
// For each Client, decide which request data to use based on the first message related to
// the request that is obtained from the record data.
// e.g.
auto iter = request_info_.find(client_id);
if (type == service_msgs::msg::ServiceEventInfo::REQUEST_RECEIVED) {
if (!service_set_introspection_content_) {
if (rosbag2_cpp::service_event_include_metadata_and_contents(message.buffer_length)) {
service_set_introspection_content_ = true;
}
}
if (iter != request_info_.end()) {
switch (iter->second) {
case request_info_from::CLIENT:
{
// Already decide using request data from client.
break;
}
case request_info_from::NO_CONTENT:
{
if (service_set_introspection_content_) {
// introspection type is changed from metadata to metadata + contents
request_info_[client_id] = request_info_from::SERVICE;
ret = true;
} else {
RCUTILS_LOG_WARN_ONCE_NAMED(
ROSBAG2_TRANSPORT_PACKAGE_NAME,
"The configuration of introspection for '%s' is metadata on service side !",
service_name_.c_str());
}
break;
}
default: // request_info_from::SERVICE:
{
// Already decide using request data from service.
ret = true;
}
}
} else {
if (service_set_introspection_content_) {
request_info_[client_id] = request_info_from::SERVICE;
ret = true;
} else {
request_info_[client_id] = request_info_from::NO_CONTENT; // Only have metadata
}
}
return ret;
}
// type is service_msgs::msg::ServiceEventInfo::REQUEST_SENT
if (iter != request_info_.end()) {
switch (iter->second) {
case request_info_from::CLIENT:
{
// Already decide using request data from client.
ret = true;
break;
}
case request_info_from::NO_CONTENT:
{
if (rosbag2_cpp::service_event_include_metadata_and_contents(message.buffer_length)) {
// introspection type is changed from metadata to metadata + contents
request_info_[client_id] = request_info_from::CLIENT;
ret = true;
} else {
RCUTILS_LOG_WARN_ONCE_NAMED(
ROSBAG2_TRANSPORT_PACKAGE_NAME,
"The configuration of introspection for '%s' client [ID: %s]` is metadata !",
rosbag2_cpp::client_id_to_string(client_id).c_str(),
service_name_.c_str());
}
break;
}
default: // request_info_from::SERVICE:
{
// Already decide using request data from service.
ret = false;
}
}
} else {
if (rosbag2_cpp::service_event_include_metadata_and_contents(message.buffer_length)) {
request_info_[client_id] = request_info_from::CLIENT;
ret = true;
} else {
request_info_[client_id] = request_info_from::NO_CONTENT;
}
}
return ret;
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Barry-Xu-2018 Thanks for finding and pointing out this. It seems I overlook these cases.
From one side I understand that it would be nice to have some autodetection algorithm to be able to decide whether to use a service request from the original client request or from the service request confirmation event message.
However, this sophisticated logic could have corner cases and would be difficult to understand or explain for final users.
In addition, it seems the original implementation with a "hidden" autodetection algorithm from the PlayerServiceClient::is_include_request_message() is not going to work due to the found issue on the RMW layer [service introspection event does not have unique GID during transaction
ros2#357](ros2/rmw#357).

I am more inclined to implement a simple logic for a while to allow publish service requests upon a boolean flag by client request or by service confirmation and add this flag to the settings and perhaps to the CLI or env variable. Need to figure out what is better on the current stage.

Also, I think that we need to publish original service events in all other cases to be able to see them in the topic echo during playback or other introspection tools.

cc: @fujitatomoya

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am more inclined to implement a simple logic for a while to allow publish service requests upon a boolean flag by client request or by service confirmation and add this flag to the settings and perhaps to the CLI or env variable. Need to figure out what is better on the current stage.

that sounds reasonable to me as well. in default, we could rely the service events on service server side. i think this is easier and suggested way to use, but not always we can do this with 3rd party components.

Also, I think that we need to publish original service events in all other cases to be able to see them in the topic echo during playback or other introspection tools.

I really do not see the benefit for this, can you share the use case for this?

i would do, ros2 bag play --service and enable introspection on server application to see the service server is working okay with ros2 service echo. if we publish all the service events on the topic at the same time during playback, that ros2 service echo would be really complicated and printing duplicated messages?

or are you suggesting that we would want to publish the service events with topic via specific option?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MichaelOrlov

However, this sophisticated logic could have corner cases and would be difficult to understand or explain for final users.

Yes. Since users are not aware of the internal selection logic, it can lead to confusion about the result under certain scenario.

I am more inclined to implement a simple logic for a while to allow publish service requests upon a boolean flag by client request or by service confirmation and add this flag to the settings and perhaps to the CLI or env variable. Need to figure out what is better on the current stage.

I also agree with using simpler logic. However, this might result in losing support for complex scenarios, such as some services enabling introspection while others have client introspection enabled. It would require users to uniformly use either service introspection or client introspection.

I incline to add new option for play.
e.g.
--service-request-from {service, client}. service is default value.
If there's a need to add automatic selection in the future (This depends on whether there is a demand from the users.), we can add an "auto" option.

Also, I think that we need to publish original service events in all other cases to be able to see them in the topic echo during playback or other introspection tools.

Are you saying that users, besides being able to replay service requests, can also replay the service event topic like a normal topic, allowing other introspection tools to analyze recorded the information via service event topic?

If yes, enable this function, is there a new parameter for play (if this parameter exists, play service event topic instead of play service request) ? About filter parameters (--services, --exclude-services, etc), do they work? Or user should use --topics, --exclude-topics since service event topic is replayed as normal topic.)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About filter parameters (--services, --exclude-services, etc), do they work? Or user should use --topics, --exclude-topics since service event topic is replayed as normal topic.)

After checking current code, --services and --exclude-services should still function normally, affecting the playback of service events.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Barry-Xu-2018 Ok I've cherry-picked (d6db1bb to support --publish-service-requests) and made some fixes above it. See my new commits on this branch.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MichaelOrlov

I will update other service-related unit tests by this way.

I updated test code based on latest code. Please refer to e096323.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Barry-Xu-2018 Thanks for updating tests in the e096323 I will review your changes at my Thursday morning or first half of the day.
Meanwhile could you please review my changes per this PR one more time?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Barry-Xu-2018 I've cherry-picked your changes in tests from e096323. Honestly, didn't have time to review thoroughly, but this shouldn't be a problem.
If you don't have any more comments/requests for changes for This PR let's move forward and merge rhi PR to your branch.
I think we almost good to go. Will need to rebase and run CI after merge.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have some nitpick in mind to fix - but we can do it after code freeze

if (service_event_type == service_msgs::msg::ServiceEventInfo::REQUEST_RECEIVED) {
RCUTILS_LOG_WARN_ONCE_NAMED(
ROSBAG2_TRANSPORT_PACKAGE_NAME,
"Can't send service request. "
"The configuration of introspection for '%s' was metadata only on service side!",
service_client->get_service_name().c_str());
} else if (service_event_type == service_msgs::msg::ServiceEventInfo::REQUEST_SENT) {
RCUTILS_LOG_WARN_ONCE_NAMED(
ROSBAG2_TRANSPORT_PACKAGE_NAME,
"Can't send service request. "
"The configuration of introspection for '%s' client [ID: %s]` was metadata only!",
service_client->get_service_name().c_str(),
rosbag2_cpp::client_id_to_string(client_gid).c_str());
}
return false;
}

// Calling on play message pre-callbacks
run_play_msg_pre_callbacks(message);

bool message_published = false;
try {
client_iter->second->async_send_request(*message->serialized_data);
client_iter->second->async_send_request(service_event);
message_published = true;
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
Expand All @@ -1215,9 +1267,11 @@ bool PlayerImpl::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr
return message_published;
}

RCLCPP_WARN_STREAM(
owner_->get_logger(), "Not find sender for topic '" << message->topic_name << "' topic.");
return message_published;
RCUTILS_LOG_WARN_ONCE_NAMED(
ROSBAG2_TRANSPORT_PACKAGE_NAME,
"Publisher for topic '%s' not found", message->topic_name.c_str());

return false;
}

void PlayerImpl::add_key_callback(
Expand Down
Loading
Loading