Skip to content

Commit

Permalink
Add content filter topic feature (#513)
Browse files Browse the repository at this point in the history
* Add content filter topic feature

Signed-off-by: Chen.Lihui <[email protected]>
  • Loading branch information
Chen Lihui authored Mar 21, 2022
1 parent a4c07a1 commit 490f06e
Show file tree
Hide file tree
Showing 17 changed files with 933 additions and 49 deletions.
4 changes: 2 additions & 2 deletions rmw_fastrtps_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ ament_export_dependencies(rosidl_typesupport_fastrtps_c)
ament_export_dependencies(rosidl_typesupport_fastrtps_cpp)

register_rmw_implementation(
"c:rosidl_typesupport_c:rosidl_typesupport_fastrtps_c"
"cpp:rosidl_typesupport_cpp:rosidl_typesupport_fastrtps_cpp")
"c:rosidl_typesupport_c:rosidl_typesupport_fastrtps_c:rosidl_typesupport_introspection_c"
"cpp:rosidl_typesupport_cpp:rosidl_typesupport_fastrtps_cpp:rosidl_typesupport_introspection_cpp")

if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
Expand Down
7 changes: 7 additions & 0 deletions rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,13 @@ rmw_fastrtps_cpp::create_publisher(
}
info->type_support_ = fastdds_type;

if (!rmw_fastrtps_shared_cpp::register_type_object(type_supports, type_name)) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"failed to register type object with incompatible type %s",
type_name.c_str());
return nullptr;
}

/////
// Create Listener
if (create_publisher_listener) {
Expand Down
43 changes: 42 additions & 1 deletion rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ rmw_create_subscription(
}

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);
auto info = static_cast<const CustomSubscriberInfo *>(subscription->data);
auto info = static_cast<CustomSubscriberInfo *>(subscription->data);

{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
Expand Down Expand Up @@ -112,6 +113,9 @@ rmw_create_subscription(
return nullptr;
}
}
info->node_ = node;
info->common_context_ = common_context;

return subscription;
}

Expand Down Expand Up @@ -148,6 +152,43 @@ rmw_subscription_get_actual_qos(
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_actual_qos(subscription, qos);
}

rmw_ret_t
rmw_subscription_set_content_filter(
rmw_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options)
{
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription,
subscription->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_subscription_set_content_filter(
subscription, options);
auto info = static_cast<const CustomSubscriberInfo *>(subscription->data);
subscription->is_cft_enabled = (info && info->filtered_topic_);
return ret;
}

rmw_ret_t
rmw_subscription_get_content_filter(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * options)
{
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription,
subscription->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_content_filter(
subscription, allocator, options);
}

rmw_ret_t
rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
{
Expand Down
76 changes: 40 additions & 36 deletions rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ create_subscription(
}
info->type_support_ = fastdds_type;

if (!rmw_fastrtps_shared_cpp::register_type_object(type_supports, type_name)) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"failed to register type object with incompatible type %s",
type_name.c_str());
return nullptr;
}

