Skip to content

Commit

Permalink
Rewrite how Topics are tracked in rmw_fastrtps_cpp. (#669)
Browse files Browse the repository at this point in the history
* Rewrite how Topics are tracked in rmw_fastrtps_cpp.

Every DataReader and DataWriter in the system needs to
have an associated Topic.  Ideally we would create these
as one per publisher/subscriber, but Fast-DDS does not
allow you to create multiple topics within the same
DomainParticipant with the same topic name.  Instead,
what we need to do is track ourselves whether a topic
should be reused.

This was previously done with a combination of TopicHolder
and cast_or_create_topic, but that had a couple of problems.
First of all, a TopicHolder is really a SCOPE_EXIT by a
different name.  Second, cast_or_create_topic worked, but
really left the semantics of who owned a Topic up in the
air.  If you created multiple topics with the same name,
the first one would own it, and thus own the lifetime.
The subsequent ones would reuse that.  But if you happened
to remove the first one, then everything would probably
crash.

Rewrite this whole thing to instead store the list of
Topic pointers in the CustomParticipantInfo, which seems
like a much more appropriate place for it.  When we go
to add them, we first look if it already exists and if so,
just increase the use_count.  If it doesn't exist, we
create it with a use_count of 1.  On deletion, we do the
opposite; decrease the use_count, and call delete_topic
iff the use_count <= 0.  This data is wrapped in another
class called UseCountTopic; we'll also be using this in
the future to associate TopicListener with the Topic.

With this implementation, the semantics are much more clearly
defined, and this should be able to handle deletion as well.
It also happens to remove a bunch of code from the implementation,
which is an added bonus.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette authored Feb 10, 2023
1 parent e12ff1a commit 8510fc1
Show file tree
Hide file tree
Showing 23 changed files with 180 additions and 262 deletions.
2 changes: 1 addition & 1 deletion rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace rmw_fastrtps_cpp

rmw_publisher_t *
create_publisher(
const CustomParticipantInfo * participant_info,
CustomParticipantInfo * participant_info,
const rosidl_message_type_support_t * type_supports,
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
Expand Down
2 changes: 1 addition & 1 deletion rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace rmw_fastrtps_cpp

rmw_subscription_t *
create_subscription(
const CustomParticipantInfo * participant_info,
CustomParticipantInfo * participant_info,
const rosidl_message_type_support_t * type_supports,
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
Expand Down
19 changes: 7 additions & 12 deletions rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

rmw_publisher_t *
rmw_fastrtps_cpp::create_publisher(
const CustomParticipantInfo * participant_info,
CustomParticipantInfo * participant_info,
const rosidl_message_type_support_t * type_supports,
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
Expand Down Expand Up @@ -164,11 +164,10 @@ rmw_fastrtps_cpp::create_publisher(
}

auto cleanup_info = rcpputils::make_scope_exit(
[info, dds_participant]() {
[info, participant_info]() {
delete info->listener_;
if (info->type_support_) {
dds_participant->unregister_type(info->type_support_.get_type_name());
}
rmw_fastrtps_shared_cpp::remove_topic_and_type(
participant_info, info->topic_, info->type_support_);
delete info;
});

Expand Down Expand Up @@ -225,11 +224,8 @@ rmw_fastrtps_cpp::create_publisher(
return nullptr;
}

rmw_fastrtps_shared_cpp::TopicHolder topic;
if (!rmw_fastrtps_shared_cpp::cast_or_create_topic(
dds_participant, des_topic,
topic_name_mangled, type_name, topic_qos, true, &topic))
{
info->topic_ = participant_info->find_or_create_topic(topic_name_mangled, type_name, topic_qos);
if (!info->topic_) {
RMW_SET_ERROR_MSG("create_publisher() failed to create topic");
return nullptr;
}
Expand Down Expand Up @@ -269,7 +265,7 @@ rmw_fastrtps_cpp::create_publisher(

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->data_writer_ = publisher->create_datawriter(
topic.topic,
info->topic_,
writer_qos,
info->listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());
Expand Down Expand Up @@ -320,7 +316,6 @@ rmw_fastrtps_cpp::create_publisher(

rmw_publisher->options = *publisher_options;

topic.should_be_deleted = false;
cleanup_rmw_publisher.cancel();
cleanup_datawriter.cancel();
cleanup_info.cancel();
Expand Down
38 changes: 15 additions & 23 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,13 @@ rmw_create_client(
}

auto cleanup_info = rcpputils::make_scope_exit(
[info, dds_participant]() {
[info, participant_info]() {
delete info->pub_listener_;
delete info->listener_;
if (info->response_type_support_) {
dds_participant->unregister_type(info->response_type_support_.get_type_name());
}
if (info->request_type_support_) {
dds_participant->unregister_type(info->request_type_support_.get_type_name());
}
rmw_fastrtps_shared_cpp::remove_topic_and_type(
participant_info, info->response_topic_, info->response_type_support_);
rmw_fastrtps_shared_cpp::remove_topic_and_type(
participant_info, info->request_topic_, info->request_type_support_);
delete info;
});

Expand Down Expand Up @@ -269,29 +267,25 @@ rmw_create_client(
}

// Create response topic
rmw_fastrtps_shared_cpp::TopicHolder response_topic;
if (!rmw_fastrtps_shared_cpp::cast_or_create_topic(
dds_participant, response_topic_desc,
response_topic_name, response_type_name, topic_qos, false, &response_topic))
{
info->response_topic_ = participant_info->find_or_create_topic(
response_topic_name, response_type_name, topic_qos);
if (!info->response_topic_) {
RMW_SET_ERROR_MSG("create_client() failed to create response topic");
return nullptr;
}

response_topic_desc = response_topic.desc;
response_topic_desc = info->response_topic_;

// Create request topic
rmw_fastrtps_shared_cpp::TopicHolder request_topic;
if (!rmw_fastrtps_shared_cpp::cast_or_create_topic(
dds_participant, request_topic_desc,
request_topic_name, request_type_name, topic_qos, true, &request_topic))
{
info->request_topic_ = participant_info->find_or_create_topic(
request_topic_name, request_type_name, topic_qos);
if (!info->request_topic_) {
RMW_SET_ERROR_MSG("create_client() failed to create request topic");
return nullptr;
}

info->request_topic_ = request_topic_name;
info->response_topic_ = response_topic_name;
info->request_topic_name_ = request_topic_name;
info->response_topic_name_ = response_topic_name;

// Keyword to find DataWriter and DataReader QoS
const std::string topic_name_fallback = "client";
Expand Down Expand Up @@ -381,7 +375,7 @@ rmw_create_client(

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->request_writer_ = publisher->create_datawriter(
request_topic.topic,
info->request_topic_,
writer_qos,
info->pub_listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());
Expand Down Expand Up @@ -479,8 +473,6 @@ rmw_create_client(
}
}

request_topic.should_be_deleted = false;
response_topic.should_be_deleted = false;
cleanup_rmw_client.cancel();
cleanup_datawriter.cancel();
cleanup_datareader.cancel();
Expand Down
34 changes: 13 additions & 21 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,13 @@ rmw_create_service(
return nullptr;
}
auto cleanup_info = rcpputils::make_scope_exit(
[info, dds_participant]() {
[info, participant_info]() {
delete info->pub_listener_;
delete info->listener_;
if (info->response_type_support_) {
dds_participant->unregister_type(info->response_type_support_.get_type_name());
}
if (info->request_type_support_) {
dds_participant->unregister_type(info->request_type_support_.get_type_name());
}
rmw_fastrtps_shared_cpp::remove_topic_and_type(
participant_info, info->response_topic_, info->response_type_support_);
rmw_fastrtps_shared_cpp::remove_topic_and_type(
participant_info, info->request_topic_, info->request_type_support_);
delete info;
});

Expand Down Expand Up @@ -271,23 +269,19 @@ rmw_create_service(
}

// Create request topic
rmw_fastrtps_shared_cpp::TopicHolder request_topic;
if (!rmw_fastrtps_shared_cpp::cast_or_create_topic(
dds_participant, request_topic_desc,
request_topic_name, request_type_name, topic_qos, false, &request_topic))
{
info->request_topic_ = participant_info->find_or_create_topic(
request_topic_name, request_type_name, topic_qos);
if (!info->request_topic_) {
RMW_SET_ERROR_MSG("create_service() failed to create request topic");
return nullptr;
}

request_topic_desc = request_topic.desc;
request_topic_desc = info->request_topic_;

// Create response topic
rmw_fastrtps_shared_cpp::TopicHolder response_topic;
if (!rmw_fastrtps_shared_cpp::cast_or_create_topic(
dds_participant, response_topic_desc,
response_topic_name, response_type_name, topic_qos, true, &response_topic))
{
info->response_topic_ = participant_info->find_or_create_topic(
response_topic_name, response_type_name, topic_qos);
if (!info->response_topic_) {
RMW_SET_ERROR_MSG("create_service() failed to create response topic");
return nullptr;
}
Expand Down Expand Up @@ -384,7 +378,7 @@ rmw_create_service(

// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->response_writer_ = publisher->create_datawriter(
response_topic.topic,
info->response_topic_,
writer_qos,
info->pub_listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());
Expand Down Expand Up @@ -478,8 +472,6 @@ rmw_create_service(
}
}

request_topic.should_be_deleted = false;
response_topic.should_be_deleted = false;
cleanup_rmw_service.cancel();
cleanup_datawriter.cancel();
cleanup_datareader.cancel();
Expand Down
21 changes: 8 additions & 13 deletions rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ namespace rmw_fastrtps_cpp

rmw_subscription_t *
create_subscription(
const CustomParticipantInfo * participant_info,
CustomParticipantInfo * participant_info,
const rosidl_message_type_support_t * type_supports,
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
Expand Down Expand Up @@ -163,12 +163,11 @@ create_subscription(
}

auto cleanup_info = rcpputils::make_scope_exit(
[info, dds_participant]()
[info, participant_info]()
{
delete info->listener_;
if (info->type_support_) {
dds_participant->unregister_type(info->type_support_.get_type_name());
}
rmw_fastrtps_shared_cpp::remove_topic_and_type(
participant_info, info->topic_, info->type_support_);
delete info;
});

Expand Down Expand Up @@ -222,20 +221,17 @@ create_subscription(
return nullptr;
}

rmw_fastrtps_shared_cpp::TopicHolder topic;
if (!rmw_fastrtps_shared_cpp::cast_or_create_topic(
dds_participant, des_topic,
topic_name_mangled, type_name, topic_qos, false, &topic))
{
info->topic_ = participant_info->find_or_create_topic(topic_name_mangled, type_name, topic_qos);
if (!info->topic_) {
RMW_SET_ERROR_MSG("create_subscription() failed to create topic");
return nullptr;
}

info->dds_participant_ = dds_participant;
info->subscriber_ = subscriber;
info->topic_name_mangled_ = topic_name_mangled;
info->topic_ = topic.desc;
des_topic = topic.desc;

des_topic = info->topic_;

// Create ContentFilteredTopic
if (subscription_options->content_filter_options) {
Expand Down Expand Up @@ -338,7 +334,6 @@ create_subscription(
rmw_fastrtps_shared_cpp::__init_subscription_for_loans(rmw_subscription);
rmw_subscription->is_cft_enabled = info->filtered_topic_ != nullptr;

topic.should_be_deleted = false;
cleanup_rmw_subscription.cancel();
cleanup_datareader.cancel();
cleanup_info.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace rmw_fastrtps_dynamic_cpp

rmw_publisher_t *
create_publisher(
const CustomParticipantInfo * participant_info,
CustomParticipantInfo * participant_info,
const rosidl_message_type_support_t * type_supports,
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace rmw_fastrtps_dynamic_cpp

rmw_subscription_t *
create_subscription(
const CustomParticipantInfo * participant_info,
CustomParticipantInfo * participant_info,
const rosidl_message_type_support_t * type_supports,
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
Expand Down
19 changes: 7 additions & 12 deletions rmw_fastrtps_dynamic_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ using TypeSupportProxy = rmw_fastrtps_dynamic_cpp::TypeSupportProxy;

rmw_publisher_t *
rmw_fastrtps_dynamic_cpp::create_publisher(
const CustomParticipantInfo * participant_info,
CustomParticipantInfo * participant_info,
const rosidl_message_type_support_t * type_supports,
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
Expand Down Expand Up @@ -164,11 +164,10 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
}

auto cleanup_info = rcpputils::make_scope_exit(
[info, dds_participant]() {
[info, participant_info]() {
delete info->listener_;
if (info->type_support_) {
dds_participant->unregister_type(info->type_support_.get_type_name());
}
rmw_fastrtps_shared_cpp::remove_topic_and_type(
participant_info, info->topic_, info->type_support_);
delete info;
});

Expand Down Expand Up @@ -231,11 +230,8 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
return nullptr;
}

rmw_fastrtps_shared_cpp::TopicHolder topic;
if (!rmw_fastrtps_shared_cpp::cast_or_create_topic(
dds_participant, des_topic,
topic_name_mangled, type_name, topic_qos, true, &topic))
{
info->topic_ = participant_info->find_or_create_topic(topic_name_mangled, type_name, topic_qos);
if (!info->topic_) {
RMW_SET_ERROR_MSG("create_publisher() failed to create topic");
return nullptr;
}
Expand Down Expand Up @@ -275,7 +271,7 @@ rmw_fastrtps_dynamic_cpp::create_publisher(

// Creates DataWriter (with publisher name to not change name policy)
info->data_writer_ = publisher->create_datawriter(
topic.topic,
info->topic_,
writer_qos,
info->listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());
Expand Down Expand Up @@ -325,7 +321,6 @@ rmw_fastrtps_dynamic_cpp::create_publisher(

rmw_publisher->options = *publisher_options;

topic.should_be_deleted = false;
cleanup_rmw_publisher.cancel();
cleanup_datawriter.cancel();
return_type_support.cancel();
Expand Down
Loading

0 comments on commit 8510fc1

Please sign in to comment.