diff --git a/rclpy/CMakeLists.txt b/rclpy/CMakeLists.txt
index 15ce01ab1..948fdf541 100644
--- a/rclpy/CMakeLists.txt
+++ b/rclpy/CMakeLists.txt
@@ -17,6 +17,7 @@ endif()
find_package(ament_cmake REQUIRED)
find_package(ament_cmake_python REQUIRED)
+find_package(Boost REQUIRED)
find_package(lifecycle_msgs REQUIRED)
find_package(rcl REQUIRED)
find_package(rcl_action REQUIRED)
@@ -90,6 +91,9 @@ pybind11_add_module(_rclpy_pybind11 SHARED
src/rclpy/destroyable.cpp
src/rclpy/duration.cpp
src/rclpy/clock_event.cpp
+ src/rclpy/events_executor/events_executor.cpp
+ src/rclpy/events_executor/rcl_support.cpp
+ src/rclpy/events_executor/timers_manager.cpp
src/rclpy/exceptions.cpp
src/rclpy/graph.cpp
src/rclpy/guard_condition.cpp
@@ -121,6 +125,7 @@ target_include_directories(_rclpy_pybind11 PRIVATE
src/rclpy/
)
target_link_libraries(_rclpy_pybind11 PRIVATE
+ Boost::boost
lifecycle_msgs::lifecycle_msgs__rosidl_generator_c
lifecycle_msgs::lifecycle_msgs__rosidl_typesupport_c
rcl::rcl
@@ -182,6 +187,7 @@ if(BUILD_TESTING)
test/test_create_node.py
test/test_create_while_spinning.py
test/test_destruction.py
+ test/test_events_executor.py
test/test_executor.py
test/test_expand_topic_name.py
test/test_guard_condition.py
diff --git a/rclpy/package.xml b/rclpy/package.xml
index a08514c6f..4a2fb0f87 100644
--- a/rclpy/package.xml
+++ b/rclpy/package.xml
@@ -17,6 +17,7 @@
ament_cmake
+ boost
pybind11_vendor
python3-dev
rcpputils
diff --git a/rclpy/rclpy/experimental/__init__.py b/rclpy/rclpy/experimental/__init__.py
new file mode 100644
index 000000000..a8997525f
--- /dev/null
+++ b/rclpy/rclpy/experimental/__init__.py
@@ -0,0 +1,15 @@
+# Copyright 2024 Brad Martin
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .events_executor import EventsExecutor
diff --git a/rclpy/rclpy/experimental/events_executor.py b/rclpy/rclpy/experimental/events_executor.py
new file mode 100644
index 000000000..98fd7d082
--- /dev/null
+++ b/rclpy/rclpy/experimental/events_executor.py
@@ -0,0 +1,44 @@
+# Copyright 2024 Brad Martin
+# Copyright 2024 Merlin Labs, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import faulthandler
+import typing
+
+import rclpy.executors
+import rclpy.node
+from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
+
+
+# Try to look like we inherit from the rclpy Executor for type-checking purposes,
+# without getting any of the code from the base class.
+def EventsExecutor(*, context: rclpy.Context | None = None) -> rclpy.executors.Executor:
+ if context is None:
+ context = rclpy.get_default_context()
+
+ # For debugging purposes, if anything goes wrong in C++ make sure we also get a
+ # Python backtrace dumped with the crash.
+ faulthandler.enable()
+
+ ex = typing.cast(rclpy.executors.Executor, _rclpy.EventsExecutor(context))
+
+    # rclpy.Executor does this too. Note that the context itself is smart enough to
+    # check for bound methods, and to check whether the instances they're bound to
+    # still exist at callback time, so we don't have to worry about tearing down this
+    # stale callback at destruction time.
+ # TODO(bmartin427) This should really be done inside of the EventsExecutor
+ # implementation itself, but I'm unable to figure out a pybind11 incantation that
+ # allows me to pass this bound method call from C++.
+ context.on_shutdown(ex.wake)
+
+ return ex
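+
+
+# A minimal usage sketch (illustrative only; the node name and spin pattern here are
+# assumptions, not part of this change):
+#
+#   import rclpy
+#   from rclpy.experimental import EventsExecutor
+#
+#   rclpy.init()
+#   node = rclpy.create_node('example_node')
+#   executor = EventsExecutor()
+#   executor.add_node(node)
+#   try:
+#       executor.spin()
+#   finally:
+#       executor.shutdown()
+#       rclpy.shutdown()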
diff --git a/rclpy/src/rclpy/_rclpy_pybind11.cpp b/rclpy/src/rclpy/_rclpy_pybind11.cpp
index 4da96a5d5..f33fdaa2f 100644
--- a/rclpy/src/rclpy/_rclpy_pybind11.cpp
+++ b/rclpy/src/rclpy/_rclpy_pybind11.cpp
@@ -32,6 +32,7 @@
#include "duration.hpp"
#include "clock_event.hpp"
#include "event_handle.hpp"
+#include "events_executor/events_executor.hpp"
#include "exceptions.hpp"
#include "graph.hpp"
#include "guard_condition.hpp"
@@ -247,4 +248,6 @@ PYBIND11_MODULE(_rclpy_pybind11, m) {
rclpy::define_signal_handler_api(m);
rclpy::define_clock_event(m);
rclpy::define_lifecycle_api(m);
+
+ rclpy::events_executor::define_events_executor(m);
}
diff --git a/rclpy/src/rclpy/events_executor/events_executor.cpp b/rclpy/src/rclpy/events_executor/events_executor.cpp
new file mode 100644
index 000000000..8cc964c66
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/events_executor.cpp
@@ -0,0 +1,878 @@
+// Copyright 2024 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Based on a similar approach as the iRobot rclcpp EventsExecutor implementation:
+// https://github.com/ros2/rclcpp/blob/7907b2fee0b1381dc21900efd1745e11f5caa670/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
+// Original copyright for that file is:
+// Copyright 2023 iRobot Corporation.
+//
+// Also borrows some code from the original rclpy Executor implementations:
+// https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py
+// Original copyright for that file is:
+// Copyright 2017 Open Source Robotics Foundation, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include "events_executor/events_executor.hpp"
+
+#include <chrono>
+#include <functional>
+
+#include <boost/asio.hpp>
+
+#include <pybind11/eval.h>
+#include <pybind11/pybind11.h>
+
+#include <rcl/error_handling.h>
+#include <rcl/rcl.h>
+#include <rcl/types.h>
+
+namespace pl = std::placeholders;
+namespace py = pybind11;
+
+namespace rclpy {
+namespace events_executor {
+
+EventsExecutor::EventsExecutor(py::object context)
+ : rclpy_context_(context),
+ asyncio_run_(py::module_::import("asyncio").attr("run")),
+ rclpy_task_(py::module_::import("rclpy.task").attr("Task")),
+ signals_(io_context_, SIGINT, SIGTERM),
+ rcl_callback_manager_(io_context_.get_executor()),
+ timers_manager_(io_context_.get_executor(),
+ std::bind(&EventsExecutor::HandleTimerReady, this, pl::_1)) {
+ // rclpy.Executor creates a sigint handling guard condition here. This is necessary
+  // because a sleeping event loop won't notice Ctrl-C unless some other event wakes
+  // it up.
+ //
+ // Unfortunately it doesn't look like we can either support generic guard conditions
+ // or hook into the existing rclpy signal handling in any other useful way. We'll
+ // just establish our own signal handling directly instead. This unfortunately
+ // bypasses the rclpy.init() options that allow a user to disable certain signal
+ // handlers, but it doesn't look like we can do any better.
+ signals_.async_wait([this](const boost::system::error_code& ec, int) {
+ if (!ec) {
+ py::gil_scoped_acquire gil_acquire;
+ // Don't call context.try_shutdown() here, because that can call back to us to
+ // request a blocking shutdown(), which doesn't make any sense because we have to
+ // be spinning to process the callback that's asked to wait for spinning to stop.
+ // We'll have to call that later outside of any spin loop.
+ // https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/__init__.py#L105-L109
+ sigint_pending_.store(true);
+ io_context_.stop();
+ }
+ });
+}
+
+EventsExecutor::~EventsExecutor() { shutdown(); }
+
+pybind11::object EventsExecutor::create_task(py::object callback, py::args args,
+ const py::kwargs& kwargs) {
+ // Create and return a rclpy.task.Task() object, and schedule it to be called later.
+ using py::literals::operator""_a;
+ py::object task = rclpy_task_(callback, args, kwargs, "executor"_a = py::cast(this));
+ // The Task needs to be owned at least until we invoke it from the callback we post,
+ // however we can't pass a bare py::object because that's going to try to do Python
+ // refcounting while preparing to go into or coming back from the callback, while the
+ // GIL is not held. We'll do manual refcounting on it instead.
+ py::handle cb_task_handle = task;
+ cb_task_handle.inc_ref();
+ boost::asio::post(io_context_,
+ std::bind(&EventsExecutor::IterateTask, this, cb_task_handle));
+ return task;
+}
+
+bool EventsExecutor::shutdown(std::optional<double> timeout) {
+ // NOTE: The rclpy context can invoke this with a lock on the context held. Therefore
+ // we must not try to go access that context during this method or we can deadlock.
+ // https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/context.py#L101-L103
+
+ io_context_.stop();
+
+ // Block until spinning is done, or timeout.
+ std::unique_lock spin_lock(spinning_mutex_, std::defer_lock);
+ if (timeout) {
+    if (!spin_lock.try_lock_for(std::chrono::duration<double>(*timeout))) {
+ return false;
+ }
+ } else {
+ spin_lock.lock();
+ }
+ // Tear down any callbacks we still have registered.
+ for (py::handle node : py::list(nodes_)) {
+ remove_node(node);
+ }
+ UpdateEntitiesFromNodes(true);
+ return true;
+}
+
+bool EventsExecutor::add_node(py::object node) {
+ std::lock_guard lock(nodes_mutex_);
+ if (nodes_.contains(node)) {
+ return false;
+ }
+ nodes_.add(node);
+ // Caution, the Node executor setter method calls executor.add_node() again making
+ // this reentrant.
+ node.attr("executor") = py::cast(this);
+ wake();
+ return true;
+}
+
+void EventsExecutor::remove_node(py::handle node) {
+ std::lock_guard lock(nodes_mutex_);
+ if (!nodes_.contains(node)) {
+ return;
+ }
+ // Why does pybind11 provide a C++ method for add() but not discard() or remove()?
+ nodes_.attr("remove")(node);
+ // Not sure why rclpy doesn't change the node.executor at this point
+ wake();
+}
+
+void EventsExecutor::wake() {
+ if (!wake_pending_.exchange(true)) {
+ // Update tracked entities.
+ boost::asio::post(io_context_, [this]() {
+ py::gil_scoped_acquire gil_acquire;
+      UpdateEntitiesFromNodes(!py::cast<bool>(rclpy_context_.attr("ok")()));
+ });
+ }
+}
+
+// NOTE: The timeouts on the below two methods are always realtime even if we're running
+// in debug time. This is true of other executors too, because debug time is always
+// associated with a specific node and more than one node may be connected to an
+// executor instance.
+// https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py#L184-L185
+
+void EventsExecutor::spin(std::optional<double> timeout_sec) {
+ {
+ std::unique_lock spin_lock(spinning_mutex_, std::try_to_lock);
+ if (!spin_lock) {
+ throw std::runtime_error("Attempt to spin an already-spinning Executor");
+ }
+ // Release the GIL while we block. Any callbacks on the io_context that want to
+ // touch Python will need to reacquire it though.
+ py::gil_scoped_release gil_release;
+ // Don't let asio auto stop if there's nothing to do
+ const auto work = boost::asio::make_work_guard(io_context_);
+ if (timeout_sec) {
+      io_context_.run_for(std::chrono::duration_cast<std::chrono::nanoseconds>(
+          std::chrono::duration<double>(*timeout_sec)));
+ } else {
+ io_context_.run();
+ }
+ io_context_.restart();
+ }
+
+ if (sigint_pending_.exchange(false)) {
+ rclpy_context_.attr("try_shutdown")();
+ }
+}
+
+void EventsExecutor::spin_once(std::optional<double> timeout_sec) {
+ {
+ std::unique_lock spin_lock(spinning_mutex_, std::try_to_lock);
+ if (!spin_lock) {
+ throw std::runtime_error("Attempt to spin an already-spinning Executor");
+ }
+ // Release the GIL while we block. Any callbacks on the io_context that want to
+ // touch Python will need to reacquire it though.
+ py::gil_scoped_release gil_release;
+ // Don't let asio auto stop if there's nothing to do
+ const auto work = boost::asio::make_work_guard(io_context_);
+ if (timeout_sec) {
+      io_context_.run_one_for(std::chrono::duration_cast<std::chrono::nanoseconds>(
+          std::chrono::duration<double>(*timeout_sec)));
+ } else {
+ io_context_.run_one();
+ }
+ io_context_.restart();
+ }
+
+ if (sigint_pending_.exchange(false)) {
+ rclpy_context_.attr("try_shutdown")();
+ }
+}
+
+void EventsExecutor::spin_until_future_complete(py::handle future,
+                                                std::optional<double> timeout_sec) {
+ future.attr("add_done_callback")(
+ py::cpp_function([this](py::handle) { io_context_.stop(); }));
+ spin(timeout_sec);
+}
+
+void EventsExecutor::spin_once_until_future_complete(
+    py::handle future, std::optional<double> timeout_sec) {
+ future.attr("add_done_callback")(
+ py::cpp_function([this](py::handle) { io_context_.stop(); }));
+ spin_once(timeout_sec);
+}
+
+void EventsExecutor::UpdateEntitiesFromNodes(bool shutdown) {
+ // Clear pending flag as early as possible, so we error on the side of retriggering
+ // a few harmless updates rather than potentially missing important additions.
+ wake_pending_.store(false);
+
+ // Collect all entities currently associated with our nodes
+ py::set subscriptions;
+ py::set timers;
+ py::set clients;
+ py::set services;
+ py::set waitables;
+ {
+ std::lock_guard lock(nodes_mutex_);
+ if (!shutdown) {
+ for (py::handle node : nodes_) {
+ subscriptions.attr("update")(py::set(node.attr("subscriptions")));
+ timers.attr("update")(py::set(node.attr("timers")));
+ clients.attr("update")(py::set(node.attr("clients")));
+ services.attr("update")(py::set(node.attr("services")));
+ waitables.attr("update")(py::set(node.attr("waitables")));
+
+ // It doesn't seem to be possible to support guard conditions with a
+ // callback-based (as opposed to waitset-based) API. Fortunately we don't
+ // seem to need to.
+ if (!py::set(node.attr("guards")).empty()) {
+ throw std::runtime_error("Guard conditions not supported");
+ }
+ }
+ } else {
+ // Remove all tracked entities and nodes.
+ nodes_.clear();
+ }
+ }
+
+ // Perform updates for added and removed entities
+ UpdateEntitySet(subscriptions_, subscriptions,
+ std::bind(&EventsExecutor::HandleAddedSubscription, this, pl::_1),
+ std::bind(&EventsExecutor::HandleRemovedSubscription, this, pl::_1));
+ UpdateEntitySet(timers_, timers,
+ std::bind(&EventsExecutor::HandleAddedTimer, this, pl::_1),
+ std::bind(&EventsExecutor::HandleRemovedTimer, this, pl::_1));
+ UpdateEntitySet(clients_, clients,
+ std::bind(&EventsExecutor::HandleAddedClient, this, pl::_1),
+ std::bind(&EventsExecutor::HandleRemovedClient, this, pl::_1));
+ UpdateEntitySet(services_, services,
+ std::bind(&EventsExecutor::HandleAddedService, this, pl::_1),
+ std::bind(&EventsExecutor::HandleRemovedService, this, pl::_1));
+ UpdateEntitySet(waitables_, waitables,
+ std::bind(&EventsExecutor::HandleAddedWaitable, this, pl::_1),
+ std::bind(&EventsExecutor::HandleRemovedWaitable, this, pl::_1));
+
+ if (shutdown) {
+ // Stop spinning after everything is torn down.
+ io_context_.stop();
+ }
+}
+
+void EventsExecutor::UpdateEntitySet(
+ py::set& entity_set, const py::set& new_entity_set,
+    std::function<void(py::handle)> added_entity_callback,
+    std::function<void(py::handle)> removed_entity_callback) {
+ py::set added_entities = new_entity_set - entity_set;
+ for (py::handle added_entity : added_entities) {
+ added_entity_callback(added_entity);
+ }
+
+ py::set removed_entities = entity_set - new_entity_set;
+ for (py::handle removed_entity : removed_entities) {
+ removed_entity_callback(removed_entity);
+ }
+
+ entity_set = new_entity_set;
+}
+
+void EventsExecutor::HandleAddedSubscription(py::handle subscription) {
+ py::handle handle = subscription.attr("handle");
+  auto with = std::make_shared<ScopedWith>(handle);
+ const rcl_subscription_t* rcl_ptr = GetRclSubscription(handle);
+ const auto cb =
+ std::bind(&EventsExecutor::HandleSubscriptionReady, this, subscription, pl::_1);
+ if (RCL_RET_OK != rcl_subscription_set_on_new_message_callback(
+ rcl_ptr, RclEventCallbackTrampoline,
+ rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with))) {
+ throw std::runtime_error(
+ std::string("Failed to set the on new message callback for subscription: ") +
+ rcl_get_error_string().str);
+ }
+}
+
+void EventsExecutor::HandleRemovedSubscription(py::handle subscription) {
+ py::handle handle = subscription.attr("handle");
+ const rcl_subscription_t* rcl_ptr = GetRclSubscription(handle);
+ if (RCL_RET_OK !=
+ rcl_subscription_set_on_new_message_callback(rcl_ptr, nullptr, nullptr)) {
+ throw std::runtime_error(
+ std::string("Failed to clear the on new message callback for subscription: ") +
+ rcl_get_error_string().str);
+ }
+ rcl_callback_manager_.RemoveCallback(rcl_ptr);
+}
+
+void EventsExecutor::HandleSubscriptionReady(py::handle subscription,
+ size_t number_of_events) {
+ py::gil_scoped_acquire gil_acquire;
+
+  // Largely based on rclpy.Executor._take_subscription() and _execute_subscription().
+ // https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py#L355-L367
+ //
+ // NOTE: Simple object attributes we can count on to be owned by the parent object,
+ // but bound method calls and function return values need to be owned by us.
+ const py::handle handle = subscription.attr("handle");
+ const py::object take_message = handle.attr("take_message");
+ const py::handle msg_type = subscription.attr("msg_type");
+ const py::handle raw = subscription.attr("raw");
+  const int callback_type =
+      py::cast<int>(subscription.attr("_callback_type").attr("value"));
+  const int message_only = py::cast<int>(
+      subscription.attr("CallbackType").attr("MessageOnly").attr("value"));
+ const py::handle callback = subscription.attr("callback");
+
+ // rmw_cyclonedds has a bug which causes number_of_events to be zero in the case where
+ // messages were waiting for us when we registered the callback, and the topic is
+ // using KEEP_ALL history policy. We'll work around that by checking for zero and
+ // just taking messages until we start getting None in that case.
+ // https://github.com/ros2/rmw_cyclonedds/issues/509
+ bool got_none = false;
+ for (size_t i = 0; number_of_events ? i < number_of_events : !got_none; ++i) {
+ py::object msg_info = take_message(msg_type, raw);
+ if (!msg_info.is_none()) {
+ try {
+ if (callback_type == message_only) {
+          callback(py::cast<py::tuple>(msg_info)[0]);
+ } else {
+ callback(msg_info);
+ }
+ } catch (const py::error_already_set& e) {
+ HandleCallbackExceptionInNodeEntity(e, subscription, "subscriptions");
+ throw;
+ }
+ } else {
+ got_none = true;
+ }
+ }
+}
+
+void EventsExecutor::HandleAddedTimer(py::handle timer) {
+ timers_manager_.AddTimer(timer);
+}
+
+void EventsExecutor::HandleRemovedTimer(py::handle timer) {
+ timers_manager_.RemoveTimer(timer);
+}
+
+void EventsExecutor::HandleTimerReady(py::handle timer) {
+ py::gil_scoped_acquire gil_acquire;
+
+ try {
+ // Unlike most rclpy objects this doesn't document whether it's a Callable or might
+ // be a Coroutine. Let's hope it's the former. 🤞
+ timer.attr("callback")();
+ } catch (const py::error_already_set& e) {
+ HandleCallbackExceptionInNodeEntity(e, timer, "timers");
+ throw;
+ }
+}
+
+void EventsExecutor::HandleAddedClient(py::handle client) {
+ py::handle handle = client.attr("handle");
+  auto with = std::make_shared<ScopedWith>(handle);
+ const rcl_client_t* rcl_ptr = GetRclClient(handle);
+ const auto cb = std::bind(&EventsExecutor::HandleClientReady, this, client, pl::_1);
+ if (RCL_RET_OK != rcl_client_set_on_new_response_callback(
+ rcl_ptr, RclEventCallbackTrampoline,
+ rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with))) {
+ throw std::runtime_error(
+ std::string("Failed to set the on new response callback for client: ") +
+ rcl_get_error_string().str);
+ }
+}
+
+void EventsExecutor::HandleRemovedClient(py::handle client) {
+ py::handle handle = client.attr("handle");
+ const rcl_client_t* rcl_ptr = GetRclClient(handle);
+ if (RCL_RET_OK !=
+ rcl_client_set_on_new_response_callback(rcl_ptr, nullptr, nullptr)) {
+ throw std::runtime_error(
+ std::string("Failed to clear the on new response callback for client: ") +
+ rcl_get_error_string().str);
+ }
+ rcl_callback_manager_.RemoveCallback(rcl_ptr);
+}
+
+void EventsExecutor::HandleClientReady(py::handle client, size_t number_of_events) {
+ py::gil_scoped_acquire gil_acquire;
+
+ // Largely based on rclpy.Executor._take_client() and _execute_client().
+ // https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py#L369-L384
+ const py::handle handle = client.attr("handle");
+ const py::object take_response = handle.attr("take_response");
+ const py::handle srv_type = client.attr("srv_type");
+ const py::handle res_type = srv_type.attr("Response");
+ const py::object get_pending_request = client.attr("get_pending_request");
+
+ for (size_t i = 0; i < number_of_events; ++i) {
+ py::tuple seq_and_response = take_response(res_type);
+ py::handle header = seq_and_response[0];
+ py::handle response = seq_and_response[1];
+ if (!header.is_none()) {
+ py::object sequence = header.attr("request_id").attr("sequence_number");
+ py::object future;
+ try {
+ future = get_pending_request(sequence);
+ } catch (const py::error_already_set& e) {
+ if (e.matches(PyExc_KeyError)) {
+ // The request was cancelled
+ continue;
+ }
+ throw;
+ }
+ future.attr("_set_executor")(py::cast(this));
+ try {
+ future.attr("set_result")(response);
+ } catch (const py::error_already_set& e) {
+ HandleCallbackExceptionInNodeEntity(e, client, "clients");
+ throw;
+ }
+ }
+ }
+}
+
+void EventsExecutor::HandleAddedService(py::handle service) {
+ py::handle handle = service.attr("handle");
+  auto with = std::make_shared<ScopedWith>(handle);
+ const rcl_service_t* rcl_ptr = GetRclService(handle);
+ const auto cb = std::bind(&EventsExecutor::HandleServiceReady, this, service, pl::_1);
+ if (RCL_RET_OK != rcl_service_set_on_new_request_callback(
+ rcl_ptr, RclEventCallbackTrampoline,
+ rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with))) {
+ throw std::runtime_error(
+ std::string("Failed to set the on new request callback for service: ") +
+ rcl_get_error_string().str);
+ }
+}
+
+void EventsExecutor::HandleRemovedService(py::handle service) {
+ py::handle handle = service.attr("handle");
+ const rcl_service_t* rcl_ptr = GetRclService(handle);
+ if (RCL_RET_OK !=
+ rcl_service_set_on_new_request_callback(rcl_ptr, nullptr, nullptr)) {
+ throw std::runtime_error(
+ std::string("Failed to clear the on new request callback for service: ") +
+ rcl_get_error_string().str);
+ }
+ rcl_callback_manager_.RemoveCallback(rcl_ptr);
+}
+
+void EventsExecutor::HandleServiceReady(py::handle service, size_t number_of_events) {
+ py::gil_scoped_acquire gil_acquire;
+
+ // Largely based on rclpy.Executor._take_service() and _execute_service().
+ // https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py#L386-L397
+ const py::handle handle = service.attr("handle");
+ const py::object service_take_request = handle.attr("service_take_request");
+ const py::handle srv_type = service.attr("srv_type");
+ const py::handle req_type = srv_type.attr("Request");
+ const py::handle res_type = srv_type.attr("Response");
+ const py::handle callback = service.attr("callback");
+ const py::object send_response = service.attr("send_response");
+
+ for (size_t i = 0; i < number_of_events; ++i) {
+ py::object maybe_request_and_header = service_take_request(req_type);
+ if (!maybe_request_and_header.is_none()) {
+      py::tuple request_and_header = py::cast<py::tuple>(maybe_request_and_header);
+ py::handle request = request_and_header[0];
+ py::handle header = request_and_header[1];
+ if (!request.is_none()) {
+ py::object response;
+ try {
+ response = callback(request, res_type());
+ } catch (const py::error_already_set& e) {
+ HandleCallbackExceptionInNodeEntity(e, service, "services");
+ throw;
+ }
+ send_response(response, header);
+ }
+ }
+ }
+}
+
+void EventsExecutor::HandleAddedWaitable(py::handle waitable) {
+ // The Waitable API is too abstract for us to work with directly; it only exposes APIs
+ // for dealing with wait sets, and all of the rcl callback API requires knowing
+ // exactly what kinds of rcl objects you're working with. We'll try to figure out
+ // what kind of stuff is hiding behind the abstraction by having the Waitable add
+ // itself to a wait set, then take stock of what all ended up there. We'll also have
+ // to hope that no Waitable implementations ever change their component entities over
+ // their lifetimes. 😬
+  auto with_waitable = std::make_shared<ScopedWith>(waitable);
+ const py::object num_entities = waitable.attr("get_num_entities")();
+  if (py::cast<size_t>(num_entities.attr("num_guard_conditions")) != 0) {
+ throw std::runtime_error("Guard conditions not supported");
+ }
+ const py::object _rclpy = py::module_::import("rclpy.impl.implementation_singleton")
+ .attr("rclpy_implementation");
+ py::object wait_set = _rclpy.attr("WaitSet")(
+ num_entities.attr("num_subscriptions"), 0, num_entities.attr("num_timers"),
+ num_entities.attr("num_clients"), num_entities.attr("num_services"),
+ num_entities.attr("num_events"), rclpy_context_.attr("handle"));
+  auto with_waitset = std::make_shared<ScopedWith>(wait_set);
+ waitable.attr("add_to_wait_set")(wait_set);
+ rcl_wait_set_t* const rcl_waitset = GetRclWaitSet(wait_set);
+ // We null out each entry in the waitset as we set it up, so that the waitset itself
+ // can be reused when something becomes ready to signal to the Waitable what's ready
+ // and what's not. We also bind with_waitset into each callback we set up, to ensure
+ // that object doesn't get destroyed while any of these callbacks are still
+ // registered.
+ WaitableSubEntities sub_entities;
+ for (size_t i = 0; i < rcl_waitset->size_of_subscriptions; ++i) {
+ const rcl_subscription_t* const rcl_sub = rcl_waitset->subscriptions[i];
+ rcl_waitset->subscriptions[i] = nullptr;
+ sub_entities.subscriptions.push_back(rcl_sub);
+ const auto cb = std::bind(&EventsExecutor::HandleWaitableSubReady, this, waitable,
+ rcl_sub, wait_set, i, with_waitset, pl::_1);
+ if (RCL_RET_OK !=
+ rcl_subscription_set_on_new_message_callback(
+ rcl_sub, RclEventCallbackTrampoline,
+ rcl_callback_manager_.MakeCallback(rcl_sub, cb, with_waitable))) {
+ throw std::runtime_error(
+ std::string(
+ "Failed to set the on new message callback for Waitable subscription: ") +
+ rcl_get_error_string().str);
+ }
+ }
+ for (size_t i = 0; i < rcl_waitset->size_of_timers; ++i) {
+ // Unfortunately we do require a non-const pointer here, while the waitset structure
+ // contains a const pointer.
+    rcl_timer_t* const rcl_timer = const_cast<rcl_timer_t*>(rcl_waitset->timers[i]);
+ rcl_waitset->timers[i] = nullptr;
+ sub_entities.timers.push_back(rcl_timer);
+ // Since this callback doesn't go through RclCallbackManager which would otherwise
+ // own an instance of `with_waitable` associated with this callback, we'll bind it
+ // directly into the callback instead.
+ const auto cb = std::bind(&EventsExecutor::HandleWaitableTimerReady, this, waitable,
+ rcl_timer, wait_set, i, with_waitable, with_waitset);
+ timers_manager_.rcl_manager().AddTimer(rcl_timer, cb);
+ }
+ for (size_t i = 0; i < rcl_waitset->size_of_clients; ++i) {
+ const rcl_client_t* const rcl_client = rcl_waitset->clients[i];
+ rcl_waitset->clients[i] = nullptr;
+ sub_entities.clients.push_back(rcl_client);
+ const auto cb = std::bind(&EventsExecutor::HandleWaitableClientReady, this,
+ waitable, rcl_client, wait_set, i, with_waitset, pl::_1);
+ if (RCL_RET_OK !=
+ rcl_client_set_on_new_response_callback(
+ rcl_client, RclEventCallbackTrampoline,
+ rcl_callback_manager_.MakeCallback(rcl_client, cb, with_waitable))) {
+ throw std::runtime_error(
+ std::string(
+ "Failed to set the on new response callback for Waitable client: ") +
+ rcl_get_error_string().str);
+ }
+ }
+ for (size_t i = 0; i < rcl_waitset->size_of_services; ++i) {
+ const rcl_service_t* const rcl_service = rcl_waitset->services[i];
+ rcl_waitset->services[i] = nullptr;
+ sub_entities.services.push_back(rcl_service);
+ const auto cb = std::bind(&EventsExecutor::HandleWaitableServiceReady, this,
+ waitable, rcl_service, wait_set, i, with_waitset, pl::_1);
+ if (RCL_RET_OK !=
+ rcl_service_set_on_new_request_callback(
+ rcl_service, RclEventCallbackTrampoline,
+ rcl_callback_manager_.MakeCallback(rcl_service, cb, with_waitable))) {
+ throw std::runtime_error(
+ std::string(
+ "Failed to set the on new request callback for Waitable service: ") +
+ rcl_get_error_string().str);
+ }
+ }
+ for (size_t i = 0; i < rcl_waitset->size_of_events; ++i) {
+ const rcl_event_t* const rcl_event = rcl_waitset->events[i];
+ rcl_waitset->events[i] = nullptr;
+ sub_entities.events.push_back(rcl_event);
+ const auto cb = std::bind(&EventsExecutor::HandleWaitableEventReady, this, waitable,
+ rcl_event, wait_set, i, with_waitset, pl::_1);
+ if (RCL_RET_OK != rcl_event_set_callback(rcl_event, RclEventCallbackTrampoline,
+ rcl_callback_manager_.MakeCallback(
+ rcl_event, cb, with_waitable))) {
+ throw std::runtime_error(
+ std::string("Failed to set the callback for Waitable event: ") +
+ rcl_get_error_string().str);
+ }
+ }
+
+ // Save the set of discovered sub-entities for later use during tear-down since we
+ // can't repeat the wait set trick then, as the RCL context may already be destroyed
+ // at that point.
+ waitable_entities_[waitable] = std::move(sub_entities);
+}
+
+void EventsExecutor::HandleRemovedWaitable(py::handle waitable) {
+ const auto nh = waitable_entities_.extract(waitable);
+ if (!nh) {
+ throw std::runtime_error("Couldn't find sub-entities entry for removed Waitable");
+ }
+ const WaitableSubEntities& sub_entities = nh.mapped();
+ for (const rcl_subscription_t* const rcl_sub : sub_entities.subscriptions) {
+ if (RCL_RET_OK !=
+ rcl_subscription_set_on_new_message_callback(rcl_sub, nullptr, nullptr)) {
+ throw std::runtime_error(std::string("Failed to clear the on new message "
+ "callback for Waitable subscription: ") +
+ rcl_get_error_string().str);
+ }
+ rcl_callback_manager_.RemoveCallback(rcl_sub);
+ }
+ for (rcl_timer_t* const rcl_timer : sub_entities.timers) {
+ timers_manager_.rcl_manager().RemoveTimer(rcl_timer);
+ }
+ for (const rcl_client_t* const rcl_client : sub_entities.clients) {
+ if (RCL_RET_OK !=
+ rcl_client_set_on_new_response_callback(rcl_client, nullptr, nullptr)) {
+ throw std::runtime_error(std::string("Failed to clear the on new response "
+ "callback for Waitable client: ") +
+ rcl_get_error_string().str);
+ }
+ rcl_callback_manager_.RemoveCallback(rcl_client);
+ }
+ for (const rcl_service_t* const rcl_service : sub_entities.services) {
+ if (RCL_RET_OK !=
+ rcl_service_set_on_new_request_callback(rcl_service, nullptr, nullptr)) {
+ throw std::runtime_error(std::string("Failed to clear the on new request "
+ "callback for Waitable service: ") +
+ rcl_get_error_string().str);
+ }
+ rcl_callback_manager_.RemoveCallback(rcl_service);
+ }
+ for (const rcl_event_t* const rcl_event : sub_entities.events) {
+ if (RCL_RET_OK != rcl_event_set_callback(rcl_event, nullptr, nullptr)) {
+ throw std::runtime_error(
+ std::string("Failed to clear the callback for Waitable event: ") +
+ rcl_get_error_string().str);
+ }
+ rcl_callback_manager_.RemoveCallback(rcl_event);
+ }
+}
+
+void EventsExecutor::HandleWaitableSubReady(
+ py::handle waitable, const rcl_subscription_t* rcl_sub, py::handle wait_set,
+    size_t wait_set_sub_index, std::shared_ptr<ScopedWith>, size_t number_of_events) {
+ py::gil_scoped_acquire gil_acquire;
+
+ // We need to set up the wait set to make it look like our subscription object is
+ // ready, and then poke the Waitable to do what it needs to do from there.
+ rcl_wait_set_t* const rcl_waitset = GetRclWaitSet(wait_set);
+ rcl_waitset->subscriptions[wait_set_sub_index] = rcl_sub;
+ HandleWaitableReady(waitable, wait_set, number_of_events);
+ // Null out the wait set again so that other callbacks can use it on other objects.
+ rcl_waitset->subscriptions[wait_set_sub_index] = nullptr;
+}
+
+void EventsExecutor::HandleWaitableTimerReady(py::handle waitable,
+ const rcl_timer_t* rcl_timer,
+ py::handle wait_set,
+ size_t wait_set_timer_index,
+                                              std::shared_ptr<ScopedWith>,
+                                              std::shared_ptr<ScopedWith>) {
+ py::gil_scoped_acquire gil_acquire;
+
+ // We need to set up the wait set to make it look like our timer object is ready, and
+ // then poke the Waitable to do what it needs to do from there.
+ rcl_wait_set_t* const rcl_waitset = GetRclWaitSet(wait_set);
+ rcl_waitset->timers[wait_set_timer_index] = rcl_timer;
+ HandleWaitableReady(waitable, wait_set, 1);
+ // Null out the wait set again so that other callbacks can use it on other objects.
+ rcl_waitset->timers[wait_set_timer_index] = nullptr;
+}
+
+void EventsExecutor::HandleWaitableClientReady(py::handle waitable,
+ const rcl_client_t* rcl_client,
+ py::handle wait_set,
+ size_t wait_set_client_index,
+                                               std::shared_ptr<ScopedWith>,
+ size_t number_of_events) {
+ py::gil_scoped_acquire gil_acquire;
+
+ // We need to set up the wait set to make it look like our client object is ready, and
+ // then poke the Waitable to do what it needs to do from there.
+ rcl_wait_set_t* const rcl_waitset = GetRclWaitSet(wait_set);
+ rcl_waitset->clients[wait_set_client_index] = rcl_client;
+ HandleWaitableReady(waitable, wait_set, number_of_events);
+ // Null out the wait set again so that other callbacks can use it on other objects.
+ rcl_waitset->clients[wait_set_client_index] = nullptr;
+}
+
+void EventsExecutor::HandleWaitableServiceReady(py::handle waitable,
+ const rcl_service_t* rcl_service,
+ py::handle wait_set,
+ size_t wait_set_service_index,
+                                                std::shared_ptr<ScopedWith>,
+ size_t number_of_events) {
+ py::gil_scoped_acquire gil_acquire;
+
+ // We need to set up the wait set to make it look like our service object is ready,
+ // and then poke the Waitable to do what it needs to do from there.
+ rcl_wait_set_t* const rcl_waitset = GetRclWaitSet(wait_set);
+ rcl_waitset->services[wait_set_service_index] = rcl_service;
+ HandleWaitableReady(waitable, wait_set, number_of_events);
+ // Null out the wait set again so that other callbacks can use it on other objects.
+ rcl_waitset->services[wait_set_service_index] = nullptr;
+}
+
+void EventsExecutor::HandleWaitableEventReady(
+ py::handle waitable, const rcl_event_t* rcl_event, py::handle wait_set,
+    size_t wait_set_event_index, std::shared_ptr<ScopedWith>, size_t number_of_events) {
+ py::gil_scoped_acquire gil_acquire;
+
+ // We need to set up the wait set to make it look like our event object is ready, and
+ // then poke the Waitable to do what it needs to do from there.
+ rcl_wait_set_t* const rcl_waitset = GetRclWaitSet(wait_set);
+ rcl_waitset->events[wait_set_event_index] = rcl_event;
+ HandleWaitableReady(waitable, wait_set, number_of_events);
+ // Null out the wait set again so that other callbacks can use it on other objects.
+ rcl_waitset->events[wait_set_event_index] = nullptr;
+}
+
+void EventsExecutor::HandleWaitableReady(py::handle waitable, py::handle wait_set,
+ size_t number_of_events) {
+ // Largely based on rclpy.Executor._take_waitable()
+ // https://github.com/ros2/rclpy/blob/a19180c238d4d97ed2b58868d8fb7fa3e3b621f2/rclpy/rclpy/executors.py#L447-L454
+ py::object is_ready = waitable.attr("is_ready");
+ py::object take_data = waitable.attr("take_data");
+ py::object execute = waitable.attr("execute");
+ py::object futures = waitable.attr("_futures");
+ for (auto& future : futures) {
+ future.attr("_set_executor")(py::cast(this));
+ }
+ for (size_t i = 0; i < number_of_events; ++i) {
+ // This method can have side effects, so it needs to be called even though it looks
+ // like just an accessor.
+    if (!py::cast<bool>(is_ready(wait_set))) {
+ throw std::runtime_error("Failed to make Waitable ready");
+ }
+ py::object data = take_data();
+ try {
+ // execute() is an async method, we need to use asyncio to run it
+ // TODO(bmartin427) Don't run all of this immediately, blocking everything else
+ asyncio_run_(execute(data));
+ } catch (const py::error_already_set& e) {
+ HandleCallbackExceptionInNodeEntity(e, waitable, "waitables");
+ throw;
+ }
+ }
+}
+
+void EventsExecutor::IterateTask(py::handle task) {
+ py::gil_scoped_acquire gil_acquire;
+ // Calling this won't throw, but it may set the exception property on the task object.
+ task();
+  if (py::cast<bool>(task.attr("done")())) {
+ py::object ex = task.attr("exception")();
+ // Drop reference with GIL held. This doesn't necessarily destroy the underlying
+ // Task, since the `create_task()` caller may have retained a reference to the
+ // returned value.
+ task.dec_ref();
+
+ if (!ex.is_none()) {
+ // It's not clear how to easily turn a Python exception into a C++ one, so let's
+ // just throw it again and let pybind translate it normally.
+ py::dict scope;
+ scope["ex"] = ex;
+ try {
+ py::exec("raise ex", scope);
+ } catch (py::error_already_set& cpp_ex) {
+ // There's no good way to know what node this task came from. If we only have
+ // one node, we can use the logger from that, otherwise we'll have to leave it
+ // undefined.
+ py::object logger = py::none();
+ if (nodes_.size() == 1) {
+      logger = py::list(nodes_)[0].attr("get_logger")();
+ }
+ HandleCallbackExceptionWithLogger(cpp_ex, logger, "task");
+ throw;
+ }
+ }
+ } else {
+ // Task needs more iteration. Post back to the asio loop again.
+ // TODO(bmartin427) Not sure this is correct; in particular, it's unclear how a task
+ // that needs to wait a while can avoid either blocking or spinning. Revisit when
+ // asyncio support is intentionally added.
+ boost::asio::post(io_context_, std::bind(&EventsExecutor::IterateTask, this, task));
+ }
+}
+
+void EventsExecutor::HandleCallbackExceptionInNodeEntity(
+ const py::error_already_set& exc, py::handle entity,
+ const std::string& node_entity_attr) {
+ // Try to identify the node associated with the entity that threw the exception, so we
+ // can log to it.
+ for (py::handle node : nodes_) {
+ if (py::set(node.attr(node_entity_attr.c_str())).contains(entity)) {
+ HandleCallbackExceptionWithLogger(exc, node.attr("get_logger")(),
+ node_entity_attr);
+ return;
+ }
+ }
+
+ // Failed to find a node
+ HandleCallbackExceptionWithLogger(exc, py::none(), node_entity_attr);
+}
+
+void EventsExecutor::HandleCallbackExceptionWithLogger(const py::error_already_set& exc,
+ py::object logger,
+ const std::string& entity_type) {
+ if (logger.is_none()) {
+ py::object logging = py::module_::import("rclpy.logging");
+ logger = logging.attr("get_logger")("UNKNOWN");
+ }
+
+ // The logger API won't let you call it with two different severities, from what it
+ // considers the same code location. Since it has no visibility into C++, all calls
+ // made from here will be attributed to the python that last called into here.
+ // Instead we will call out to python for logging.
+ py::dict scope;
+ scope["logger"] = logger;
+ scope["node_entity_attr"] = entity_type;
+ scope["exc_value"] = exc.value();
+ scope["exc_trace"] = exc.trace();
+ py::exec(
+ R"(
+import traceback
+logger.fatal(f"Exception in '{node_entity_attr}' callback: {exc_value}")
+logger.warn("Error occurred at:\n" + "".join(traceback.format_tb(exc_trace)))
+)",
+ scope);
+}
+
+// pybind11 module bindings
+
+void define_events_executor(py::object module) {
+  py::class_<EventsExecutor>(module, "EventsExecutor")
+      .def(py::init<py::object>(), py::arg("context"))
+ .def_property_readonly("context", &EventsExecutor::get_context)
+ .def("create_task", &EventsExecutor::create_task, py::arg("callback"))
+ .def("shutdown", &EventsExecutor::shutdown, py::arg("timeout_sec") = py::none())
+ .def("add_node", &EventsExecutor::add_node, py::arg("node"))
+ .def("remove_node", &EventsExecutor::remove_node, py::arg("node"))
+ .def("wake", &EventsExecutor::wake)
+ .def("spin", [](EventsExecutor& exec) { exec.spin(); })
+ .def("spin_once", &EventsExecutor::spin_once, py::arg("timeout_sec") = py::none())
+ .def("spin_until_future_complete", &EventsExecutor::spin_until_future_complete,
+ py::arg("future"), py::arg("timeout_sec") = py::none())
+ .def("spin_once_until_future_complete",
+ &EventsExecutor::spin_once_until_future_complete, py::arg("future"),
+ py::arg("timeout_sec") = py::none());
+}
+
+} // namespace events_executor
+} // namespace rclpy
diff --git a/rclpy/src/rclpy/events_executor/events_executor.hpp b/rclpy/src/rclpy/events_executor/events_executor.hpp
new file mode 100644
index 000000000..513250f1b
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/events_executor.hpp
@@ -0,0 +1,186 @@
+// Copyright 2024 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#pragma once
+
+#include <atomic>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/signal_set.hpp>
+
+#include <rcl/client.h>
+#include <rcl/event.h>
+#include <rcl/service.h>
+#include <rcl/subscription.h>
+#include <rcl/timer.h>
+
+#include <pybind11/pybind11.h>
+
+#include "rcl_support.hpp"
+#include "scoped_with.hpp"
+#include "timers_manager.hpp"
+
+namespace rclpy {
+namespace events_executor {
+
+/// Events executor implementation for rclpy
+///
+/// This executor implementation attempts to replicate the function of the rclcpp
+/// EventsExecutor for the benefit of rclpy applications. It is implemented in C++ to
+/// minimize the overhead of processing the event loop.
+///
+/// We assume all public methods could be invoked from any thread. Callbacks on the
+/// executor loop will be issued on the thread that called one of the spin*() variants
+/// (ignoring any parallelism that might be allowed by the callback group
+/// configuration).
+class EventsExecutor {
+ public:
+ /// @param context the rclpy Context object to operate on
+ explicit EventsExecutor(pybind11::object context);
+
+ ~EventsExecutor();
+
+ // rclpy Executor API methods:
+ pybind11::object get_context() const { return rclpy_context_; }
+ pybind11::object create_task(pybind11::object callback, pybind11::args args,
+ const pybind11::kwargs& kwargs);
+  bool shutdown(std::optional<double> timeout_sec = {});
+ bool add_node(pybind11::object node);
+ void remove_node(pybind11::handle node);
+ void wake();
+  void spin(std::optional<double> timeout_sec = {});
+  void spin_once(std::optional<double> timeout_sec = {});
+  void spin_until_future_complete(pybind11::handle future,
+                                  std::optional<double> timeout_sec = {});
+  void spin_once_until_future_complete(pybind11::handle future,
+                                       std::optional<double> timeout_sec = {});
+
+ private:
+ // Structure to hold entities discovered underlying a Waitable object.
+ struct WaitableSubEntities {
+    std::vector<const rcl_subscription_t*> subscriptions;
+    std::vector<rcl_timer_t*> timers;  // Can't be const
+    std::vector<const rcl_client_t*> clients;
+    std::vector<const rcl_service_t*> services;
+    std::vector<const rcl_event_t*> events;
+ };
+
+ /// Updates the sets of known entities based on the currently tracked nodes. This is
+ /// not thread safe, so it must be posted to the io_context if the executor is
+ /// currently spinning. Expects the GIL to be held before calling. If @p shutdown is
+ /// true, a purge of all known nodes and entities is forced.
+ void UpdateEntitiesFromNodes(bool shutdown);
+
+ /// Given an existing set of entities and a set with the desired new state, updates
+ /// the existing set and invokes callbacks on each added or removed entity.
+ void UpdateEntitySet(pybind11::set& entity_set, const pybind11::set& new_entity_set,
+                       std::function<void(pybind11::handle)> added_entity_callback,
+                       std::function<void(pybind11::handle)> removed_entity_callback);
+
+ void HandleAddedSubscription(pybind11::handle);
+ void HandleRemovedSubscription(pybind11::handle);
+ void HandleSubscriptionReady(pybind11::handle, size_t number_of_events);
+
+ void HandleAddedTimer(pybind11::handle);
+ void HandleRemovedTimer(pybind11::handle);
+ void HandleTimerReady(pybind11::handle);
+
+ void HandleAddedClient(pybind11::handle);
+ void HandleRemovedClient(pybind11::handle);
+ void HandleClientReady(pybind11::handle, size_t number_of_events);
+
+ void HandleAddedService(pybind11::handle);
+ void HandleRemovedService(pybind11::handle);
+ void HandleServiceReady(pybind11::handle, size_t number_of_events);
+
+ void HandleAddedWaitable(pybind11::handle);
+ void HandleRemovedWaitable(pybind11::handle);
+ void HandleWaitableSubReady(pybind11::handle waitable, const rcl_subscription_t*,
+ pybind11::handle wait_set, size_t wait_set_sub_index,
+                              std::shared_ptr<ScopedWith> with_waitset,
+ size_t number_of_events);
+ void HandleWaitableTimerReady(pybind11::handle waitable, const rcl_timer_t*,
+ pybind11::handle wait_set, size_t wait_set_timer_index,
+                                std::shared_ptr<ScopedWith> with_waitable,
+                                std::shared_ptr<ScopedWith> with_waitset);
+ void HandleWaitableClientReady(pybind11::handle waitable, const rcl_client_t*,
+ pybind11::handle wait_set,
+ size_t wait_set_client_index,
+                                 std::shared_ptr<ScopedWith> with_waitset,
+ size_t number_of_events);
+ void HandleWaitableServiceReady(pybind11::handle waitable, const rcl_service_t*,
+ pybind11::handle wait_set,
+ size_t wait_set_service_index,
+                                  std::shared_ptr<ScopedWith> with_waitset,
+ size_t number_of_events);
+ void HandleWaitableEventReady(pybind11::handle waitable, const rcl_event_t*,
+ pybind11::handle wait_set, size_t wait_set_event_index,
+                                std::shared_ptr<ScopedWith> with_waitset,
+ size_t number_of_events);
+ void HandleWaitableReady(pybind11::handle waitable, pybind11::handle wait_set,
+ size_t number_of_events);
+
+ /// Helper for create_task(). @p task needs to have had one reference manually added
+ /// to it. See create_task() implementation for details.
+ void IterateTask(pybind11::handle task);
+
+ void HandleCallbackExceptionInNodeEntity(const pybind11::error_already_set&,
+ pybind11::handle entity,
+ const std::string& node_entity_attr);
+ void HandleCallbackExceptionWithLogger(const pybind11::error_already_set&,
+ pybind11::object logger,
+ const std::string& entity_type);
+
+ const pybind11::object rclpy_context_;
+
+ // Imported python objects we depend on
+ const pybind11::object asyncio_run_;
+ const pybind11::object rclpy_task_;
+
+ boost::asio::io_context io_context_;
+ boost::asio::signal_set signals_;
+
+ std::recursive_mutex nodes_mutex_; ///< Protects the node set
+ pybind11::set nodes_; ///< The set of all nodes we're executing
+  std::atomic<bool> wake_pending_{};  ///< An unhandled call to wake() has been made
+ std::timed_mutex spinning_mutex_; ///< Held while a thread is spinning
+  std::atomic<bool> sigint_pending_{};
+
+ // Collection of awaitable entities we're servicing
+ pybind11::set subscriptions_;
+ pybind11::set timers_;
+ pybind11::set clients_;
+ pybind11::set services_;
+ pybind11::set waitables_;
+
+ /// Cache for rcl pointers underlying each waitables_ entry, because those are harder
+ /// to retrieve than the other entity types.
+  std::unordered_map<pybind11::handle, WaitableSubEntities, PythonHasher>
+      waitable_entities_;
+
+ RclCallbackManager rcl_callback_manager_;
+ TimersManager timers_manager_;
+};
+
+void define_events_executor(pybind11::object module);
+
+} // namespace events_executor
+} // namespace rclpy
diff --git a/rclpy/src/rclpy/events_executor/python_hasher.hpp b/rclpy/src/rclpy/events_executor/python_hasher.hpp
new file mode 100644
index 000000000..1ff9f8c2b
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/python_hasher.hpp
@@ -0,0 +1,30 @@
+// Copyright 2024 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#pragma once
+
+#include <pybind11/pybind11.h>
+
+namespace rclpy {
+namespace events_executor {
+/// This is intended to be used as the Hash template arg to STL containers using a
+/// pybind11::handle as a Key. This is the same hash that a native Python dict or set
+/// would use given the same key.
+struct PythonHasher {
+ inline ssize_t operator()(const pybind11::handle& handle) const {
+ return pybind11::hash(handle);
+ }
+};
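+
+// A minimal usage sketch (the GIL must be held, and `Value` is a hypothetical mapped
+// type, not part of this change):
+//
+//   std::unordered_map<pybind11::handle, Value, PythonHasher> entity_map;
+//   entity_map[some_handle] = Value{};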
+} // namespace events_executor
+} // namespace rclpy
diff --git a/rclpy/src/rclpy/events_executor/rcl_support.cpp b/rclpy/src/rclpy/events_executor/rcl_support.cpp
new file mode 100644
index 000000000..1a53a9e7e
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/rcl_support.cpp
@@ -0,0 +1,112 @@
+// Copyright 2024 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include "events_executor/rcl_support.hpp"
+
+#include <cstdlib>
+#include <functional>
+#include <memory>
+
+#include <boost/asio/post.hpp>
+
+namespace py = pybind11;
+
+namespace rclpy {
+namespace events_executor {
+
+extern "C" void RclEventCallbackTrampoline(const void* user_data,
+ size_t number_of_events) {
+  const auto cb = reinterpret_cast<const std::function<void(size_t)>*>(user_data);
+ (*cb)(number_of_events);
+}
+
+RclCallbackManager::RclCallbackManager(const boost::asio::any_io_executor& executor)
+ : executor_(executor) {}
+
+RclCallbackManager::~RclCallbackManager() {
+ // Should not still have any callbacks registered when we exit, because otherwise RCL
+ // can call pointers that will no longer be valid. We can't throw an exception here,
+ // but we can explode.
+ if (!owned_cbs_.empty()) {
+ py::gil_scoped_acquire gil_acquire;
+ py::print("Destroying callback manager with callbacks remaining");
+ ::abort();
+ }
+}
+
+const void* RclCallbackManager::MakeCallback(const void* key,
+                                             std::function<void(size_t)> callback,
+                                             std::shared_ptr<ScopedWith> with) {
+ // We don't support replacing an existing callback with a new one, because it gets
+ // tricky making sure we don't delete an old callback while the middleware still holds
+ // a pointer to it.
+ if (owned_cbs_.count(key) != 0) {
+ throw py::key_error("Attempt to replace existing callback");
+ }
+ CbEntry new_entry;
+  new_entry.cb = std::make_unique<std::function<void(size_t)>>(
+ [this, callback, key](size_t number_of_events) {
+ boost::asio::post(executor_, [this, callback, key, number_of_events]() {
+ if (!owned_cbs_.count(key)) {
+ // This callback has been removed, just drop it as the objects it may want
+ // to touch may no longer exist.
+ return;
+ }
+ callback(number_of_events);
+ });
+ });
+ new_entry.with = with;
+ const void* ret = new_entry.cb.get();
+ owned_cbs_[key] = std::move(new_entry);
+ return ret;
+}
+
+void RclCallbackManager::RemoveCallback(const void* key) {
+ if (!owned_cbs_.erase(key)) {
+ throw py::key_error("Attempt to remove nonexistent callback");
+ }
+}
+
+namespace {
+// This helper function is used for retrieving rcl pointers from _rclpy C++ objects.
+// Because _rclpy doesn't install its C++ headers for public use, it's difficult to use
+// the C++ classes directly. However, we can treat them as Python objects through
+// their defined Python API. Unfortunately the Python interface converts the returned
+// pointer to an integer, so recovering it looks a little weird.
+template <typename RclT>
+RclT* GetRclPtr(py::handle py_ent_handle) {
+  return reinterpret_cast<RclT*>(py::cast<size_t>(py_ent_handle.attr("pointer")));
+}
+} // namespace
+
+rcl_subscription_t* GetRclSubscription(py::handle sub_handle) {
+  return GetRclPtr<rcl_subscription_t>(sub_handle);
+}
+
+rcl_timer_t* GetRclTimer(py::handle timer_handle) {
+  return GetRclPtr<rcl_timer_t>(timer_handle);
+}
+
+rcl_client_t* GetRclClient(py::handle cl_handle) {
+  return GetRclPtr<rcl_client_t>(cl_handle);
+}
+
+rcl_service_t* GetRclService(py::handle srv_handle) {
+  return GetRclPtr<rcl_service_t>(srv_handle);
+}
+
+rcl_wait_set_t* GetRclWaitSet(py::handle ws) { return GetRclPtr<rcl_wait_set_t>(ws); }
+
+} // namespace events_executor
+} // namespace rclpy
diff --git a/rclpy/src/rclpy/events_executor/rcl_support.hpp b/rclpy/src/rclpy/events_executor/rcl_support.hpp
new file mode 100644
index 000000000..982461bf1
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/rcl_support.hpp
@@ -0,0 +1,108 @@
+// Copyright 2024 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <unordered_map>
+
+#include <boost/asio/any_io_executor.hpp>
+
+#include <rcl/client.h>
+#include <rcl/service.h>
+#include <rcl/subscription.h>
+#include <rcl/timer.h>
+#include <rcl/wait.h>
+
+#include <pybind11/pybind11.h>
+
+#include "scoped_with.hpp"
+
+namespace rclpy {
+namespace events_executor {
+
+/// Use this for all RCL event callbacks. Use the return value from
+/// RclCallbackManager::MakeCallback() as the user data arg.
+///
+/// Note that RCL callbacks are invoked in some arbitrary thread originating from the
+/// middleware. Callbacks should process quickly to avoid blocking the middleware; i.e.
+/// all actual work should be posted to an asio loop in another thread.
+extern "C" void RclEventCallbackTrampoline(const void* user_data,
+ size_t number_of_events);
+
+/// Creates and maintains callback wrappers used with the RCL C library.
+class RclCallbackManager {
+ public:
+ /// All user callbacks will be posted on the @p executor given to the constructor.
+ /// These callbacks will be invoked without the Python Global Interpreter Lock held,
+ /// so if they need to access Python at all make sure to acquire that explicitly.
+ explicit RclCallbackManager(const boost::asio::any_io_executor& executor);
+ ~RclCallbackManager();
+
+ /// Creates a callback wrapper to be passed to RCL C functions. @p key should be a
+ /// pointer to the rcl object that will be associated with the callback. @p with
+ /// protects the _rclpy object handle owning the RCL object, for the duration the
+ /// callback is established.
+  const void* MakeCallback(const void* key, std::function<void(size_t)> callback,
+                           std::shared_ptr<ScopedWith> with);
+
+ /// Discard a previously constructed callback. @p key should be the same value
+ /// provided to MakeCallback(). Caution: ensure that RCL is no longer using a
+ /// callback before invoking this.
+ void RemoveCallback(const void* key);
+
+ private:
+ /// The C RCL interface deals in raw pointers, so someone needs to own the C++
+ /// function objects we'll be calling into. We use unique pointers so the raw pointer
+ /// to the object remains stable while the map is manipulated.
+ struct CbEntry {
+    std::unique_ptr<std::function<void(size_t)>> cb;
+    std::shared_ptr<ScopedWith> with;
+ };
+
+ boost::asio::any_io_executor executor_;
+
+ /// The map key is the raw pointer to the RCL entity object (subscription, etc)
+ /// associated with the callback.
+  std::unordered_map<const void*, CbEntry> owned_cbs_;
+};
+
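+// Illustrative wiring of the manager with an RCL entity (a sketch, not a prescribed
+// API; `manager`, `rcl_sub`, `my_callback`, and `with` are assumed to exist already):
+//
+//   rcl_subscription_set_on_new_message_callback(
+//       rcl_sub, RclEventCallbackTrampoline,
+//       manager.MakeCallback(rcl_sub, my_callback, with));
+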
+/// Returns the RCL subscription object pointer from a subscription handle (i.e. the
+/// handle attribute of an rclpy Subscription object, which is a _rclpy C++
+/// Subscription object). Assumes that a ScopedWith has already been entered on the
+/// given handle.
+rcl_subscription_t* GetRclSubscription(pybind11::handle);
+
+/// Returns the RCL timer object pointer from a timer handle (i.e. the handle attribute
+/// of an rclpy Timer object, which is a _rclpy C++ Timer object). Assumes that a
+/// ScopedWith has already been entered on the given handle.
+rcl_timer_t* GetRclTimer(pybind11::handle);
+
+/// Returns the RCL client object pointer from a client handle (i.e. the handle
+/// attribute of an rclpy Client object, which is a _rclpy C++ Client object). Assumes
+/// that a ScopedWith has already been entered on the given handle.
+rcl_client_t* GetRclClient(pybind11::handle);
+
+/// Returns the RCL service object pointer from a service handle (i.e. the handle
+/// attribute of an rclpy Service object, which is a _rclpy C++ Service object).
+/// Assumes that a ScopedWith has already been entered on the given handle.
+rcl_service_t* GetRclService(pybind11::handle);
+
+/// Returns the RCL wait set object pointer from a _rclpy C++ WaitSet object. Assumes
+/// that a ScopedWith has already been entered on the given object.
+rcl_wait_set_t* GetRclWaitSet(pybind11::handle);
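+
+// Typical usage sketch for these accessors (hypothetical; mirrors how the
+// executor borrows an rcl pointer while guarding its owning handle):
+//
+//   pybind11::handle handle = timer.attr("handle");
+//   ScopedWith with(handle);
+//   rcl_timer_t* rcl_timer = GetRclTimer(handle);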
+
+} // namespace events_executor
+} // namespace rclpy
diff --git a/rclpy/src/rclpy/events_executor/scoped_with.hpp b/rclpy/src/rclpy/events_executor/scoped_with.hpp
new file mode 100644
index 000000000..53e3e6daa
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/scoped_with.hpp
@@ -0,0 +1,39 @@
+// Copyright 2024 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#pragma once
+
+#include <pybind11/pybind11.h>
+
+namespace rclpy {
+namespace events_executor {
+
+/// Enters a python context manager for the scope of this object instance.
+class ScopedWith {
+ public:
+ explicit ScopedWith(pybind11::handle object)
+ : object_(pybind11::cast(object)) {
+ object_.attr("__enter__")();
+ }
+
+ ~ScopedWith() {
+ object_.attr("__exit__")(pybind11::none(), pybind11::none(), pybind11::none());
+ }
+
+ private:
+ pybind11::object object_;
+};
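+
+// In effect, this is the C++ analog of the Python statement `with object:`,
+// holding the context for the lifetime of the instance. A usage sketch
+// (hypothetical):
+//
+//   {
+//     ScopedWith guard(subscription.attr("handle"));  // calls __enter__()
+//     // ... safe to use the rcl object owned by the handle here ...
+//   }  // calls __exit__() when `guard` leaves scope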
+
+} // namespace events_executor
+} // namespace rclpy
diff --git a/rclpy/src/rclpy/events_executor/timers_manager.cpp b/rclpy/src/rclpy/events_executor/timers_manager.cpp
new file mode 100644
index 000000000..2318754e5
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/timers_manager.cpp
@@ -0,0 +1,329 @@
+// Copyright 2024 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include "events_executor/timers_manager.hpp"
+
+#include <functional>
+#include <memory>
+#include <optional>
+#include <stdexcept>
+#include <string>
+#include <tuple>
+#include <utility>
+
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/asio/error.hpp>
+#include <boost/asio/post.hpp>
+
+#include <rcl/error_handling.h>
+
+#include <pybind11/pybind11.h>
+
+#include "rcl_support.hpp"
+
+namespace py = pybind11;
+
+namespace rclpy {
+namespace events_executor {
+
+namespace {
+
+// Implementation note: the original iRobot TimersManager associated with the rclcpp
+// EventsExecutor maintained a heap with all outstanding timers sorted by their next
+// expiration time. Here that approach will be skipped in favor of just looking at
+// every timer every update for the following reasons:
+//
+// * This approach is simpler
+// * In the applications this has been used in so far, no Node types exist that have a
+// large number of timers outstanding at once. Assuming no more than a few timers
+// exist in the whole process, the heap seems like overkill.
+// * Every time a timer ticks or is reset, the heap needs to be resorted anyway.
+// * The rcl timer interface doesn't expose any way to get timer expiration info in
+// absolute terms; you can only find out 'time until next callback'. This means if
+// you are trying to sort the list of timers, the 'value' of each entry in the heap
+// changes depending on when you look at it during the process of sorting.
+//
+// We will however yell a bit if we ever see a large number of timers that disproves
+// this assumption, so we can reassess this decision.
+constexpr size_t WARN_TIMERS_COUNT = 8;
+
+typedef std::function<void(const rcl_time_jump_t*)> ClockJumpCallbackT;
+typedef std::function<void()> TimerResetCallbackT;
+
+extern "C" void RclClockJumpTrampoline(const rcl_time_jump_t* time_jump,
+ bool before_jump, void* user_data) {
+ // rcl calls this both before and after a clock change, and we never want the before
+ // callback, so let's just eliminate that case early.
+ if (before_jump) {
+ return;
+ }
+  auto cb = reinterpret_cast<ClockJumpCallbackT*>(user_data);
+ (*cb)(time_jump);
+}
+
+extern "C" void RclTimerResetTrampoline(const void* user_data, size_t) {
+  auto cb = reinterpret_cast<const TimerResetCallbackT*>(user_data);
+ (*cb)();
+}
+
+} // namespace
+
+/// Manages a single clock source, and all timers operating on that source. All methods
+/// (including construction and destruction) are assumed to happen on the thread running
+/// the provided asio executor.
+class RclTimersManager::ClockManager {
+ public:
+ ClockManager(const boost::asio::any_io_executor& executor, rcl_clock_t* clock)
+ : executor_(executor), clock_(clock) {
+ // Need to establish a clock jump callback so we can tell when debug time is
+ // updated.
+ rcl_jump_threshold_t threshold{
+ .on_clock_change = true, .min_forward = 1, .min_backward = -1};
+ // Note, this callback could happen on any thread
+ jump_cb_ = [this](const rcl_time_jump_t* time_jump) {
+ bool on_debug{};
+ switch (time_jump->clock_change) {
+ case RCL_ROS_TIME_NO_CHANGE:
+ case RCL_ROS_TIME_ACTIVATED:
+ on_debug = true;
+ break;
+ case RCL_ROS_TIME_DEACTIVATED:
+ case RCL_SYSTEM_TIME_NO_CHANGE:
+ on_debug = false;
+ break;
+ }
+ boost::asio::post(executor_,
+ std::bind(&ClockManager::HandleJump, this, on_debug));
+ };
+ if (RCL_RET_OK != rcl_clock_add_jump_callback(clock_, threshold,
+ RclClockJumpTrampoline, &jump_cb_)) {
+ throw std::runtime_error(std::string("Failed to set RCL clock jump callback: ") +
+ rcl_get_error_string().str);
+ }
+
+ // This isn't necessary yet but every timer will eventually depend on it. Again,
+ // this could happen on any thread.
+ reset_cb_ = [this]() {
+ boost::asio::post(executor_, std::bind(&ClockManager::UpdateTimers, this));
+ };
+
+ // Initialize which timebase we're on
+ if (clock_->type == RCL_ROS_TIME) {
+ if (RCL_RET_OK != rcl_is_enabled_ros_time_override(clock_, &on_debug_time_)) {
+ throw std::runtime_error(
+ std::string("Failed to get RCL clock override state: ") +
+ rcl_get_error_string().str);
+ }
+ }
+ }
+
+ ~ClockManager() {
+ if (RCL_RET_OK !=
+ rcl_clock_remove_jump_callback(clock_, RclClockJumpTrampoline, &jump_cb_)) {
+ py::gil_scoped_acquire gil_acquire;
+ py::print(std::string("Failed to remove RCL clock jump callback: ") +
+ rcl_get_error_string().str);
+ }
+ while (!timers_.empty()) {
+ RemoveTimer(timers_.begin()->first);
+ }
+ }
+
+ bool empty() const { return timers_.empty(); }
+
+  void AddTimer(rcl_timer_t* timer, std::function<void()> ready_callback) {
+ // All timers have the same reset callback
+ if (RCL_RET_OK !=
+ rcl_timer_set_on_reset_callback(timer, RclTimerResetTrampoline, &reset_cb_)) {
+ throw std::runtime_error(std::string("Failed to set timer reset callback: ") +
+ rcl_get_error_string().str);
+ }
+ timers_[timer] = ready_callback;
+ if (timers_.size() == WARN_TIMERS_COUNT) {
+ py::print("Warning, the number of timers associated with this clock is large.");
+ py::print("Management of this number of timers may be inefficient.");
+ }
+ UpdateTimers();
+ }
+
+ void RemoveTimer(rcl_timer_t* timer) {
+ auto it = timers_.find(timer);
+ if (it == timers_.end()) {
+ throw py::key_error("Attempt to remove unmanaged timer");
+ }
+
+ if (RCL_RET_OK != rcl_timer_set_on_reset_callback(timer, nullptr, nullptr)) {
+ throw std::runtime_error(std::string("Failed to clear timer reset callback: ") +
+ rcl_get_error_string().str);
+ }
+ timers_.erase(it);
+ // We could re-evaluate how long we need to block for now that a timer has been
+ // removed; but, there's no real harm in one extra wakeup that then decides it
+ // doesn't need to do anything, and this timer might not even be the next to fire,
+ // so we won't bother.
+ }
+
+ private:
+ void HandleJump(bool on_debug_time) {
+ on_debug_time_ = on_debug_time;
+ UpdateTimers();
+ }
+
+ void UpdateTimers() {
+ // First, evaluate all of our timers and dispatch any that are ready now. While
+ // we're at it, keep track of the earliest next timer callback that is due.
+    std::optional<int64_t> next_ready_ns;
+ for (const auto& timer_cb_pair : timers_) {
+ auto next_call_ns = GetNextCallNanoseconds(timer_cb_pair.first);
+      if (next_call_ns && (*next_call_ns <= 0)) {
+ // This just notifies RCL that we're considering the timer triggered, for the
+ // purposes of updating the next trigger time.
+ const auto ret = rcl_timer_call(timer_cb_pair.first);
+ switch (ret) {
+ case RCL_RET_OK:
+ break;
+ case RCL_RET_TIMER_CANCELED:
+ // Someone apparently canceled the timer *after* we just queried the next
+ // call time? Nevermind, then...
+ rcl_reset_error();
+ continue;
+ default:
+ throw std::runtime_error(std::string("Failed to call RCL timer: ") +
+ rcl_get_error_string().str);
+ }
+
+ // Post the user callback to be invoked later once timing-sensitive code is
+ // done.
+ boost::asio::post(executor_, timer_cb_pair.second);
+
+ // Update time until *next* call.
+ next_call_ns = GetNextCallNanoseconds(timer_cb_pair.first);
+ }
+      // Skip canceled timers (empty next_call_ns) when picking the earliest due time.
+      if (next_call_ns && (!next_ready_ns || (*next_call_ns < *next_ready_ns))) {
+ next_ready_ns = next_call_ns;
+ }
+ }
+
+ // If we're not on debug time, we should schedule another wakeup when we
+ // anticipate the next timer being ready. If we are, we'll just re-check
+ // everything at the next jump callback.
+ if (!on_debug_time_ && next_ready_ns) {
+ // Boost doesn't support nanoseconds by default. Round the conversion up so we
+ // don't wake early and find nothing to do.
+ const int64_t sleep_us = (*next_ready_ns + 999) / 1000;
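+      // (For example, a deadline of 1500ns sleeps 2us rather than 1us.)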
+ next_update_wait_.expires_from_now(boost::posix_time::microseconds(sleep_us));
+ next_update_wait_.async_wait([this](const boost::system::error_code& ec) {
+ if (!ec) {
+ UpdateTimers();
+ } else if (ec != boost::asio::error::operation_aborted) {
+ throw std::runtime_error("Error waiting for next timer: " + ec.message());
+ }
+ });
+ } else {
+ next_update_wait_.cancel();
+ }
+ }
+
+ /// Returns the number of nanoseconds until the next callback on the given timer is
+ /// due. Value may be negative or zero if callback time has already been reached.
+ /// Returns std::nullopt if the timer is canceled.
+  static std::optional<int64_t> GetNextCallNanoseconds(const rcl_timer_t* rcl_timer) {
+ int64_t time_until_next_call{};
+ const rcl_ret_t ret =
+ rcl_timer_get_time_until_next_call(rcl_timer, &time_until_next_call);
+ switch (ret) {
+ case RCL_RET_OK:
+ return time_until_next_call;
+ case RCL_RET_TIMER_CANCELED:
+ return {};
+ default:
+ throw std::runtime_error(std::string("Failed to fetch timer ready time: ") +
+ rcl_get_error_string().str);
+ }
+ }
+
+ boost::asio::any_io_executor executor_;
+ rcl_clock_t* const clock_;
+ ClockJumpCallbackT jump_cb_;
+ TimerResetCallbackT reset_cb_;
+ bool on_debug_time_{};
+
+  std::unordered_map<rcl_timer_t*, std::function<void()>> timers_;
+ boost::asio::deadline_timer next_update_wait_{executor_};
+};
+
+RclTimersManager::RclTimersManager(const boost::asio::any_io_executor& executor)
+ : executor_(executor) {}
+
+RclTimersManager::~RclTimersManager() {}
+
+namespace {
+rcl_clock_t* GetTimerClock(rcl_timer_t* timer) {
+ rcl_clock_t* clock{};
+ if (RCL_RET_OK != rcl_timer_clock(timer, &clock)) {
+ throw std::runtime_error(std::string("Failed to determine clock for timer: ") +
+ rcl_get_error_string().str);
+ }
+ return clock;
+}
+} // namespace
+
+void RclTimersManager::AddTimer(rcl_timer_t* timer,
+                                std::function<void()> ready_callback) {
+ // Figure out the clock this timer is using, make sure a manager exists for that
+ // clock, then forward the timer to that clock's manager.
+ rcl_clock_t* clock = GetTimerClock(timer);
+ auto it = clock_managers_.find(clock);
+ if (it == clock_managers_.end()) {
+    std::tie(it, std::ignore) = clock_managers_.insert(
+        std::make_pair(clock, std::make_unique<ClockManager>(executor_, clock)));
+ }
+ it->second->AddTimer(timer, ready_callback);
+}
+
+void RclTimersManager::RemoveTimer(rcl_timer_t* timer) {
+  rcl_clock_t* clock = GetTimerClock(timer);
+ auto it = clock_managers_.find(clock);
+ if (it == clock_managers_.end()) {
+ throw py::key_error("Attempt to remove timer from unmanaged clock");
+ }
+ it->second->RemoveTimer(timer);
+ if (it->second->empty()) {
+ clock_managers_.erase(it);
+ }
+}
+
+TimersManager::TimersManager(const boost::asio::any_io_executor& executor,
+                             std::function<void(py::handle)> timer_ready_callback)
+ : rcl_manager_(executor), ready_callback_(timer_ready_callback) {}
+
+TimersManager::~TimersManager() {}
+
+void TimersManager::AddTimer(py::handle timer) {
+ PyRclMapping mapping;
+ py::handle handle = timer.attr("handle");
+  mapping.with = std::make_unique<ScopedWith>(handle);
+ mapping.rcl_ptr = GetRclTimer(handle);
+ rcl_manager_.AddTimer(mapping.rcl_ptr, std::bind(ready_callback_, timer));
+ timer_mappings_[timer] = std::move(mapping);
+}
+
+void TimersManager::RemoveTimer(py::handle timer) {
+ const auto it = timer_mappings_.find(timer);
+ if (it == timer_mappings_.end()) {
+ throw py::key_error("Attempt to remove unmanaged timer");
+ }
+ rcl_manager_.RemoveTimer(it->second.rcl_ptr);
+ timer_mappings_.erase(it);
+}
+
+} // namespace events_executor
+} // namespace rclpy
diff --git a/rclpy/src/rclpy/events_executor/timers_manager.hpp b/rclpy/src/rclpy/events_executor/timers_manager.hpp
new file mode 100644
index 000000000..314dac9b4
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/timers_manager.hpp
@@ -0,0 +1,84 @@
+// Copyright 2024 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <unordered_map>
+
+#include <boost/asio/any_io_executor.hpp>
+
+#include <rcl/timer.h>
+
+#include <pybind11/pybind11.h>
+#include <pybind11/pytypes.h>
+
+#include "events_executor/python_hasher.hpp"
+#include "events_executor/scoped_with.hpp"
+
+namespace rclpy {
+namespace events_executor {
+
+/// This class manages low-level rcl timers in the system on behalf of EventsExecutor.
+class RclTimersManager {
+ public:
+ explicit RclTimersManager(const boost::asio::any_io_executor&);
+ ~RclTimersManager();
+
+  void AddTimer(rcl_timer_t*, std::function<void()> ready_callback);
+ void RemoveTimer(rcl_timer_t*);
+
+ private:
+ boost::asio::any_io_executor executor_;
+
+ class ClockManager;
+ /// Handlers for each distinct clock source in the system.
+  std::unordered_map<rcl_clock_t*, std::unique_ptr<ClockManager>> clock_managers_;
+};
+
+/// This class manages rclpy.Timer Python objects on behalf of EventsExecutor.
+class TimersManager {
+ public:
+ /// @p timer_ready_callback will be invoked with the timer handle whenever a managed
+ /// timer is ready for servicing.
+  TimersManager(const boost::asio::any_io_executor&,
+                std::function<void(pybind11::handle)> timer_ready_callback);
+ ~TimersManager();
+
+ /// Accessor for underlying rcl timer manager, for management of non-Python timers.
+ RclTimersManager& rcl_manager() { return rcl_manager_; }
+
+ // Both of these methods expect the GIL to be held when they are called.
+ void AddTimer(pybind11::handle timer);
+ void RemoveTimer(pybind11::handle timer);
+
+ private:
+ struct PyRclMapping {
+ /// Marks the corresponding Python object as in-use for as long as we're using the
+ /// rcl pointer derived from it.
+    std::unique_ptr<ScopedWith> with;
+
+ /// The underlying rcl object
+ rcl_timer_t* rcl_ptr{};
+ };
+
+ RclTimersManager rcl_manager_;
+  const std::function<void(pybind11::handle)> ready_callback_;
+
+  std::unordered_map<pybind11::handle, PyRclMapping, PythonHasher> timer_mappings_;
+};
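+
+// A usage sketch (hypothetical names; the real wiring lives in EventsExecutor):
+//
+//   TimersManager timers(io_executor, [](pybind11::handle t) {
+//     // Called on the asio thread when rclpy Timer `t` is due; dispatch its
+//     // callback with the GIL held.
+//   });
+//   timers.AddTimer(timer);     // GIL held
+//   timers.RemoveTimer(timer);  // GIL held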
+
+} // namespace events_executor
+} // namespace rclpy
diff --git a/rclpy/test/test_events_executor.py b/rclpy/test/test_events_executor.py
new file mode 100644
index 000000000..8addb8ba1
--- /dev/null
+++ b/rclpy/test/test_events_executor.py
@@ -0,0 +1,659 @@
+# Copyright 2024 Brad Martin
+# Copyright 2024 Merlin Labs, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import typing
+import unittest
+
+import action_msgs.msg
+import rclpy.action
+import rclpy.duration
+import rclpy.event_handler
+import rclpy.executors
+import rclpy.experimental
+import rclpy.node
+import rclpy.qos
+import rclpy.time
+import rosgraph_msgs.msg
+import test_msgs.action
+import test_msgs.msg
+import test_msgs.srv
+
+
+def _get_pub_sub_qos(transient_local: bool) -> rclpy.qos.QoSProfile:
+ if not transient_local:
+ return rclpy.qos.QoSProfile(history=rclpy.qos.HistoryPolicy.KEEP_ALL)
+ # For test purposes we deliberately want a TRANSIENT_LOCAL QoS with KEEP_ALL
+ # history.
+ return rclpy.qos.QoSProfile(
+ history=rclpy.qos.HistoryPolicy.KEEP_ALL,
+ durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL,
+ )
+
+
+class SubTestNode(rclpy.node.Node):
+ """Node to test subscriptions and subscription-related events."""
+
+ def __init__(self, *, transient_local: bool = False):
+ super().__init__("test_sub_node")
+ self._new_pub_future: rclpy.Future[
+ rclpy.event_handler.QoSSubscriptionMatchedInfo
+ ] | None = None
+ self._received_future: rclpy.Future[test_msgs.msg.BasicTypes] | None = None
+ self.create_subscription(
+ test_msgs.msg.BasicTypes,
+ # This node seems to get stale discovery data and then complain about QoS
+ # changes if we reuse the same topic name.
+ "test_topic" + ("_transient_local" if transient_local else ""),
+ self._handle_sub,
+ _get_pub_sub_qos(transient_local),
+ event_callbacks=rclpy.event_handler.SubscriptionEventCallbacks(
+ matched=self._handle_matched_sub
+ ),
+ )
+
+ def expect_pub_info(
+ self,
+ ) -> rclpy.Future[rclpy.event_handler.QoSSubscriptionMatchedInfo]:
+ self._new_pub_future = rclpy.Future()
+ return self._new_pub_future
+
+ def expect_message(self) -> rclpy.Future[test_msgs.msg.BasicTypes]:
+ self._received_future = rclpy.Future()
+ return self._received_future
+
+ def _handle_sub(self, msg: test_msgs.msg.BasicTypes) -> None:
+ if self._received_future is not None:
+ future = self._received_future
+ self._received_future = None
+ future.set_result(msg)
+
+ def _handle_matched_sub(
+ self, info: rclpy.event_handler.QoSSubscriptionMatchedInfo
+ ) -> None:
+ """Handle a new publisher being matched to our subscription."""
+ if self._new_pub_future is not None:
+ self._new_pub_future.set_result(info)
+ self._new_pub_future = None
+
+
+class PubTestNode(rclpy.node.Node):
+ """Node to test publications and publication-related events."""
+
+ def __init__(self, *, transient_local: bool = False):
+ super().__init__("test_pub_node")
+ self._new_sub_future: rclpy.Future[
+ rclpy.event_handler.QoSPublisherMatchedInfo
+ ] | None = None
+ self._pub = self.create_publisher(
+ test_msgs.msg.BasicTypes,
+ "test_topic" + ("_transient_local" if transient_local else ""),
+ _get_pub_sub_qos(transient_local),
+ event_callbacks=rclpy.event_handler.PublisherEventCallbacks(
+ matched=self._handle_matched_pub
+ ),
+ )
+
+ def expect_sub_info(
+ self,
+ ) -> rclpy.Future[rclpy.event_handler.QoSPublisherMatchedInfo]:
+ self._new_sub_future = rclpy.Future()
+ return self._new_sub_future
+
+ def publish(self, value: float) -> None:
+ self._pub.publish(test_msgs.msg.BasicTypes(float32_value=value))
+
+ def _handle_matched_pub(
+ self, info: rclpy.event_handler.QoSPublisherMatchedInfo
+ ) -> None:
+ """Handle a new subscriber being matched to our publication."""
+ if self._new_sub_future is not None:
+ self._new_sub_future.set_result(info)
+ self._new_sub_future = None
+
+
+class ServiceServerTestNode(rclpy.node.Node):
+ """Node to test service server-side operation."""
+
+ def __init__(self):
+ super().__init__("test_service_server_node")
+ self._got_request_future: rclpy.Future[
+ test_msgs.srv.BasicTypes.Request
+ ] | None = None
+ self._pending_response: test_msgs.srv.BasicTypes.Response | None = None
+ self.create_service(
+ test_msgs.srv.BasicTypes, "test_service", self._handle_request
+ )
+
+ def expect_request(
+ self, success: bool, error_msg: str
+ ) -> rclpy.Future[test_msgs.srv.BasicTypes.Request]:
+ """Expect an incoming request.
+
+ The arguments are used to compose the response.
+ """
+ self._got_request_future = rclpy.Future()
+ self._pending_response = test_msgs.srv.BasicTypes.Response(
+ bool_value=success, string_value=error_msg
+ )
+ return self._got_request_future
+
+ def _handle_request(
+ self,
+ req: test_msgs.srv.BasicTypes.Request,
+ res: test_msgs.srv.BasicTypes.Response,
+ ) -> test_msgs.srv.BasicTypes.Response:
+ if self._got_request_future is not None:
+ self._got_request_future.set_result(req)
+ self._got_request_future = None
+ if self._pending_response is not None:
+ res = self._pending_response
+ self._pending_response = None
+ return res
+
+
+class ServiceClientTestNode(rclpy.node.Node):
+ """Node to test service client-side operation."""
+
+ def __init__(self):
+ super().__init__("test_service_client_node")
+ self._client = self.create_client(test_msgs.srv.BasicTypes, "test_service")
+
+ def issue_request(
+ self, value: float
+ ) -> rclpy.Future[test_msgs.srv.BasicTypes.Response]:
+ req = test_msgs.srv.BasicTypes.Request(float32_value=value)
+ return self._client.call_async(req)
+
+
+class TimerTestNode(rclpy.node.Node):
+ """Node to test timer operation."""
+
+ def __init__(
+ self, index: int = 0, parameter_overrides: list[rclpy.Parameter] | None = None
+ ):
+ super().__init__(f"test_timer{index}", parameter_overrides=parameter_overrides)
+ self._timer_events = 0
+ self._tick_future: rclpy.Future[None] | None = None
+ self._timer = self.create_timer(0.1, self._handle_timer)
+
+ @property
+ def timer_events(self) -> int:
+ return self._timer_events
+
+ def expect_tick(self) -> rclpy.Future[None]:
+ self._tick_future = rclpy.Future()
+ return self._tick_future
+
+ def _handle_timer(self) -> None:
+ self._timer_events += 1
+ if self._tick_future is not None:
+ self._tick_future.set_result(None)
+ self._tick_future = None
+
+
+class ClockPublisherNode(rclpy.node.Node):
+ """Node to publish rostime clock updates."""
+
+ def __init__(self):
+ super().__init__("clock_node")
+ self._now = rclpy.time.Time()
+ self._pub = self.create_publisher(
+ rosgraph_msgs.msg.Clock,
+ "/clock",
+ rclpy.qos.QoSProfile(
+ depth=1, reliability=rclpy.qos.ReliabilityPolicy.BEST_EFFORT
+ ),
+ )
+
+ def advance_time(self, millisec: int) -> None:
+ self._now += rclpy.duration.Duration(nanoseconds=millisec * 1000000)
+ self._pub.publish(rosgraph_msgs.msg.Clock(clock=self._now.to_msg()))
+
+
+class ActionServerTestNode(rclpy.node.Node):
+ """Node to test action server-side operation."""
+
+ def __init__(self):
+ super().__init__(
+ "test_action_server_node",
+ parameter_overrides=[rclpy.Parameter("use_sim_time", value=True)],
+ )
+ self._got_goal_future: rclpy.Future[
+ test_msgs.action.Fibonacci.Goal
+ ] | None = None
+ self._srv = rclpy.action.ActionServer(
+ self,
+ test_msgs.action.Fibonacci,
+ "test_action",
+ self._handle_execute,
+ handle_accepted_callback=self._handle_accepted,
+ result_timeout=10,
+ )
+ self._goal_handle: rclpy.action.server.ServerGoalHandle | None = None
+ self._sequence: list[int] = []
+
+ def expect_goal(self) -> rclpy.Future[test_msgs.action.Fibonacci.Goal]:
+ self._goal_handle = None
+ self._got_goal_future = rclpy.Future()
+ return self._got_goal_future
+
+ def _handle_accepted(
+ self, goal_handle: rclpy.action.server.ServerGoalHandle
+ ) -> None:
+ self._goal_handle = goal_handle
+ self._sequence = [0, 1]
+ if self._got_goal_future is not None:
+ self._got_goal_future.set_result(goal_handle.request)
+ self._got_goal_future = None
+ # Wait to finish until instructed by test
+
+ def advance_feedback(self) -> list[int] | None:
+ """Add an entry to the result in progress and sends a feedback message.
+
+ Returns the current sequence in progress if incomplete, or None if the sequence
+ is complete and it's time to complete the operation instead.
+
+ """
+ assert self._goal_handle is not None
+ n = self._goal_handle.request.order + 1
+ if len(self._sequence) < n:
+ self._sequence.append(self._sequence[-2] + self._sequence[-1])
+ if len(self._sequence) >= n:
+ return None
+
+ # FYI normally feedbacks would be sent from the execute handler, but we've tied
+ # it to its own public method for testing
+ fb = test_msgs.action.Fibonacci.Feedback()
+ fb.sequence = self._sequence
+ self._goal_handle.publish_feedback(fb)
+ return self._sequence
+
+ def execute(self) -> rclpy.action.server.ServerGoalHandle:
+ """Completes the action in progress.
+
+ Returns the handle to the goal executed.
+
+ """
+ handle = self._goal_handle
+ self._goal_handle = None
+ assert handle is not None
+ handle.execute()
+ return handle
+
+ def _handle_execute(
+ self, goal_handle: rclpy.action.server.ServerGoalHandle
+ ) -> test_msgs.action.Fibonacci.Result:
+ goal_handle.succeed()
+ result = test_msgs.action.Fibonacci.Result()
+ result.sequence = self._sequence
+ return result
+
+
+class ActionClientTestNode(rclpy.node.Node):
+ """Node to test action client-side operation."""
+
+ def __init__(self):
+ super().__init__("test_action_client_node")
+ self._client = rclpy.action.ActionClient(
+ self, test_msgs.action.Fibonacci, "test_action"
+ )
+ self._feedback_future: rclpy.Future[
+ test_msgs.action.Fibonacci.Feedback
+ ] | None = None
+ self._result_future: rclpy.Future[
+ test_msgs.action.Fibonacci.Result
+ ] | None = None
+
+ def send_goal(
+ self, order: int
+ ) -> rclpy.Future[rclpy.action.client.ClientGoalHandle]:
+ """Send a new goal.
+
+ The future will contain the goal handle when the goal submission response has
+ been received.
+
+ """
+ self._client.wait_for_server()
+ goal_ack_future = self._client.send_goal_async(
+ test_msgs.action.Fibonacci.Goal(order=order),
+ feedback_callback=self._handle_feedback,
+ )
+ goal_ack_future.add_done_callback(self._handle_goal_ack)
+ return goal_ack_future
+
+ def _handle_goal_ack(
+ self, future: rclpy.Future[rclpy.action.client.ClientGoalHandle]
+ ) -> None:
+ handle = future.result()
+ assert handle is not None
+ result_future = handle.get_result_async()
+ result_future.add_done_callback(self._handle_result_response)
+
+ def expect_feedback(self) -> rclpy.Future[test_msgs.action.Fibonacci.Feedback]:
+ self._feedback_future = rclpy.Future()
+ return self._feedback_future
+
+ def _handle_feedback(
+ self,
+ # If this is a private 'Impl' detail, why is rclpy handing this out??
+ fb_msg: test_msgs.action.Fibonacci.Impl.FeedbackMessage,
+ ) -> None:
+ if self._feedback_future is not None:
+ self._feedback_future.set_result(fb_msg.feedback)
+ self._feedback_future = None
+
+ def expect_result(
+ self,
+ ) -> rclpy.Future[test_msgs.action.Fibonacci.Result]:
+ self._result_future = rclpy.Future()
+ return self._result_future
+
+ def _handle_result_response(
+ self, future: rclpy.Future[test_msgs.action.Fibonacci_GetResult_Response]
+ ) -> None:
+ response: test_msgs.action.Fibonacci_GetResult_Response | None = future.result()
+ assert response is not None
+ assert self._result_future is not None
+ result: test_msgs.action.Fibonacci.Result = response.result
+ self._result_future.set_result(result)
+ self._result_future = None
+
+
+class TestEventsExecutor(unittest.TestCase):
+    def setUp(self) -> None:
+        super().setUp()
+ # Prevent nodes under test from discovering other random stuff to talk to
+ os.environ["ROS_AUTOMATIC_DISCOVERY_RANGE"] = "OFF"
+ rclpy.init()
+
+ self.executor = rclpy.experimental.EventsExecutor()
+
+ def tearDown(self) -> None:
+ rclpy.shutdown()
+
+ def _expect_future_done(self, future: rclpy.Future[typing.Any]) -> None:
+ # Use a moderately long timeout with the expectation that we shouldn't often
+ # need the whole duration.
+ self.executor.spin_until_future_complete(future, 1.0)
+ self.assertTrue(future.done())
+
+ def _expect_future_not_done(self, future: rclpy.Future[typing.Any]) -> None:
+ # Use a short timeout to give the future some time to complete if we are going
+ # to fail, but not very long because we'll be waiting the full duration every
+ # time during successful tests. It's ok if the timeout is a bit short and the
+ # failure isn't 100% deterministic.
+ self.executor.spin_until_future_complete(future, 0.2)
+ self.assertFalse(future.done())
+
+ def _spin_for(self, sec: float) -> None:
+ """Spins the executor for the given number of realtime seconds."""
+ # Note that this roundabout approach of waiting on a future that will never
+ # finish with a timeout seems to be the only way with the rclpy.Executor API to
+ # spin for a fixed time.
+ self.executor.spin_until_future_complete(rclpy.Future(), sec)
+
+ def _check_match_event_future(
+ self,
+ future: rclpy.Future[
+ rclpy.event_handler.QoSSubscriptionMatchedInfo
+ | rclpy.event_handler.QoSPublisherMatchedInfo
+ ],
+ total_count: int,
+ current_count: int,
+ ) -> None:
+ # NOTE: fastdds appears to be buggy and reports a change in total_count with
+ # total_count_change staying zero. cyclonedds works as expected. Rather than
+ # have this test be sensitive to which RMW is selected, let's just avoid testing
+ # the change fields altogether.
+ self._expect_future_done(future)
+ info: ( # These two python types are both actually rmw_matched_status_t
+ rclpy.event_handler.QoSSubscriptionMatchedInfo
+ | rclpy.event_handler.QoSPublisherMatchedInfo
+ | None
+ ) = future.result()
+ assert info is not None
+ self.assertEqual(info.total_count, total_count)
+ self.assertEqual(info.current_count, current_count)
+
+ def _check_message_future(
+ self, future: rclpy.Future[test_msgs.msg.BasicTypes], value: float
+ ) -> None:
+ self._expect_future_done(future)
+ msg: test_msgs.msg.BasicTypes | None = future.result()
+ assert msg is not None
+ self.assertAlmostEqual(msg.float32_value, value, places=5)
+
+ def _check_service_request_future(
+ self, future: rclpy.Future[test_msgs.srv.BasicTypes.Request], value: float
+ ) -> None:
+ self._expect_future_done(future)
+ req: test_msgs.srv.BasicTypes.Request | None = future.result()
+ assert req is not None
+ self.assertAlmostEqual(req.float32_value, value, places=5)
+
+ def _check_service_response_future(
+ self,
+ future: rclpy.Future[test_msgs.srv.BasicTypes.Response],
+ success: bool,
+ error_msg: str,
+ ) -> None:
+ self._expect_future_done(future)
+ res: test_msgs.srv.BasicTypes.Response | None = future.result()
+ assert res is not None
+ self.assertEqual(res.bool_value, success)
+ self.assertEqual(res.string_value, error_msg)
+
+ def test_pub_sub(self) -> None:
+ sub_node = SubTestNode()
+ new_pub_future = sub_node.expect_pub_info()
+ received_future = sub_node.expect_message()
+ self.executor.add_node(sub_node)
+
+ # With subscriber node alone, should be no publisher or messages
+ self._expect_future_not_done(new_pub_future)
+ self.assertFalse(received_future.done()) # Already waited a bit
+
+ pub_node = PubTestNode()
+ new_sub_future = pub_node.expect_sub_info()
+ self.executor.add_node(pub_node)
+
+ # Publisher and subscriber should find each other but no messages should be
+ # exchanged yet
+ self._check_match_event_future(new_pub_future, 1, 1)
+ new_pub_future = sub_node.expect_pub_info()
+ self._check_match_event_future(new_sub_future, 1, 1)
+ new_sub_future = pub_node.expect_sub_info()
+ self._expect_future_not_done(received_future)
+
+ # Send messages and make sure they're received.
+ for i in range(300):
+ pub_node.publish(0.1 * i)
+ self._check_message_future(received_future, 0.1 * i)
+ received_future = sub_node.expect_message()
+
+ # Destroy the subscriber node, make sure the publisher is notified
+ self.executor.remove_node(sub_node)
+ sub_node.destroy_node()
+ self._check_match_event_future(new_sub_future, 1, 0)
+
+ # Publish another message to ensure all subscriber callbacks got cleaned up
+ pub_node.publish(4.7)
+ self._expect_future_not_done(new_pub_future)
+ self.assertFalse(received_future.done()) # Already waited a bit
+
+ def test_pub_sub_multi_message(self) -> None:
+ # Creates a transient local publisher and queues multiple messages on it. Then
+ # creates a subscriber and makes sure all sent messages get delivered when it
+ # comes up.
+ pub_node = PubTestNode(transient_local=True)
+ self.executor.add_node(pub_node)
+ for i in range(5):
+ pub_node.publish(0.1 * i)
+
+ sub_node = SubTestNode(transient_local=True)
+ received_future = sub_node.expect_message()
+ received_messages: list[test_msgs.msg.BasicTypes] = []
+
+ def handle_message(future: rclpy.Future[test_msgs.msg.BasicTypes]) -> None:
+ nonlocal received_future
+ msg = future.result()
+ assert msg is not None
+ received_messages.append(msg)
+ received_future = sub_node.expect_message()
+ received_future.add_done_callback(handle_message)
+
+ received_future.add_done_callback(handle_message)
+ self._expect_future_not_done(received_future)
+ self.executor.add_node(sub_node)
+ while len(received_messages) < 5:
+ self._expect_future_done(received_future)
+ self.assertEqual(len(received_messages), 5)
+ for i in range(5):
+ self.assertAlmostEqual(
+ received_messages[i].float32_value, 0.1 * i, places=5
+ )
+ self._expect_future_not_done(received_future)
+
+ pub_node.publish(0.5)
+ self._check_message_future(received_future, 0.5)
+
+ def test_service(self) -> None:
+ server_node = ServiceServerTestNode()
+ got_request_future = server_node.expect_request(True, "test response 0")
+ self.executor.add_node(server_node)
+ self._expect_future_not_done(got_request_future)
+
+ client_node = ServiceClientTestNode()
+ self.executor.add_node(client_node)
+ self._expect_future_not_done(got_request_future)
+ for i in range(300):
+ got_response_future = client_node.issue_request(7.1)
+ self._check_service_request_future(got_request_future, 7.1)
+ got_request_future = server_node.expect_request(
+ True, f"test response {i + 1}"
+ )
+ self._check_service_response_future(
+ got_response_future, True, f"test response {i}"
+ )
+
+ # Destroy server node and retry issuing a request
+ self.executor.remove_node(server_node)
+ server_node.destroy_node()
+ self._expect_future_not_done(got_request_future)
+ got_response_future = client_node.issue_request(5.0)
+ self._expect_future_not_done(got_request_future)
+ self.assertFalse(got_response_future.done()) # Already waited a bit
+
+ def test_timers(self) -> None:
+ realtime_node = TimerTestNode(index=0)
+ rostime_node = TimerTestNode(
+ index=1, parameter_overrides=[rclpy.Parameter("use_sim_time", value=True)]
+ )
+ clock_node = ClockPublisherNode()
+ for node in [realtime_node, rostime_node, clock_node]:
+ self.executor.add_node(node)
+
+ # Wait a bit, and make sure the realtime timer ticks, and the rostime one does
+ # not. Since this is based on wall time, be very flexible on tolerances here.
+ self._spin_for(1.0)
+ realtime_ticks = realtime_node.timer_events
+ self.assertGreater(realtime_ticks, 1)
+ self.assertLess(realtime_ticks, 50)
+ self.assertEqual(rostime_node.timer_events, 0)
+
+ # Manually tick the rostime timer by less than a full interval.
+ rostime_tick_future = rostime_node.expect_tick()
+ for _ in range(99):
+ clock_node.advance_time(1)
+ self._expect_future_not_done(rostime_tick_future)
+ clock_node.advance_time(1)
+ self._expect_future_done(rostime_tick_future)
+ # Now tick by a bunch of full intervals.
+ for _ in range(300):
+ rostime_tick_future = rostime_node.expect_tick()
+ clock_node.advance_time(100)
+ self._expect_future_done(rostime_tick_future)
+
+ # Ensure the realtime timer ticked much less than the rostime one.
+ self.assertLess(realtime_node.timer_events, rostime_node.timer_events)
+
+ def test_action(self) -> None:
+ clock_node = ClockPublisherNode()
+ self.executor.add_node(clock_node)
+
+ server_node = ActionServerTestNode()
+ got_goal_future = server_node.expect_goal()
+ self.executor.add_node(server_node)
+ clock_node.advance_time(0)
+ self._expect_future_not_done(got_goal_future)
+
+ client_node = ActionClientTestNode()
+ self.executor.add_node(client_node)
+ self._expect_future_not_done(got_goal_future)
+ for i in range(300):
+ order = (i % 40) + 1 # Don't want sequence to get too big
+ goal_acknowledged_future = client_node.send_goal(order)
+
+ self._expect_future_done(got_goal_future)
+ self._expect_future_done(goal_acknowledged_future)
+ req: test_msgs.action.Fibonacci.Goal | None = got_goal_future.result()
+ assert req is not None
+ self.assertEqual(req.order, order)
+ result_future = client_node.expect_result()
+
+ while True:
+ got_feedback_future = client_node.expect_feedback()
+ seq = server_node.advance_feedback()
+ if seq is None:
+ break
+ self._expect_future_done(got_feedback_future)
+ feedback = got_feedback_future.result()
+ assert feedback is not None
+ self.assertEqual(len(feedback.sequence), len(seq))
+
+ last_handle = server_node.execute()
+ self._expect_future_done(result_future)
+ self.assertFalse(got_feedback_future.done())
+
+ res: test_msgs.action.Fibonacci.Result | None = result_future.result()
+ assert res is not None
+ self.assertEqual(len(res.sequence), order + 1)
+
+ got_goal_future = server_node.expect_goal()
+
+ # Test completed goal expiration by time
+ self.assertEqual(
+ last_handle.status, action_msgs.msg.GoalStatus.STATUS_SUCCEEDED
+ )
+ clock_node.advance_time(9999)
+ self._spin_for(0.2)
+ self.assertEqual(
+ last_handle.status, action_msgs.msg.GoalStatus.STATUS_SUCCEEDED
+ )
+ clock_node.advance_time(2)
+ self._spin_for(0.2)
+ self.assertEqual(last_handle.status, action_msgs.msg.GoalStatus.STATUS_UNKNOWN)
+
+ # Destroy server node and retry issuing a goal
+ self.executor.remove_node(server_node)
+ server_node.destroy_node()
+ self._expect_future_not_done(got_goal_future)
+ goal_acknowledged_future = client_node.send_goal(5)
+ self._expect_future_not_done(got_goal_future)
+ self.assertFalse(goal_acknowledged_future.done()) # Already waited a bit
+
+
+if __name__ == "__main__":
+ unittest.main()