/////
// Create Listener
if (create_subscription_listener) {
Expand Down Expand Up @@ -224,8 +231,30 @@ create_subscription(
return nullptr;
}

info->dds_participant_ = dds_participant;
info->subscriber_ = subscriber;
info->topic_name_mangled_ = topic_name_mangled;
info->topic_ = topic.desc;
des_topic = topic.desc;

// Create ContentFilteredTopic
if (subscription_options->content_filter_options) {
rmw_subscription_content_filter_options_t * options =
subscription_options->content_filter_options;
if (nullptr != options->filter_expression) {
eprosima::fastdds::dds::ContentFilteredTopic * filtered_topic = nullptr;
if (!rmw_fastrtps_shared_cpp::create_content_filtered_topic(
dds_participant, des_topic,
topic_name_mangled, options, &filtered_topic))
{
RMW_SET_ERROR_MSG("create_contentfilteredtopic() failed to create contentfilteredtopic");
return nullptr;
}
info->filtered_topic_ = filtered_topic;
des_topic = filtered_topic;
}
}

/////
// Create DataReader

Expand All @@ -251,44 +280,18 @@ create_subscription(
return nullptr;
}

eprosima::fastdds::dds::DataReaderQos original_qos = reader_qos;
switch (subscription_options->require_unique_network_flow_endpoints) {
default:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_SYSTEM_DEFAULT:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_NOT_REQUIRED:
// Unique network flow endpoints not required. We leave the decission to the XML profile.
break;

case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED:
// Ensure we request unique network flow endpoints
if (nullptr ==
PropertyPolicyHelper::find_property(
reader_qos.properties(),
"fastdds.unique_network_flows"))
{
reader_qos.properties().properties().emplace_back("fastdds.unique_network_flows", "");
}
break;
}
info->datareader_qos_ = reader_qos;

// Creates DataReader (with subscriber name to not change name policy)
info->data_reader_ = subscriber->create_datareader(
des_topic,
reader_qos,
info->listener_);
if (!info->data_reader_ &&
(RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED ==
subscription_options->require_unique_network_flow_endpoints))
{
info->data_reader_ = subscriber->create_datareader(
// create_datareader
if (!rmw_fastrtps_shared_cpp::create_datareader(
info->datareader_qos_,
subscription_options,
subscriber,
des_topic,
original_qos,
info->listener_);
}

if (!info->data_reader_) {
RMW_SET_ERROR_MSG("create_subscription() could not create data reader");
info->listener_,
&info->data_reader_))
{
RMW_SET_ERROR_MSG("create_datareader() could not create data reader");
return nullptr;
}

Expand Down Expand Up @@ -327,6 +330,7 @@ create_subscription(
}
rmw_subscription->options = *subscription_options;
rmw_fastrtps_shared_cpp::__init_subscription_for_loans(rmw_subscription);
rmw_subscription->is_cft_enabled = info->filtered_topic_ != nullptr;

topic.should_be_deleted = false;
cleanup_rmw_subscription.cancel();
Expand Down
31 changes: 30 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ rmw_create_subscription(
}

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);
auto info = static_cast<const CustomSubscriberInfo *>(subscription->data);
auto info = static_cast<CustomSubscriberInfo *>(subscription->data);
{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
Expand Down Expand Up @@ -115,6 +115,9 @@ rmw_create_subscription(
return nullptr;
}
}
info->node_ = node;
info->common_context_ = common_context;

return subscription;
}

Expand Down Expand Up @@ -151,6 +154,32 @@ rmw_subscription_get_actual_qos(
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_actual_qos(subscription, qos);
}

rmw_ret_t
rmw_subscription_set_content_filter(
rmw_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options)
{
// Unused in current implementation.
(void) subscription;
(void) options;
RMW_SET_ERROR_MSG("unimplemented");
return RMW_RET_UNSUPPORTED;
}

rmw_ret_t
rmw_subscription_get_content_filter(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * options)
{
// Unused in current implementation.
(void) subscription;
(void) allocator;
(void) options;
RMW_SET_ERROR_MSG("unimplemented");
return RMW_RET_UNSUPPORTED;
}

using BaseTypeSupport = rmw_fastrtps_dynamic_cpp::BaseTypeSupport;

