Skip to content

Commit

Permalink
Using new callback wrapper in AsyncMotionModel
Browse files Browse the repository at this point in the history
  • Loading branch information
ayrton04 committed May 19, 2022
1 parent 0137559 commit 586e9b8
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 33 deletions.
2 changes: 1 addition & 1 deletion fuse_core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pkg_check_modules(libglog REQUIRED)
## fuse_core library
add_library(
${PROJECT_NAME} SHARED
# src/async_motion_model.cpp
src/async_motion_model.cpp
# src/async_publisher.cpp
# src/async_sensor_model.cpp
src/ceres_options.cpp
Expand Down
13 changes: 5 additions & 8 deletions fuse_core/include/fuse_core/async_motion_model.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@
#include <fuse_core/fuse_macros.h>
#include <fuse_core/motion_model.h>
#include <fuse_core/transaction.h>
#include <ros/callback_queue.h>
#include <ros/node_handle.h>
#include <ros/spinner.h>
#include <rclcpp/rclcpp.hpp>

#include <string>

Expand Down Expand Up @@ -85,7 +83,7 @@ namespace fuse_core
class AsyncMotionModel : public MotionModel
{
public:
FUSE_SMART_PTR_ALIASES_ONLY(AsyncMotionModel);
FUSE_SMART_PTR_ALIASES_ONLY(AsyncMotionModel)

/**
* @brief Destructor
Expand Down Expand Up @@ -170,11 +168,10 @@ class AsyncMotionModel : public MotionModel
void stop() override;

protected:
ros::CallbackQueue callback_queue_; //!< The local callback queue used for all subscriptions
std::string name_; //!< The unique name for this motion model instance
ros::NodeHandle node_handle_; //!< A node handle in the global namespace using the local callback queue
ros::NodeHandle private_node_handle_; //!< A node handle in the private namespace using the local callback queue
ros::AsyncSpinner spinner_; //!< A single/multi-threaded spinner assigned to the local callback queue
rclcpp::Node::SharedPtr node_; //!< The node for this motion model
rclcpp::executors::MultiThreadedExecutor executor_; //!< A single/multi-threaded spinner assigned to the local callback queue
rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr waitables_interface_;

/**
* @brief Constructor
Expand Down
2 changes: 1 addition & 1 deletion fuse_core/include/fuse_core/motion_model.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace fuse_core
class MotionModel
{
public:
FUSE_SMART_PTR_ALIASES_ONLY(MotionModel);
FUSE_SMART_PTR_ALIASES_ONLY(MotionModel)

/**
* @brief Destructor
Expand Down
56 changes: 33 additions & 23 deletions fuse_core/src/async_motion_model.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@
#include <fuse_core/callback_wrapper.h>
#include <fuse_core/graph.h>
#include <fuse_core/transaction.h>
#include <ros/node_handle.h>

#include <boost/make_shared.hpp>
#include <rclcpp/rclcpp.hpp>

#include <functional>
#include <memory>
#include <utility>
#include <string>

Expand All @@ -50,7 +49,7 @@ namespace fuse_core

AsyncMotionModel::AsyncMotionModel(size_t thread_count) :
name_("uninitialized"),
spinner_(thread_count, &callback_queue_)
executor_(rclcpp::ExecutorOptions(), thread_count)
{
}

Expand All @@ -62,56 +61,67 @@ bool AsyncMotionModel::apply(Transaction& transaction)
// Thus, it is functionally similar to a service callback, and should be a familiar pattern for ROS developers.
// This function blocks until the queryCallback() call completes, thus enforcing that motion models are generated
// in order.
auto callback = boost::make_shared<CallbackWrapper<bool>>(
auto callback = std::make_shared<CallbackWrapper<bool>>(
std::bind(&AsyncMotionModel::applyCallback, this, std::ref(transaction)));
auto result = callback->getFuture();
callback_queue_.addCallback(callback, reinterpret_cast<uint64_t>(this));
auto result = callback->get_future();
waitables_interface_->add_waitable(callback, nullptr);
result.wait();
waitables_interface_->remove_waitable(callback, nullptr);

return result.get();
}

void AsyncMotionModel::graphCallback(Graph::ConstSharedPtr graph)
{
callback_queue_.addCallback(
boost::make_shared<CallbackWrapper<void>>(std::bind(&AsyncMotionModel::onGraphUpdate, this, std::move(graph))),
reinterpret_cast<uint64_t>(this));
auto callback = std::make_shared<CallbackWrapper<void>>(std::bind(&AsyncMotionModel::onGraphUpdate, this, std::move(graph)));
auto result = callback->get_future();
waitables_interface_->add_waitable(callback, nullptr);
result.wait();
waitables_interface_->remove_waitable(callback, nullptr);

// @todo TAM: figure out whether we needed the second argument to addCallback here
//callback_queue_.addCallback(
// boost::make_shared<CallbackWrapper<void>>(std::bind(&AsyncMotionModel::onGraphUpdate, this, std::move(graph))),
// reinterpret_cast<uint64_t>(this));
}

void AsyncMotionModel::initialize(const std::string& name)
{
// Initialize internal state
name_ = name;
node_handle_.setCallbackQueue(&callback_queue_);
private_node_handle_ = ros::NodeHandle("~/" + name_);
private_node_handle_.setCallbackQueue(&callback_queue_);
node_ = rclcpp::Node::make_shared(name);
waitables_interface_ = node_->get_node_waitables_interface();

// Call the derived onInit() function to perform implementation-specific initialization
onInit();

// Start the async spinner to service the local callback queue
spinner_.start();
executor_.add_node(node_);
}

void AsyncMotionModel::start()
{
auto callback = boost::make_shared<CallbackWrapper<void>>(std::bind(&AsyncMotionModel::onStart, this));
auto result = callback->getFuture();
callback_queue_.addCallback(callback, reinterpret_cast<uint64_t>(this));
auto callback = std::make_shared<CallbackWrapper<void>>(std::bind(&AsyncMotionModel::onStart, this));
auto result = callback->get_future();
waitables_interface_->add_waitable(callback, nullptr);
result.wait();
waitables_interface_->remove_waitable(callback, nullptr);
}

void AsyncMotionModel::stop()
{
if (ros::ok())
if (rclcpp::ok())
{
auto callback = boost::make_shared<CallbackWrapper<void>>(std::bind(&AsyncMotionModel::onStop, this));
auto result = callback->getFuture();
callback_queue_.addCallback(callback, reinterpret_cast<uint64_t>(this));
auto callback = std::make_shared<CallbackWrapper<void>>(std::bind(&AsyncMotionModel::onStop, this));
auto result = callback->get_future();
waitables_interface_->add_waitable(callback, nullptr);
result.wait();
waitables_interface_->remove_waitable(callback, nullptr);
}
else
{
spinner_.stop();
executor_.cancel();
executor_.remove_node(node_);

onStop();
}
}
Expand Down

0 comments on commit 586e9b8

Please sign in to comment.