Skip to content

Commit

Permalink
fuse -> ROS 2 fuse_publishers: Port fuse_publishers (#299)
Browse files Browse the repository at this point in the history
Co-authored-by: Shane Loretz <[email protected]>
  • Loading branch information
methylDragon and sloretz authored Dec 30, 2022
1 parent ecccdac commit 5f4c5ed
Show file tree
Hide file tree
Showing 35 changed files with 765 additions and 629 deletions.
5 changes: 4 additions & 1 deletion fuse_constraints/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ install(DIRECTORY include/
DESTINATION include/${PROJECT_NAME}
)

install(DIRECTORY cmake DESTINATION share/${PROJECT_NAME})
install(
DIRECTORY cmake
DESTINATION share/${PROJECT_NAME}
)

pluginlib_export_plugin_description_file(fuse_core fuse_plugins.xml)

Expand Down
2 changes: 1 addition & 1 deletion fuse_constraints/suitesparse-extras.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@
# POSSIBILITY OF SUCH DAMAGE.

if(DEFINED fuse_constraints_DIR)
list(APPEND CMAKE_MODULE_PATH "${fuse_constraints_DIR}")
list(APPEND CMAKE_MODULE_PATH "${fuse_constraints_DIR}")
endif()
find_package(SUITESPARSE REQUIRED COMPONENTS CCOLAMD)
14 changes: 10 additions & 4 deletions fuse_core/include/fuse_core/async_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <fuse_core/callback_wrapper.hpp>
#include <fuse_core/fuse_macros.hpp>
#include <fuse_core/graph.hpp>
#include <fuse_core/node_interfaces/node_interfaces.hpp>
#include <fuse_core/publisher.hpp>
#include <fuse_core/transaction.hpp>
#include <rclcpp/rclcpp.hpp>
Expand Down Expand Up @@ -81,10 +82,13 @@ class AsyncPublisher : public Publisher
* AsyncPublisher::onInit() method will be called from here, once the internal node is properly
* configured.
*
* @param[in] interfaces The node interfaces to be used with the publisher
* @param[in] name A unique name to give this plugin instance
* @throws runtime_error if already initialized
*/
void initialize(const std::string & name) override;
void initialize(
node_interfaces::NodeInterfaces<ALL_FUSE_CORE_NODE_INTERFACES> interfaces,
const std::string & name) override;

/**
* @brief Get the unique name of this publisher
Expand Down Expand Up @@ -151,12 +155,14 @@ class AsyncPublisher : public Publisher
std::shared_ptr<fuse_core::CallbackAdapter> callback_queue_;

std::string name_; //!< The unique name for this publisher instance
rclcpp::Node::SharedPtr node_; //!< The node for this publisher
rclcpp::CallbackGroup::SharedPtr cb_group_; //!< Internal re-entrant callback group

//! The node interfaces
node_interfaces::NodeInterfaces<node_interfaces::Base, node_interfaces::Waitables> interfaces_;

//! A single/multi-threaded executor assigned to the local callback queue
rclcpp::executors::MultiThreadedExecutor::SharedPtr executor_;
size_t executor_thread_count_;
rclcpp::Executor::SharedPtr executor_;
size_t executor_thread_count_{1};
std::thread spinner_; //!< Internal thread for spinning the executor
std::atomic<bool> initialized_ = false; //!< True if instance has been fully initialized

Expand Down
13 changes: 13 additions & 0 deletions fuse_core/include/fuse_core/node_interfaces/node_interfaces.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@

#include "fuse_core/node_interfaces/node_interfaces_helpers.hpp"

#define ALL_FUSE_CORE_NODE_INTERFACES \
fuse_core::node_interfaces::Base, \
fuse_core::node_interfaces::Clock, \
fuse_core::node_interfaces::Graph, \
fuse_core::node_interfaces::Logging, \
fuse_core::node_interfaces::Parameters, \
fuse_core::node_interfaces::Services, \
fuse_core::node_interfaces::TimeSource, \
fuse_core::node_interfaces::Timers, \
fuse_core::node_interfaces::Topics, \
fuse_core::node_interfaces::Waitables


namespace fuse_core
{
namespace node_interfaces
Expand Down
6 changes: 5 additions & 1 deletion fuse_core/include/fuse_core/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,13 @@ class Publisher
* conflicts and allow the same plugin to be used multiple times with different settings and
* topics.
*
* This requires all possible interfaces, but plugins may choose to use only some of them.
*
* @param[in] name A unique name to give this plugin instance
*/
virtual void initialize(const std::string & name) = 0;
virtual void initialize(
node_interfaces::NodeInterfaces<ALL_FUSE_CORE_NODE_INTERFACES> interfaces,
const std::string & name) = 0;

/**
* @brief Get the unique name of this publisher
Expand Down
46 changes: 24 additions & 22 deletions fuse_core/src/async_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,43 +48,45 @@ AsyncPublisher::~AsyncPublisher()
internal_stop();
}

void AsyncPublisher::initialize(const std::string & name)
void AsyncPublisher::initialize(
node_interfaces::NodeInterfaces<ALL_FUSE_CORE_NODE_INTERFACES> interfaces,
const std::string & name)
{
interfaces_ = interfaces;

if (initialized_) {
throw std::runtime_error("Calling initialize on an already initialized AsyncPublisher!");
}

// Initialize internal state
name_ = name;
std::string node_namespace = "";

// TODO(CH3): Pass in the context or a node to get the context from
rclcpp::Context::SharedPtr ros_context = rclcpp::contexts::get_global_default_context();
auto node_options = rclcpp::NodeOptions();
node_options.context(ros_context); // set a context to generate the node in

// TODO(CH3): Potentially pass in the optimizer node instead of spinning a new one
node_ = rclcpp::Node::make_shared(name_, node_namespace, node_options);
name_ = name; // NOTE(methylDragon): Used in derived classes

auto context = interfaces_.get_node_base_interface()->get_context();
auto executor_options = rclcpp::ExecutorOptions();
executor_options.context = ros_context;
executor_ = rclcpp::executors::MultiThreadedExecutor::make_shared(
executor_options,
executor_thread_count_);
executor_options.context = context;

if (executor_thread_count_ == 1) {
executor_ = rclcpp::executors::SingleThreadedExecutor::make_shared(executor_options);
} else {
executor_ = rclcpp::executors::MultiThreadedExecutor::make_shared(
executor_options, executor_thread_count_);
}

callback_queue_ = std::make_shared<CallbackAdapter>(ros_context);
callback_queue_ =
std::make_shared<CallbackAdapter>(context);

// This callback group MUST be re-entrant in order to support parallelization
cb_group_ = node_->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
node_->get_node_waitables_interface()->add_waitable(
callback_queue_, cb_group_);
cb_group_ = interfaces_.get_node_base_interface()->create_callback_group(
rclcpp::CallbackGroupType::Reentrant, false);
interfaces_.get_node_waitables_interface()->add_waitable(callback_queue_, cb_group_);

// Call the derived onInit() function to perform implementation-specific initialization
onInit();

// Make sure the executor will service the given node
// TODO(sloretz) add just the callback group here when using Optimizer's Node
executor_->add_node(node_);
// We can add this without any guards because the callback group was set to not get automatically
// added to executors
executor_->add_callback_group(cb_group_, interfaces_.get_node_base_interface());

// Start the executor
spinner_ = std::thread(
Expand Down Expand Up @@ -127,7 +129,7 @@ void AsyncPublisher::stop()
{
// Prefer to call onStop in executor's thread so downstream users don't have
// to worry about threads in ROS callbacks when there's only 1 thread.
if (node_->get_node_base_interface()->get_context()->is_valid()) {
if (interfaces_.get_node_base_interface()->get_context()->is_valid()) {
auto callback = std::make_shared<CallbackWrapper<void>>(
std::bind(&AsyncPublisher::onStop, this)
);
Expand Down
2 changes: 1 addition & 1 deletion fuse_core/test/launch_tests/test_parameters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ TEST_F(TestParameters, GetCovarianceDiagonalParam)
fuse_core::Matrix3d default_covariance = fuse_core::Matrix3d::Identity();
default_covariance *= default_variance;

// Load covariance matrix diagonal from the parameter server:
auto node = rclcpp::Node::make_shared("test_parameters_node");

// Load covariance matrix diagonal from the parameter server:
// A covariance diagonal with the expected size and valid should be the same as the expected one:
{
const std::string parameter_name{"covariance_diagonal"};
Expand Down
2 changes: 1 addition & 1 deletion fuse_core/test/test_async_motion_model.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class TestAsyncMotionModel : public ::testing::Test

TEST_F(TestAsyncMotionModel, OnInit)
{
for (int i = 0; i < 250; i++) {
for (int i = 0; i < 50; i++) {
MyMotionModel motion_model;
motion_model.initialize("my_motion_model_" + std::to_string(i));
EXPECT_TRUE(motion_model.initialized);
Expand Down
15 changes: 9 additions & 6 deletions fuse_core/test/test_async_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class MyPublisher : public fuse_core::AsyncPublisher
{
public:
MyPublisher()
: fuse_core::AsyncPublisher(0),
: fuse_core::AsyncPublisher(1),
callback_processed(false),
initialized(false)
{
Expand Down Expand Up @@ -86,25 +86,28 @@ class TestAsyncPublisher : public ::testing::Test

TEST_F(TestAsyncPublisher, OnInit)
{
for (int i = 0; i < 250; i++) {
for (int i = 0; i < 50; i++) {
auto node = rclcpp::Node::make_shared("test_async_pub_node");
MyPublisher publisher;
publisher.initialize("my_publisher_" + std::to_string(i));
publisher.initialize(node, "my_publisher_" + std::to_string(i));
EXPECT_TRUE(publisher.initialized);
}
}

TEST_F(TestAsyncPublisher, DoubleInit)
{
auto node = rclcpp::Node::make_shared("test_async_pub_node");
MyPublisher publisher;
publisher.initialize("my_publisher");
publisher.initialize(node, "my_publisher");
EXPECT_TRUE(publisher.initialized);
EXPECT_THROW(publisher.initialize("test"), std::runtime_error);
EXPECT_THROW(publisher.initialize(node, "test"), std::runtime_error);
}

TEST_F(TestAsyncPublisher, notifyCallback)
{
auto node = rclcpp::Node::make_shared("test_async_pub_node");
MyPublisher publisher;
publisher.initialize("my_publisher");
publisher.initialize(node, "my_publisher");

// Execute the notify() method in this thread. This should push a call to
// MyPublisher::notifyCallback() into MyPublisher's callback queue, which will get executed by
Expand Down
2 changes: 1 addition & 1 deletion fuse_core/test/test_async_sensor_model.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class TestAsyncSensorModel : public ::testing::Test

TEST_F(TestAsyncSensorModel, OnInit)
{
for (int i = 0; i < 250; i++) {
for (int i = 0; i < 50; i++) {
MySensor sensor;
sensor.initialize("my_sensor_" + std::to_string(i), &transactionCallback);
EXPECT_TRUE(sensor.initialized);
Expand Down
2 changes: 1 addition & 1 deletion fuse_models/include/fuse_models/odometry_2d_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ namespace fuse_models
* - tf, tf_static (tf2_msgs::TFMessage) Subscribes to tf data to obtain the requisite odom->base_link transform,
* but only if the world_frame_id is set to the value of the map_frame_id.
*/
class Odometry2DPublisher : public fuse_core::AsyncPublisher
class Odometry2DPublisher : public fuse_core::AsyncPublisher // TODO(methylDragon): Refactor this in the same way it was done in fuse_publishers
{
public:
FUSE_SMART_PTR_DEFINITIONS_WITH_EIGEN(Odometry2DPublisher)
Expand Down
Loading

0 comments on commit 5f4c5ed

Please sign in to comment.