rmw_ret_t
Expand Down
2 changes: 2 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ create_subscription(

rmw_subscription->options = *subscription_options;
rmw_fastrtps_shared_cpp::__init_subscription_for_loans(rmw_subscription);
// TODO(iuhilnehc-ynos): update after rmw_fastrtps_cpp is confirmed
rmw_subscription->is_cft_enabled = false;

topic.should_be_deleted = false;
cleanup_rmw_subscription.cancel();
Expand Down
6 changes: 6 additions & 0 deletions rmw_fastrtps_shared_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ find_package(ament_cmake_ros REQUIRED)
find_package(rcpputils REQUIRED)
find_package(rcutils REQUIRED)
find_package(rmw_dds_common REQUIRED)
find_package(rosidl_typesupport_introspection_c REQUIRED)
find_package(rosidl_typesupport_introspection_cpp REQUIRED)

find_package(fastrtps_cmake_module REQUIRED)
find_package(fastcdr REQUIRED CONFIG)
Expand Down Expand Up @@ -105,6 +107,8 @@ ament_target_dependencies(rmw_fastrtps_shared_cpp
"rcutils"
"rmw"
"rmw_dds_common"
"rosidl_typesupport_introspection_c"
"rosidl_typesupport_introspection_cpp"
)

# Causes the visibility macros to use dllexport rather than dllimport,
Expand All @@ -124,6 +128,8 @@ ament_export_dependencies(rcpputils)
ament_export_dependencies(rcutils)
ament_export_dependencies(rmw)
ament_export_dependencies(rmw_dds_common)
ament_export_dependencies(rosidl_typesupport_introspection_c)
ament_export_dependencies(rosidl_typesupport_introspection_cpp)

if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

#include "rcutils/logging_macros.h"

#include "rosidl_runtime_c/message_type_support_struct.h"

#include "./visibility_control.h"

namespace rmw_fastrtps_shared_cpp
Expand Down Expand Up @@ -106,6 +108,11 @@ class TypeSupport : public eprosima::fastdds::dds::TopicDataType
bool is_plain_;
};

RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool register_type_object(
const rosidl_message_type_support_t * type_supports,
const std::string & type_name);

} // namespace rmw_fastrtps_shared_cpp

#endif // RMW_FASTRTPS_SHARED_CPP__TYPESUPPORT_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <utility>

#include "fastdds/dds/core/status/DeadlineMissedStatus.hpp"
#include "fastdds/dds/core/status/LivelinessChangedStatus.hpp"
#include "fastdds/dds/core/status/SubscriptionMatchedStatus.hpp"
#include "fastdds/dds/subscriber/DataReader.hpp"
#include "fastdds/dds/subscriber/DataReaderListener.hpp"
#include "fastdds/dds/subscriber/qos/DataReaderQos.hpp"
#include "fastdds/dds/topic/ContentFilteredTopic.hpp"
#include "fastdds/dds/topic/TypeSupport.hpp"

#include "fastdds/rtps/common/Guid.h"
Expand All @@ -39,6 +42,8 @@
#include "rmw/impl/cpp/macros.hpp"
#include "rmw/event_callback_type.h"

#include "rmw_dds_common/context.hpp"

#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"


Expand All @@ -61,6 +66,16 @@ struct CustomSubscriberInfo : public CustomEventInfo
const char * typesupport_identifier_{nullptr};
std::shared_ptr<rmw_fastrtps_shared_cpp::LoanManager> loan_manager_;

// for re-create or delete content filtered topic
const rmw_node_t * node_ {nullptr};
rmw_dds_common::Context * common_context_ {nullptr};
eprosima::fastdds::dds::DomainParticipant * dds_participant_ {nullptr};
eprosima::fastdds::dds::Subscriber * subscriber_ {nullptr};
std::string topic_name_mangled_;
eprosima::fastdds::dds::TopicDescription * topic_ {nullptr};
eprosima::fastdds::dds::ContentFilteredTopic * filtered_topic_ {nullptr};
eprosima::fastdds::dds::DataReaderQos datareader_qos_;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
EventListenerInterface *
getListener() const final;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ rmw_ret_t
__rmw_destroy_subscription(
const char * identifier,
const rmw_node_t * node,
rmw_subscription_t * subscription);
rmw_subscription_t * subscription,
bool reset_cft = false);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
Expand All @@ -318,6 +319,19 @@ __rmw_subscription_get_actual_qos(
const rmw_subscription_t * subscription,
rmw_qos_profile_t * qos);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_subscription_set_content_filter(
rmw_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_subscription_get_content_filter(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * options);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_service_response_publisher_get_actual_qos(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ rmw_ret_t
destroy_subscription(
const char * identifier,
CustomParticipantInfo * participant_info,
rmw_subscription_t * subscription);
rmw_subscription_t * subscription,
bool reset_cft = false);

} // namespace rmw_fastrtps_shared_cpp

Expand Down
Loading

0 comments on commit 490f06e

Please sign in to comment.