Skip to content

Commit

Permalink
add implementation for rmw_fastrtps_cpp and make build successfully
Browse files Browse the repository at this point in the history
failed to run the tests in test_rmw_implementation because
'Could not find factory for filter class DDSSQL -> Function create_contentfilteredtopic'

Signed-off-by: Chen Lihui <[email protected]>
  • Loading branch information
Chen Lihui committed Jan 14, 2022
1 parent 9c23c8f commit c2bc5c1
Show file tree
Hide file tree
Showing 11 changed files with 395 additions and 59 deletions.
39 changes: 27 additions & 12 deletions rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ rmw_create_subscription(
}

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);
auto info = static_cast<const CustomSubscriberInfo *>(subscription->data);
auto info = static_cast<CustomSubscriberInfo *>(subscription->data);

{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
Expand Down Expand Up @@ -112,6 +113,9 @@ rmw_create_subscription(
return nullptr;
}
}
info->node_ = node;
info->common_context_ = common_context;

return subscription;
}

Expand Down Expand Up @@ -153,11 +157,18 @@ rmw_subscription_set_content_filter(
rmw_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options)
{
// Unused in current implementation.
(void) subscription;
(void) options;
RMW_SET_ERROR_MSG("unimplemented");
return RMW_RET_UNSUPPORTED;
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription,
subscription->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_subscription_set_content_filter(
subscription, options);
auto info = static_cast<const CustomSubscriberInfo *>(subscription->data);
subscription->is_cft_enabled = (info && info->filtered_topic_);
return ret;
}

rmw_ret_t
Expand All @@ -166,12 +177,16 @@ rmw_subscription_get_content_filter(
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * options)
{
// Unused in current implementation.
(void) subscription;
(void) allocator;
(void) options;
RMW_SET_ERROR_MSG("unimplemented");
return RMW_RET_UNSUPPORTED;
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription,
subscription->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_content_filter(
subscription, allocator, options);
}

rmw_ret_t
Expand Down
70 changes: 33 additions & 37 deletions rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,30 @@ create_subscription(
return nullptr;
}

info->dds_participant_ = dds_participant;
info->subscriber_ = subscriber;
info->topic_name_mangled_ = topic_name_mangled;
info->topic_ = topic.desc;
des_topic = topic.desc;

// Create ContentFilteredTopic
if (subscription_options->content_filter_options) {
rmw_subscription_content_filter_options_t * options =
subscription_options->content_filter_options;
if (nullptr != options->filter_expression) {
eprosima::fastdds::dds::ContentFilteredTopic * filtered_topic = nullptr;
if (!rmw_fastrtps_shared_cpp::create_content_filtered_topic(
dds_participant, des_topic,
topic_name_mangled, options, &filtered_topic))
{
RMW_SET_ERROR_MSG("create_contentfilteredtopic() failed to create contentfilteredtopic");
return nullptr;
}
info->filtered_topic_ = filtered_topic;
des_topic = filtered_topic;
}
}

/////
// Create DataReader

Expand All @@ -252,44 +274,18 @@ create_subscription(
return nullptr;
}

eprosima::fastdds::dds::DataReaderQos original_qos = reader_qos;
switch (subscription_options->require_unique_network_flow_endpoints) {
default:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_SYSTEM_DEFAULT:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_NOT_REQUIRED:
// Unique network flow endpoints not required. We leave the decission to the XML profile.
break;

case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED:
// Ensure we request unique network flow endpoints
if (nullptr ==
PropertyPolicyHelper::find_property(
reader_qos.properties(),
"fastdds.unique_network_flows"))
{
reader_qos.properties().properties().emplace_back("fastdds.unique_network_flows", "");
}
break;
}
info->datareader_qos_ = reader_qos;

// Creates DataReader (with subscriber name to not change name policy)
info->data_reader_ = subscriber->create_datareader(
des_topic,
reader_qos,
info->listener_);
if (!info->data_reader_ &&
(RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED ==
subscription_options->require_unique_network_flow_endpoints))
{
info->data_reader_ = subscriber->create_datareader(
// create_datareader
if (!rmw_fastrtps_shared_cpp::create_datareader(
info->datareader_qos_,
subscription_options,
subscriber,
des_topic,
original_qos,
info->listener_);
}

if (!info->data_reader_) {
RMW_SET_ERROR_MSG("create_subscription() could not create data reader");
info->listener_,
&info->data_reader_))
{
RMW_SET_ERROR_MSG("create_datareader() could not create data reader");
return nullptr;
}

Expand Down Expand Up @@ -328,7 +324,7 @@ create_subscription(
}
rmw_subscription->options = *subscription_options;
rmw_fastrtps_shared_cpp::__init_subscription_for_loans(rmw_subscription);
rmw_subscription->is_cft_enabled = false;
rmw_subscription->is_cft_enabled = info->filtered_topic_ != nullptr;

topic.should_be_deleted = false;
cleanup_rmw_subscription.cancel();
Expand Down
5 changes: 4 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ rmw_create_subscription(
}

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);
auto info = static_cast<const CustomSubscriberInfo *>(subscription->data);
auto info = static_cast<CustomSubscriberInfo *>(subscription->data);
{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
Expand Down Expand Up @@ -115,6 +115,9 @@ rmw_create_subscription(
return nullptr;
}
}
info->node_ = node;
info->common_context_ = common_context;

return subscription;
}

Expand Down
1 change: 1 addition & 0 deletions rmw_fastrtps_dynamic_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ create_subscription(

rmw_subscription->options = *subscription_options;
rmw_fastrtps_shared_cpp::__init_subscription_for_loans(rmw_subscription);
// TODO(iuhilnehc-ynos): update after rmw_fastrtps_cpp is confirmed
rmw_subscription->is_cft_enabled = false;

topic.should_be_deleted = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include "fastdds/dds/core/status/SubscriptionMatchedStatus.hpp"
#include "fastdds/dds/subscriber/DataReader.hpp"
#include "fastdds/dds/subscriber/DataReaderListener.hpp"
#include "fastdds/dds/subscriber/qos/DataReaderQos.hpp"
#include "fastdds/dds/topic/ContentFilteredTopic.hpp"
#include "fastdds/dds/topic/TypeSupport.hpp"

#include "fastdds/rtps/common/Guid.h"
Expand All @@ -36,6 +38,8 @@

#include "rmw/impl/cpp/macros.hpp"

#include "rmw_dds_common/context.hpp"

#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"


Expand All @@ -58,6 +62,16 @@ struct CustomSubscriberInfo : public CustomEventInfo
const char * typesupport_identifier_{nullptr};
std::shared_ptr<rmw_fastrtps_shared_cpp::LoanManager> loan_manager_;

// for re-create or delete content filtered topic
const rmw_node_t * node_ {nullptr};
rmw_dds_common::Context * common_context_ {nullptr};
eprosima::fastdds::dds::DomainParticipant * dds_participant_ {nullptr};
eprosima::fastdds::dds::Subscriber * subscriber_ {nullptr};
std::string topic_name_mangled_;
eprosima::fastdds::dds::TopicDescription * topic_ {nullptr};
eprosima::fastdds::dds::ContentFilteredTopic * filtered_topic_ {nullptr};
eprosima::fastdds::dds::DataReaderQos datareader_qos_;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
EventListenerInterface *
getListener() const final;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,8 @@ rmw_ret_t
__rmw_destroy_subscription(
const char * identifier,
const rmw_node_t * node,
rmw_subscription_t * subscription);
rmw_subscription_t * subscription,
bool reset_cft = false);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
Expand All @@ -317,6 +318,19 @@ __rmw_subscription_get_actual_qos(
const rmw_subscription_t * subscription,
rmw_qos_profile_t * qos);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_subscription_set_content_filter(
rmw_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_subscription_get_content_filter(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * options);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_take(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ rmw_ret_t
destroy_subscription(
const char * identifier,
CustomParticipantInfo * participant_info,
rmw_subscription_t * subscription);
rmw_subscription_t * subscription,
bool reset_cft = false);

} // namespace rmw_fastrtps_shared_cpp

Expand Down
45 changes: 45 additions & 0 deletions rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "fastrtps/types/TypesBase.h"

#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp"
#include "rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp"
#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

#include "rmw/rmw.h"
Expand Down Expand Up @@ -137,6 +138,50 @@ remove_topic_and_type(
const eprosima::fastdds::dds::TopicDescription * topic,
const eprosima::fastdds::dds::TypeSupport & type);

/**
* Create content filtered topic.
*
* \param[in] participant DomainParticipant where the topic will be created.
* \param[in] topic_desc TopicDescription returned by find_and_check_topic_and_type.
* \param[in] topic_name_mangled Mangled Name of the topic.
* \param[in] options Options of the content filtered topic.
* \param[out] content_filtered_topic Will hold the pointer to the content filtered topic along
with the necessary information for its deletion.
*
* \return true when the content filtered topic was created
* \return false when the content filtered topic could not be created
*/
bool
create_content_filtered_topic(
eprosima::fastdds::dds::DomainParticipant * participant,
eprosima::fastdds::dds::TopicDescription * topic_desc,
const std::string & topic_name_mangled,
const rmw_subscription_content_filter_options_t * options,
eprosima::fastdds::dds::ContentFilteredTopic ** content_filtered_topic);


/**
* Create data reader.
*
* \param[in] datareader_qos QoS of data reader.
* \param[in] subscription_options Options of the subscription.
* \param[in] subscriber A subsciber to create the data reader.
* \param[in] des_topic TopicDescription returned by find_and_check_topic_and_type.
* \param[in] listener The listener of the data reader.
* \param[out] data_reader Will hold the pointer to the data reader.
*
* \return true when the data reader was created
* \return false when the data reader could not be created
*/
bool
create_datareader(
const eprosima::fastdds::dds::DataReaderQos & datareader_qos,
const rmw_subscription_options_t * subscription_options,
eprosima::fastdds::dds::Subscriber * subscriber,
eprosima::fastdds::dds::TopicDescription * des_topic,
SubListener * listener,
eprosima::fastdds::dds::DataReader ** data_reader);

} // namespace rmw_fastrtps_shared_cpp

#endif // RMW_FASTRTPS_SHARED_CPP__UTILS_HPP_
Loading

0 comments on commit c2bc5c1

Please sign in to comment.