From 45f7f5f0c65f441174f4b8a3d755098257e3c322 Mon Sep 17 00:00:00 2001 From: Brad Martin Date: Tue, 31 Dec 2024 14:49:31 -0500 Subject: [PATCH] Introduce EventsExecutor implementation (#1389) Signed-off-by: Brad Martin --- rclpy/CMakeLists.txt | 6 + rclpy/package.xml | 1 + rclpy/rclpy/experimental/__init__.py | 15 + rclpy/rclpy/experimental/events_executor.py | 44 + rclpy/src/rclpy/_rclpy_pybind11.cpp | 3 + .../rclpy/events_executor/events_executor.cpp | 878 ++++++++++++++++++ .../rclpy/events_executor/events_executor.hpp | 186 ++++ .../rclpy/events_executor/python_hasher.hpp | 30 + .../src/rclpy/events_executor/rcl_support.cpp | 112 +++ .../src/rclpy/events_executor/rcl_support.hpp | 108 +++ .../src/rclpy/events_executor/scoped_with.hpp | 39 + .../rclpy/events_executor/timers_manager.cpp | 329 +++++++ .../rclpy/events_executor/timers_manager.hpp | 84 ++ rclpy/test/test_events_executor.py | 659 +++++++++++++ 14 files changed, 2494 insertions(+) create mode 100644 rclpy/rclpy/experimental/__init__.py create mode 100644 rclpy/rclpy/experimental/events_executor.py create mode 100644 rclpy/src/rclpy/events_executor/events_executor.cpp create mode 100644 rclpy/src/rclpy/events_executor/events_executor.hpp create mode 100644 rclpy/src/rclpy/events_executor/python_hasher.hpp create mode 100644 rclpy/src/rclpy/events_executor/rcl_support.cpp create mode 100644 rclpy/src/rclpy/events_executor/rcl_support.hpp create mode 100644 rclpy/src/rclpy/events_executor/scoped_with.hpp create mode 100644 rclpy/src/rclpy/events_executor/timers_manager.cpp create mode 100644 rclpy/src/rclpy/events_executor/timers_manager.hpp create mode 100644 rclpy/test/test_events_executor.py diff --git a/rclpy/CMakeLists.txt b/rclpy/CMakeLists.txt index 15ce01ab1..948fdf541 100644 --- a/rclpy/CMakeLists.txt +++ b/rclpy/CMakeLists.txt @@ -17,6 +17,7 @@ endif() find_package(ament_cmake REQUIRED) find_package(ament_cmake_python REQUIRED) +find_package(Boost REQUIRED) find_package(lifecycle_msgs REQUIRED) find_package(rcl REQUIRED) find_package(rcl_action REQUIRED) @@ -90,6 +91,9 @@ pybind11_add_module(_rclpy_pybind11 SHARED src/rclpy/destroyable.cpp src/rclpy/duration.cpp src/rclpy/clock_event.cpp + src/rclpy/events_executor/events_executor.cpp + src/rclpy/events_executor/rcl_support.cpp + src/rclpy/events_executor/timers_manager.cpp src/rclpy/exceptions.cpp src/rclpy/graph.cpp src/rclpy/guard_condition.cpp @@ -121,6 +125,7 @@ target_include_directories(_rclpy_pybind11 PRIVATE src/rclpy/ ) target_link_libraries(_rclpy_pybind11 PRIVATE + Boost::boost lifecycle_msgs::lifecycle_msgs__rosidl_generator_c lifecycle_msgs::lifecycle_msgs__rosidl_typesupport_c rcl::rcl @@ -182,6 +187,7 @@ if(BUILD_TESTING) test/test_create_node.py test/test_create_while_spinning.py test/test_destruction.py + test/test_events_executor.py test/test_executor.py test/test_expand_topic_name.py test/test_guard_condition.py diff --git a/rclpy/package.xml b/rclpy/package.xml index a08514c6f..4a2fb0f87 100644 --- a/rclpy/package.xml +++ b/rclpy/package.xml @@ -17,6 +17,7 @@ ament_cmake + boost pybind11_vendor python3-dev rcpputils diff --git a/rclpy/rclpy/experimental/__init__.py b/rclpy/rclpy/experimental/__init__.py new file mode 100644 index 000000000..a8997525f --- /dev/null +++ b/rclpy/rclpy/experimental/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2024 Brad Martin +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .events_executor import EventsExecutor diff --git a/rclpy/rclpy/experimental/events_executor.py b/rclpy/rclpy/experimental/events_executor.py new file mode 100644 index 000000000..98fd7d082 --- /dev/null +++ b/rclpy/rclpy/experimental/events_executor.py @@ -0,0 +1,44 @@ +# Copyright 2024 Brad Martin +# Copyright 2024 Merlin Labs, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import faulthandler +import typing + +import rclpy.executors +import rclpy.node +from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy + + +# Try to look like we inherit from the rclpy Executor for type checking purposes without +# getting any of the code from the base class. +def EventsExecutor(*, context: rclpy.Context | None = None) -> rclpy.executors.Executor: + if context is None: + context = rclpy.get_default_context() + + # For debugging purposes, if anything goes wrong in C++ make sure we also get a + # Python backtrace dumped with the crash. + faulthandler.enable() + + ex = typing.cast(rclpy.executors.Executor, _rclpy.EventsExecutor(context)) + + # rclpy.Executor does this too. Note, the context itself is smart enough to check + # for bound methods, and check whether the instances they're bound to still exist at + # callback time, so we don't have to worry about tearing down this stale callback at + # destruction time. + # TODO(bmartin427) This should really be done inside of the EventsExecutor + # implementation itself, but I'm unable to figure out a pybind11 incantation that + # allows me to pass this bound method call from C++. + context.on_shutdown(ex.wake) + + return ex diff --git a/rclpy/src/rclpy/_rclpy_pybind11.cpp b/rclpy/src/rclpy/_rclpy_pybind11.cpp index 4da96a5d5..f33fdaa2f 100644 --- a/rclpy/src/rclpy/_rclpy_pybind11.cpp +++ b/rclpy/src/rclpy/_rclpy_pybind11.cpp @@ -32,6 +32,7 @@ #include "duration.hpp" #include "clock_event.hpp" #include "event_handle.hpp" +#include "events_executor/events_executor.hpp" #include "exceptions.hpp" #include "graph.hpp" #include "guard_condition.hpp" @@ -247,4 +248,6 @@ PYBIND11_MODULE(_rclpy_pybind11, m) { rclpy::define_signal_handler_api(m); rclpy::define_clock_event(m); rclpy::define_lifecycle_api(m); + + rclpy::events_executor::define_events_executor(m); } diff --git a/rclpy/src/rclpy/events_executor/events_executor.cpp b/rclpy/src/rclpy/events_executor/events_executor.cpp new file mode 100644 index 000000000..8cc964c66 --- /dev/null +++ b/rclpy/src/rclpy/events_executor/events_executor.cpp @@ -0,0 +1,878 @@ +// Copyright 2024 Brad Martin +// Copyright 2024 Merlin Labs, Inc. 
+// +// Based on a similar approach as the iRobot rclcpp EventsExecutor implementation: +// https://github.com/ros2/rclcpp/blob/7907b2fee0b1381dc21900efd1745e11f5caa670/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp +// Original copyright for that file is: +// Copyright 2023 iRobot Corporation. +// +// Also borrows some code from the original rclpy Executor implementations: +// https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py +// Original copyright for that file is: +// Copyright 2017 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include "events_executor/events_executor.hpp" + +#include +#include + +#include + +#include +#include + +#include +#include +#include + +namespace pl = std::placeholders; +namespace py = pybind11; + +namespace rclpy { +namespace events_executor { + +EventsExecutor::EventsExecutor(py::object context) + : rclpy_context_(context), + asyncio_run_(py::module_::import("asyncio").attr("run")), + rclpy_task_(py::module_::import("rclpy.task").attr("Task")), + signals_(io_context_, SIGINT, SIGTERM), + rcl_callback_manager_(io_context_.get_executor()), + timers_manager_(io_context_.get_executor(), + std::bind(&EventsExecutor::HandleTimerReady, this, pl::_1)) { + // rclpy.Executor creates a sigint handling guard condition here. This is necessary + // because a sleeping event loop won't notice Ctrl-C unless some other event wakes + // it up otherwise. + // + // Unfortunately it doesn't look like we can either support generic guard conditions + // or hook into the existing rclpy signal handling in any other useful way. We'll + // just establish our own signal handling directly instead. This unfortunately + // bypasses the rclpy.init() options that allow a user to disable certain signal + // handlers, but it doesn't look like we can do any better. + signals_.async_wait([this](const boost::system::error_code& ec, int) { + if (!ec) { + py::gil_scoped_acquire gil_acquire; + // Don't call context.try_shutdown() here, because that can call back to us to + // request a blocking shutdown(), which doesn't make any sense because we have to + // be spinning to process the callback that's asked to wait for spinning to stop. + // We'll have to call that later outside of any spin loop. + // https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/__init__.py#L105-L109 + sigint_pending_.store(true); + io_context_.stop(); + } + }); +} + +EventsExecutor::~EventsExecutor() { shutdown(); } + +pybind11::object EventsExecutor::create_task(py::object callback, py::args args, + const py::kwargs& kwargs) { + // Create and return a rclpy.task.Task() object, and schedule it to be called later. 
+  using py::literals::operator""_a;
+  py::object task = rclpy_task_(callback, args, kwargs, "executor"_a = py::cast(this));
+  // The Task needs to be owned at least until we invoke it from the callback we post,
+  // however we can't pass a bare py::object because that's going to try to do Python
+  // refcounting while preparing to go into or coming back from the callback, while the
+  // GIL is not held. We'll do manual refcounting on it instead.
+  py::handle cb_task_handle = task;
+  cb_task_handle.inc_ref();
+  boost::asio::post(io_context_,
+                    std::bind(&EventsExecutor::IterateTask, this, cb_task_handle));
+  return task;
+}
+
+bool EventsExecutor::shutdown(std::optional<double> timeout) {
+  // NOTE: The rclpy context can invoke this with a lock on the context held.
+  // Therefore we must not try to access that context during this method or we can
+  // deadlock.
+  // https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/context.py#L101-L103
+
+  io_context_.stop();
+
+  // Block until spinning is done, or timeout.
+  std::unique_lock spin_lock(spinning_mutex_, std::defer_lock);
+  if (timeout) {
+    if (!spin_lock.try_lock_for(std::chrono::duration<double>(*timeout))) {
+      return false;
+    }
+  } else {
+    spin_lock.lock();
+  }
+  // Tear down any callbacks we still have registered.
+  for (py::handle node : py::list(nodes_)) {
+    remove_node(node);
+  }
+  UpdateEntitiesFromNodes(true);
+  return true;
+}
+
+bool EventsExecutor::add_node(py::object node) {
+  std::lock_guard lock(nodes_mutex_);
+  if (nodes_.contains(node)) {
+    return false;
+  }
+  nodes_.add(node);
+  // Caution: the Node executor setter method calls executor.add_node() again, making
+  // this reentrant.
+  node.attr("executor") = py::cast(this);
+  wake();
+  return true;
+}
+
+void EventsExecutor::remove_node(py::handle node) {
+  std::lock_guard lock(nodes_mutex_);
+  if (!nodes_.contains(node)) {
+    return;
+  }
+  // Why does pybind11 provide a C++ method for add() but not discard() or remove()?
+  nodes_.attr("remove")(node);
+  // Not sure why rclpy doesn't change node.executor at this point.
+  wake();
+}
+
+void EventsExecutor::wake() {
+  if (!wake_pending_.exchange(true)) {
+    // Update tracked entities.
+    boost::asio::post(io_context_, [this]() {
+      py::gil_scoped_acquire gil_acquire;
+      UpdateEntitiesFromNodes(!py::cast<bool>(rclpy_context_.attr("ok")()));
+    });
+  }
+}
+
+// NOTE: The timeouts on the below two methods are always realtime even if we're
+// running in debug time. This is true of other executors too, because debug time is
+// always associated with a specific node and more than one node may be connected to an
+// executor instance.
+// https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py#L184-L185
+
+void EventsExecutor::spin(std::optional<double> timeout_sec) {
+  {
+    std::unique_lock spin_lock(spinning_mutex_, std::try_to_lock);
+    if (!spin_lock) {
+      throw std::runtime_error("Attempt to spin an already-spinning Executor");
+    }
+    // Release the GIL while we block. Any callbacks on the io_context that want to
+    // touch Python will need to reacquire it though.
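+    // E.g. each ready-handler below that touches Python state follows this pattern
+    // (a sketch of the convention used throughout this file):
+    //
+    //   void EventsExecutor::HandleSomethingReady(...) {
+    //     py::gil_scoped_acquire gil_acquire;  // reacquire before touching Python
+    //     ...
+    //   }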
+    py::gil_scoped_release gil_release;
+    // Don't let asio auto-stop if there's nothing to do.
+    const auto work = boost::asio::make_work_guard(io_context_);
+    if (timeout_sec) {
+      io_context_.run_for(std::chrono::duration_cast<std::chrono::nanoseconds>(
+          std::chrono::duration<double>(*timeout_sec)));
+    } else {
+      io_context_.run();
+    }
+    io_context_.restart();
+  }
+
+  if (sigint_pending_.exchange(false)) {
+    rclpy_context_.attr("try_shutdown")();
+  }
+}
+
+void EventsExecutor::spin_once(std::optional<double> timeout_sec) {
+  {
+    std::unique_lock spin_lock(spinning_mutex_, std::try_to_lock);
+    if (!spin_lock) {
+      throw std::runtime_error("Attempt to spin an already-spinning Executor");
+    }
+    // Release the GIL while we block. Any callbacks on the io_context that want to
+    // touch Python will need to reacquire it though.
+    py::gil_scoped_release gil_release;
+    // Don't let asio auto-stop if there's nothing to do.
+    const auto work = boost::asio::make_work_guard(io_context_);
+    if (timeout_sec) {
+      io_context_.run_one_for(std::chrono::duration_cast<std::chrono::nanoseconds>(
+          std::chrono::duration<double>(*timeout_sec)));
+    } else {
+      io_context_.run_one();
+    }
+    io_context_.restart();
+  }
+
+  if (sigint_pending_.exchange(false)) {
+    rclpy_context_.attr("try_shutdown")();
+  }
+}
+
+void EventsExecutor::spin_until_future_complete(py::handle future,
+                                                std::optional<double> timeout_sec) {
+  future.attr("add_done_callback")(
+      py::cpp_function([this](py::handle) { io_context_.stop(); }));
+  spin(timeout_sec);
+}
+
+void EventsExecutor::spin_once_until_future_complete(
+    py::handle future, std::optional<double> timeout_sec) {
+  future.attr("add_done_callback")(
+      py::cpp_function([this](py::handle) { io_context_.stop(); }));
+  spin_once(timeout_sec);
+}
+
+void EventsExecutor::UpdateEntitiesFromNodes(bool shutdown) {
+  // Clear the pending flag as early as possible, so we err on the side of retriggering
+  // a few harmless updates rather than potentially missing important additions.
+  wake_pending_.store(false);
+
+  // Collect all entities currently associated with our nodes
+  py::set subscriptions;
+  py::set timers;
+  py::set clients;
+  py::set services;
+  py::set waitables;
+  {
+    std::lock_guard lock(nodes_mutex_);
+    if (!shutdown) {
+      for (py::handle node : nodes_) {
+        subscriptions.attr("update")(py::set(node.attr("subscriptions")));
+        timers.attr("update")(py::set(node.attr("timers")));
+        clients.attr("update")(py::set(node.attr("clients")));
+        services.attr("update")(py::set(node.attr("services")));
+        waitables.attr("update")(py::set(node.attr("waitables")));
+
+        // It doesn't seem to be possible to support guard conditions with a
+        // callback-based (as opposed to waitset-based) API. Fortunately we don't
+        // seem to need to.
+        if (!py::set(node.attr("guards")).empty()) {
+          throw std::runtime_error("Guard conditions not supported");
+        }
+      }
+    } else {
+      // Remove all tracked entities and nodes.
+      nodes_.clear();
+    }
+  }
+
+  // Perform updates for added and removed entities
+  UpdateEntitySet(subscriptions_, subscriptions,
+                  std::bind(&EventsExecutor::HandleAddedSubscription, this, pl::_1),
+                  std::bind(&EventsExecutor::HandleRemovedSubscription, this, pl::_1));
+  UpdateEntitySet(timers_, timers,
+                  std::bind(&EventsExecutor::HandleAddedTimer, this, pl::_1),
+                  std::bind(&EventsExecutor::HandleRemovedTimer, this, pl::_1));
+  UpdateEntitySet(clients_, clients,
+                  std::bind(&EventsExecutor::HandleAddedClient, this, pl::_1),
+                  std::bind(&EventsExecutor::HandleRemovedClient, this, pl::_1));
+  UpdateEntitySet(services_, services,
+                  std::bind(&EventsExecutor::HandleAddedService, this, pl::_1),
+                  std::bind(&EventsExecutor::HandleRemovedService, this, pl::_1));
+  UpdateEntitySet(waitables_, waitables,
+                  std::bind(&EventsExecutor::HandleAddedWaitable, this, pl::_1),
+                  std::bind(&EventsExecutor::HandleRemovedWaitable, this, pl::_1));
+
+  if (shutdown) {
+    // Stop spinning after everything is torn down.
+    io_context_.stop();
+  }
+}
+
+void EventsExecutor::UpdateEntitySet(
+    py::set& entity_set, const py::set& new_entity_set,
+    std::function<void(py::handle)> added_entity_callback,
+    std::function<void(py::handle)> removed_entity_callback) {
+  py::set added_entities = new_entity_set - entity_set;
+  for (py::handle added_entity : added_entities) {
+    added_entity_callback(added_entity);
+  }
+
+  py::set removed_entities = entity_set - new_entity_set;
+  for (py::handle removed_entity : removed_entities) {
+    removed_entity_callback(removed_entity);
+  }
+
+  entity_set = new_entity_set;
+}
+
+void EventsExecutor::HandleAddedSubscription(py::handle subscription) {
+  py::handle handle = subscription.attr("handle");
+  auto with = std::make_shared<ScopedWith>(handle);
+  const rcl_subscription_t* rcl_ptr = GetRclSubscription(handle);
+  const auto cb =
+      std::bind(&EventsExecutor::HandleSubscriptionReady, this, subscription, pl::_1);
+  if (RCL_RET_OK != rcl_subscription_set_on_new_message_callback(
+                        rcl_ptr, RclEventCallbackTrampoline,
+                        rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with))) {
+    throw std::runtime_error(
+        std::string("Failed to set the on new message callback for subscription: ") +
+        rcl_get_error_string().str);
+  }
+}
+
+void EventsExecutor::HandleRemovedSubscription(py::handle subscription) {
+  py::handle handle = subscription.attr("handle");
+  const rcl_subscription_t* rcl_ptr = GetRclSubscription(handle);
+  if (RCL_RET_OK !=
+      rcl_subscription_set_on_new_message_callback(rcl_ptr, nullptr, nullptr)) {
+    throw std::runtime_error(
+        std::string("Failed to clear the on new message callback for subscription: ") +
+        rcl_get_error_string().str);
+  }
+  rcl_callback_manager_.RemoveCallback(rcl_ptr);
+}
+
+void EventsExecutor::HandleSubscriptionReady(py::handle subscription,
+                                             size_t number_of_events) {
+  py::gil_scoped_acquire gil_acquire;
+
+  // Largely based on rclpy.Executor._take_subscription() and _execute_subscription().
+  // https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py#L355-L367
+  //
+  // NOTE: Simple object attributes we can count on to be owned by the parent object,
+  // but bound method calls and function return values need to be owned by us.
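+  // E.g., mirroring the two cases named above:
+  //
+  //   py::handle h = subscription.attr("msg_type");   // plain attribute: borrowed
+  //   py::object m = handle.attr("take_message");     // bound method: we must own it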
+ const py::handle handle = subscription.attr("handle"); + const py::object take_message = handle.attr("take_message"); + const py::handle msg_type = subscription.attr("msg_type"); + const py::handle raw = subscription.attr("raw"); + const int callback_type = + py::cast(subscription.attr("_callback_type").attr("value")); + const int message_only = py::cast( + subscription.attr("CallbackType").attr("MessageOnly").attr("value")); + const py::handle callback = subscription.attr("callback"); + + // rmw_cyclonedds has a bug which causes number_of_events to be zero in the case where + // messages were waiting for us when we registered the callback, and the topic is + // using KEEP_ALL history policy. We'll work around that by checking for zero and + // just taking messages until we start getting None in that case. + // https://github.com/ros2/rmw_cyclonedds/issues/509 + bool got_none = false; + for (size_t i = 0; number_of_events ? i < number_of_events : !got_none; ++i) { + py::object msg_info = take_message(msg_type, raw); + if (!msg_info.is_none()) { + try { + if (callback_type == message_only) { + callback(py::cast(msg_info)[0]); + } else { + callback(msg_info); + } + } catch (const py::error_already_set& e) { + HandleCallbackExceptionInNodeEntity(e, subscription, "subscriptions"); + throw; + } + } else { + got_none = true; + } + } +} + +void EventsExecutor::HandleAddedTimer(py::handle timer) { + timers_manager_.AddTimer(timer); +} + +void EventsExecutor::HandleRemovedTimer(py::handle timer) { + timers_manager_.RemoveTimer(timer); +} + +void EventsExecutor::HandleTimerReady(py::handle timer) { + py::gil_scoped_acquire gil_acquire; + + try { + // Unlike most rclpy objects this doesn't document whether it's a Callable or might + // be a Coroutine. Let's hope it's the former. 🤞 + timer.attr("callback")(); + } catch (const py::error_already_set& e) { + HandleCallbackExceptionInNodeEntity(e, timer, "timers"); + throw; + } +} + +void EventsExecutor::HandleAddedClient(py::handle client) { + py::handle handle = client.attr("handle"); + auto with = std::make_shared(handle); + const rcl_client_t* rcl_ptr = GetRclClient(handle); + const auto cb = std::bind(&EventsExecutor::HandleClientReady, this, client, pl::_1); + if (RCL_RET_OK != rcl_client_set_on_new_response_callback( + rcl_ptr, RclEventCallbackTrampoline, + rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with))) { + throw std::runtime_error( + std::string("Failed to set the on new response callback for client: ") + + rcl_get_error_string().str); + } +} + +void EventsExecutor::HandleRemovedClient(py::handle client) { + py::handle handle = client.attr("handle"); + const rcl_client_t* rcl_ptr = GetRclClient(handle); + if (RCL_RET_OK != + rcl_client_set_on_new_response_callback(rcl_ptr, nullptr, nullptr)) { + throw std::runtime_error( + std::string("Failed to clear the on new response callback for client: ") + + rcl_get_error_string().str); + } + rcl_callback_manager_.RemoveCallback(rcl_ptr); +} + +void EventsExecutor::HandleClientReady(py::handle client, size_t number_of_events) { + py::gil_scoped_acquire gil_acquire; + + // Largely based on rclpy.Executor._take_client() and _execute_client(). 
+ // https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py#L369-L384 + const py::handle handle = client.attr("handle"); + const py::object take_response = handle.attr("take_response"); + const py::handle srv_type = client.attr("srv_type"); + const py::handle res_type = srv_type.attr("Response"); + const py::object get_pending_request = client.attr("get_pending_request"); + + for (size_t i = 0; i < number_of_events; ++i) { + py::tuple seq_and_response = take_response(res_type); + py::handle header = seq_and_response[0]; + py::handle response = seq_and_response[1]; + if (!header.is_none()) { + py::object sequence = header.attr("request_id").attr("sequence_number"); + py::object future; + try { + future = get_pending_request(sequence); + } catch (const py::error_already_set& e) { + if (e.matches(PyExc_KeyError)) { + // The request was cancelled + continue; + } + throw; + } + future.attr("_set_executor")(py::cast(this)); + try { + future.attr("set_result")(response); + } catch (const py::error_already_set& e) { + HandleCallbackExceptionInNodeEntity(e, client, "clients"); + throw; + } + } + } +} + +void EventsExecutor::HandleAddedService(py::handle service) { + py::handle handle = service.attr("handle"); + auto with = std::make_shared(handle); + const rcl_service_t* rcl_ptr = GetRclService(handle); + const auto cb = std::bind(&EventsExecutor::HandleServiceReady, this, service, pl::_1); + if (RCL_RET_OK != rcl_service_set_on_new_request_callback( + rcl_ptr, RclEventCallbackTrampoline, + rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with))) { + throw std::runtime_error( + std::string("Failed to set the on new request callback for service: ") + + rcl_get_error_string().str); + } +} + +void EventsExecutor::HandleRemovedService(py::handle service) { + py::handle handle = service.attr("handle"); + const rcl_service_t* rcl_ptr = GetRclService(handle); + if (RCL_RET_OK != + rcl_service_set_on_new_request_callback(rcl_ptr, nullptr, nullptr)) { + throw std::runtime_error( + std::string("Failed to clear the on new request callback for service: ") + + rcl_get_error_string().str); + } + rcl_callback_manager_.RemoveCallback(rcl_ptr); +} + +void EventsExecutor::HandleServiceReady(py::handle service, size_t number_of_events) { + py::gil_scoped_acquire gil_acquire; + + // Largely based on rclpy.Executor._take_service() and _execute_service(). 
+ // https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py#L386-L397 + const py::handle handle = service.attr("handle"); + const py::object service_take_request = handle.attr("service_take_request"); + const py::handle srv_type = service.attr("srv_type"); + const py::handle req_type = srv_type.attr("Request"); + const py::handle res_type = srv_type.attr("Response"); + const py::handle callback = service.attr("callback"); + const py::object send_response = service.attr("send_response"); + + for (size_t i = 0; i < number_of_events; ++i) { + py::object maybe_request_and_header = service_take_request(req_type); + if (!maybe_request_and_header.is_none()) { + py::tuple request_and_header = py::cast(maybe_request_and_header); + py::handle request = request_and_header[0]; + py::handle header = request_and_header[1]; + if (!request.is_none()) { + py::object response; + try { + response = callback(request, res_type()); + } catch (const py::error_already_set& e) { + HandleCallbackExceptionInNodeEntity(e, service, "services"); + throw; + } + send_response(response, header); + } + } + } +} + +void EventsExecutor::HandleAddedWaitable(py::handle waitable) { + // The Waitable API is too abstract for us to work with directly; it only exposes APIs + // for dealing with wait sets, and all of the rcl callback API requires knowing + // exactly what kinds of rcl objects you're working with. We'll try to figure out + // what kind of stuff is hiding behind the abstraction by having the Waitable add + // itself to a wait set, then take stock of what all ended up there. We'll also have + // to hope that no Waitable implementations ever change their component entities over + // their lifetimes. 😬 + auto with_waitable = std::make_shared(waitable); + const py::object num_entities = waitable.attr("get_num_entities")(); + if (py::cast(num_entities.attr("num_guard_conditions")) != 0) { + throw std::runtime_error("Guard conditions not supported"); + } + const py::object _rclpy = py::module_::import("rclpy.impl.implementation_singleton") + .attr("rclpy_implementation"); + py::object wait_set = _rclpy.attr("WaitSet")( + num_entities.attr("num_subscriptions"), 0, num_entities.attr("num_timers"), + num_entities.attr("num_clients"), num_entities.attr("num_services"), + num_entities.attr("num_events"), rclpy_context_.attr("handle")); + auto with_waitset = std::make_shared(wait_set); + waitable.attr("add_to_wait_set")(wait_set); + rcl_wait_set_t* const rcl_waitset = GetRclWaitSet(wait_set); + // We null out each entry in the waitset as we set it up, so that the waitset itself + // can be reused when something becomes ready to signal to the Waitable what's ready + // and what's not. We also bind with_waitset into each callback we set up, to ensure + // that object doesn't get destroyed while any of these callbacks are still + // registered. 
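+  // E.g. each std::bind() below produces, in effect, a functor like (sketch):
+  //
+  //   [this, waitable, rcl_sub, wait_set, i, with_waitset](size_t n) {
+  //     HandleWaitableSubReady(waitable, rcl_sub, wait_set, i, with_waitset, n);
+  //   }
+  //
+  // The shared_ptr copied into the functor is what keeps the WaitSet's context
+  // manager entered for as long as the callback remains registered.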
+ WaitableSubEntities sub_entities; + for (size_t i = 0; i < rcl_waitset->size_of_subscriptions; ++i) { + const rcl_subscription_t* const rcl_sub = rcl_waitset->subscriptions[i]; + rcl_waitset->subscriptions[i] = nullptr; + sub_entities.subscriptions.push_back(rcl_sub); + const auto cb = std::bind(&EventsExecutor::HandleWaitableSubReady, this, waitable, + rcl_sub, wait_set, i, with_waitset, pl::_1); + if (RCL_RET_OK != + rcl_subscription_set_on_new_message_callback( + rcl_sub, RclEventCallbackTrampoline, + rcl_callback_manager_.MakeCallback(rcl_sub, cb, with_waitable))) { + throw std::runtime_error( + std::string( + "Failed to set the on new message callback for Waitable subscription: ") + + rcl_get_error_string().str); + } + } + for (size_t i = 0; i < rcl_waitset->size_of_timers; ++i) { + // Unfortunately we do require a non-const pointer here, while the waitset structure + // contains a const pointer. + rcl_timer_t* const rcl_timer = const_cast(rcl_waitset->timers[i]); + rcl_waitset->timers[i] = nullptr; + sub_entities.timers.push_back(rcl_timer); + // Since this callback doesn't go through RclCallbackManager which would otherwise + // own an instance of `with_waitable` associated with this callback, we'll bind it + // directly into the callback instead. + const auto cb = std::bind(&EventsExecutor::HandleWaitableTimerReady, this, waitable, + rcl_timer, wait_set, i, with_waitable, with_waitset); + timers_manager_.rcl_manager().AddTimer(rcl_timer, cb); + } + for (size_t i = 0; i < rcl_waitset->size_of_clients; ++i) { + const rcl_client_t* const rcl_client = rcl_waitset->clients[i]; + rcl_waitset->clients[i] = nullptr; + sub_entities.clients.push_back(rcl_client); + const auto cb = std::bind(&EventsExecutor::HandleWaitableClientReady, this, + waitable, rcl_client, wait_set, i, with_waitset, pl::_1); + if (RCL_RET_OK != + rcl_client_set_on_new_response_callback( + rcl_client, RclEventCallbackTrampoline, + rcl_callback_manager_.MakeCallback(rcl_client, cb, with_waitable))) { + throw std::runtime_error( + std::string( + "Failed to set the on new response callback for Waitable client: ") + + rcl_get_error_string().str); + } + } + for (size_t i = 0; i < rcl_waitset->size_of_services; ++i) { + const rcl_service_t* const rcl_service = rcl_waitset->services[i]; + rcl_waitset->services[i] = nullptr; + sub_entities.services.push_back(rcl_service); + const auto cb = std::bind(&EventsExecutor::HandleWaitableServiceReady, this, + waitable, rcl_service, wait_set, i, with_waitset, pl::_1); + if (RCL_RET_OK != + rcl_service_set_on_new_request_callback( + rcl_service, RclEventCallbackTrampoline, + rcl_callback_manager_.MakeCallback(rcl_service, cb, with_waitable))) { + throw std::runtime_error( + std::string( + "Failed to set the on new request callback for Waitable service: ") + + rcl_get_error_string().str); + } + } + for (size_t i = 0; i < rcl_waitset->size_of_events; ++i) { + const rcl_event_t* const rcl_event = rcl_waitset->events[i]; + rcl_waitset->events[i] = nullptr; + sub_entities.events.push_back(rcl_event); + const auto cb = std::bind(&EventsExecutor::HandleWaitableEventReady, this, waitable, + rcl_event, wait_set, i, with_waitset, pl::_1); + if (RCL_RET_OK != rcl_event_set_callback(rcl_event, RclEventCallbackTrampoline, + rcl_callback_manager_.MakeCallback( + rcl_event, cb, with_waitable))) { + throw std::runtime_error( + std::string("Failed to set the callback for Waitable event: ") + + rcl_get_error_string().str); + } + } + + // Save the set of discovered sub-entities for later use during 
tear-down since we + // can't repeat the wait set trick then, as the RCL context may already be destroyed + // at that point. + waitable_entities_[waitable] = std::move(sub_entities); +} + +void EventsExecutor::HandleRemovedWaitable(py::handle waitable) { + const auto nh = waitable_entities_.extract(waitable); + if (!nh) { + throw std::runtime_error("Couldn't find sub-entities entry for removed Waitable"); + } + const WaitableSubEntities& sub_entities = nh.mapped(); + for (const rcl_subscription_t* const rcl_sub : sub_entities.subscriptions) { + if (RCL_RET_OK != + rcl_subscription_set_on_new_message_callback(rcl_sub, nullptr, nullptr)) { + throw std::runtime_error(std::string("Failed to clear the on new message " + "callback for Waitable subscription: ") + + rcl_get_error_string().str); + } + rcl_callback_manager_.RemoveCallback(rcl_sub); + } + for (rcl_timer_t* const rcl_timer : sub_entities.timers) { + timers_manager_.rcl_manager().RemoveTimer(rcl_timer); + } + for (const rcl_client_t* const rcl_client : sub_entities.clients) { + if (RCL_RET_OK != + rcl_client_set_on_new_response_callback(rcl_client, nullptr, nullptr)) { + throw std::runtime_error(std::string("Failed to clear the on new response " + "callback for Waitable client: ") + + rcl_get_error_string().str); + } + rcl_callback_manager_.RemoveCallback(rcl_client); + } + for (const rcl_service_t* const rcl_service : sub_entities.services) { + if (RCL_RET_OK != + rcl_service_set_on_new_request_callback(rcl_service, nullptr, nullptr)) { + throw std::runtime_error(std::string("Failed to clear the on new request " + "callback for Waitable service: ") + + rcl_get_error_string().str); + } + rcl_callback_manager_.RemoveCallback(rcl_service); + } + for (const rcl_event_t* const rcl_event : sub_entities.events) { + if (RCL_RET_OK != rcl_event_set_callback(rcl_event, nullptr, nullptr)) { + throw std::runtime_error( + std::string("Failed to clear the callback for Waitable event: ") + + rcl_get_error_string().str); + } + rcl_callback_manager_.RemoveCallback(rcl_event); + } +} + +void EventsExecutor::HandleWaitableSubReady( + py::handle waitable, const rcl_subscription_t* rcl_sub, py::handle wait_set, + size_t wait_set_sub_index, std::shared_ptr, size_t number_of_events) { + py::gil_scoped_acquire gil_acquire; + + // We need to set up the wait set to make it look like our subscription object is + // ready, and then poke the Waitable to do what it needs to do from there. + rcl_wait_set_t* const rcl_waitset = GetRclWaitSet(wait_set); + rcl_waitset->subscriptions[wait_set_sub_index] = rcl_sub; + HandleWaitableReady(waitable, wait_set, number_of_events); + // Null out the wait set again so that other callbacks can use it on other objects. + rcl_waitset->subscriptions[wait_set_sub_index] = nullptr; +} + +void EventsExecutor::HandleWaitableTimerReady(py::handle waitable, + const rcl_timer_t* rcl_timer, + py::handle wait_set, + size_t wait_set_timer_index, + std::shared_ptr, + std::shared_ptr) { + py::gil_scoped_acquire gil_acquire; + + // We need to set up the wait set to make it look like our timer object is ready, and + // then poke the Waitable to do what it needs to do from there. + rcl_wait_set_t* const rcl_waitset = GetRclWaitSet(wait_set); + rcl_waitset->timers[wait_set_timer_index] = rcl_timer; + HandleWaitableReady(waitable, wait_set, 1); + // Null out the wait set again so that other callbacks can use it on other objects. 
+ rcl_waitset->timers[wait_set_timer_index] = nullptr; +} + +void EventsExecutor::HandleWaitableClientReady(py::handle waitable, + const rcl_client_t* rcl_client, + py::handle wait_set, + size_t wait_set_client_index, + std::shared_ptr, + size_t number_of_events) { + py::gil_scoped_acquire gil_acquire; + + // We need to set up the wait set to make it look like our client object is ready, and + // then poke the Waitable to do what it needs to do from there. + rcl_wait_set_t* const rcl_waitset = GetRclWaitSet(wait_set); + rcl_waitset->clients[wait_set_client_index] = rcl_client; + HandleWaitableReady(waitable, wait_set, number_of_events); + // Null out the wait set again so that other callbacks can use it on other objects. + rcl_waitset->clients[wait_set_client_index] = nullptr; +} + +void EventsExecutor::HandleWaitableServiceReady(py::handle waitable, + const rcl_service_t* rcl_service, + py::handle wait_set, + size_t wait_set_service_index, + std::shared_ptr, + size_t number_of_events) { + py::gil_scoped_acquire gil_acquire; + + // We need to set up the wait set to make it look like our service object is ready, + // and then poke the Waitable to do what it needs to do from there. + rcl_wait_set_t* const rcl_waitset = GetRclWaitSet(wait_set); + rcl_waitset->services[wait_set_service_index] = rcl_service; + HandleWaitableReady(waitable, wait_set, number_of_events); + // Null out the wait set again so that other callbacks can use it on other objects. + rcl_waitset->services[wait_set_service_index] = nullptr; +} + +void EventsExecutor::HandleWaitableEventReady( + py::handle waitable, const rcl_event_t* rcl_event, py::handle wait_set, + size_t wait_set_event_index, std::shared_ptr, size_t number_of_events) { + py::gil_scoped_acquire gil_acquire; + + // We need to set up the wait set to make it look like our event object is ready, and + // then poke the Waitable to do what it needs to do from there. + rcl_wait_set_t* const rcl_waitset = GetRclWaitSet(wait_set); + rcl_waitset->events[wait_set_event_index] = rcl_event; + HandleWaitableReady(waitable, wait_set, number_of_events); + // Null out the wait set again so that other callbacks can use it on other objects. + rcl_waitset->events[wait_set_event_index] = nullptr; +} + +void EventsExecutor::HandleWaitableReady(py::handle waitable, py::handle wait_set, + size_t number_of_events) { + // Largely based on rclpy.Executor._take_waitable() + // https://github.com/ros2/rclpy/blob/a19180c238d4d97ed2b58868d8fb7fa3e3b621f2/rclpy/rclpy/executors.py#L447-L454 + py::object is_ready = waitable.attr("is_ready"); + py::object take_data = waitable.attr("take_data"); + py::object execute = waitable.attr("execute"); + py::object futures = waitable.attr("_futures"); + for (auto& future : futures) { + future.attr("_set_executor")(py::cast(this)); + } + for (size_t i = 0; i < number_of_events; ++i) { + // This method can have side effects, so it needs to be called even though it looks + // like just an accessor. 
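+    // For example, rclpy's ActionClient.is_ready() records internally which of its
+    // component entities were ready, and its take_data() consumes that recorded
+    // state; so this call must happen even though we already know something is ready.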
+ if (!is_ready(wait_set)) { + throw std::runtime_error("Failed to make Waitable ready"); + } + py::object data = take_data(); + try { + // execute() is an async method, we need to use asyncio to run it + // TODO(bmartin427) Don't run all of this immediately, blocking everything else + asyncio_run_(execute(data)); + } catch (const py::error_already_set& e) { + HandleCallbackExceptionInNodeEntity(e, waitable, "waitables"); + throw; + } + } +} + +void EventsExecutor::IterateTask(py::handle task) { + py::gil_scoped_acquire gil_acquire; + // Calling this won't throw, but it may set the exception property on the task object. + task(); + if (task.attr("done")()) { + py::object ex = task.attr("exception")(); + // Drop reference with GIL held. This doesn't necessarily destroy the underlying + // Task, since the `create_task()` caller may have retained a reference to the + // returned value. + task.dec_ref(); + + if (!ex.is_none()) { + // It's not clear how to easily turn a Python exception into a C++ one, so let's + // just throw it again and let pybind translate it normally. + py::dict scope; + scope["ex"] = ex; + try { + py::exec("raise ex", scope); + } catch (py::error_already_set& cpp_ex) { + // There's no good way to know what node this task came from. If we only have + // one node, we can use the logger from that, otherwise we'll have to leave it + // undefined. + py::object logger = py::none(); + if (nodes_.size() == 1) { + logger = nodes_[0].attr("get_logger")(); + } + HandleCallbackExceptionWithLogger(cpp_ex, logger, "task"); + throw; + } + } + } else { + // Task needs more iteration. Post back to the asio loop again. + // TODO(bmartin427) Not sure this is correct; in particular, it's unclear how a task + // that needs to wait a while can avoid either blocking or spinning. Revisit when + // asyncio support is intentionally added. + boost::asio::post(io_context_, std::bind(&EventsExecutor::IterateTask, this, task)); + } +} + +void EventsExecutor::HandleCallbackExceptionInNodeEntity( + const py::error_already_set& exc, py::handle entity, + const std::string& node_entity_attr) { + // Try to identify the node associated with the entity that threw the exception, so we + // can log to it. + for (py::handle node : nodes_) { + if (py::set(node.attr(node_entity_attr.c_str())).contains(entity)) { + HandleCallbackExceptionWithLogger(exc, node.attr("get_logger")(), + node_entity_attr); + return; + } + } + + // Failed to find a node + HandleCallbackExceptionWithLogger(exc, py::none(), node_entity_attr); +} + +void EventsExecutor::HandleCallbackExceptionWithLogger(const py::error_already_set& exc, + py::object logger, + const std::string& entity_type) { + if (logger.is_none()) { + py::object logging = py::module_::import("rclpy.logging"); + logger = logging.attr("get_logger")("UNKNOWN"); + } + + // The logger API won't let you call it with two different severities, from what it + // considers the same code location. Since it has no visibility into C++, all calls + // made from here will be attributed to the python that last called into here. + // Instead we will call out to python for logging. 
+ py::dict scope; + scope["logger"] = logger; + scope["node_entity_attr"] = entity_type; + scope["exc_value"] = exc.value(); + scope["exc_trace"] = exc.trace(); + py::exec( + R"( +import traceback +logger.fatal(f"Exception in '{node_entity_attr}' callback: {exc_value}") +logger.warn("Error occurred at:\n" + "".join(traceback.format_tb(exc_trace))) +)", + scope); +} + +// pybind11 module bindings + +void define_events_executor(py::object module) { + py::class_(module, "EventsExecutor") + .def(py::init(), py::arg("context")) + .def_property_readonly("context", &EventsExecutor::get_context) + .def("create_task", &EventsExecutor::create_task, py::arg("callback")) + .def("shutdown", &EventsExecutor::shutdown, py::arg("timeout_sec") = py::none()) + .def("add_node", &EventsExecutor::add_node, py::arg("node")) + .def("remove_node", &EventsExecutor::remove_node, py::arg("node")) + .def("wake", &EventsExecutor::wake) + .def("spin", [](EventsExecutor& exec) { exec.spin(); }) + .def("spin_once", &EventsExecutor::spin_once, py::arg("timeout_sec") = py::none()) + .def("spin_until_future_complete", &EventsExecutor::spin_until_future_complete, + py::arg("future"), py::arg("timeout_sec") = py::none()) + .def("spin_once_until_future_complete", + &EventsExecutor::spin_once_until_future_complete, py::arg("future"), + py::arg("timeout_sec") = py::none()); +} + +} // namespace events_executor +} // namespace rclpy diff --git a/rclpy/src/rclpy/events_executor/events_executor.hpp b/rclpy/src/rclpy/events_executor/events_executor.hpp new file mode 100644 index 000000000..513250f1b --- /dev/null +++ b/rclpy/src/rclpy/events_executor/events_executor.hpp @@ -0,0 +1,186 @@ +// Copyright 2024 Brad Martin +// Copyright 2024 Merlin Labs, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include "rcl_support.hpp" +#include "scoped_with.hpp" +#include "timers_manager.hpp" + +namespace rclpy { +namespace events_executor { + +/// Events executor implementation for rclpy +/// +/// This executor implementation attempts to replicate the function of the rclcpp +/// EventsExecutor for the benefit of rclpy applications. It is implemented in C++ to +/// minimize the overhead of processing the event loop. +/// +/// We assume all public methods could be invoked from any thread. Callbacks on the +/// executor loop will be issued on the thread that called one of the spin*() variants +/// (ignoring any parallelism that might be allowed by the callback group +/// configuration). 
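+/// For illustration, typical usage from Python (hypothetical user code) mirrors the
+/// standard rclpy executors:
+///
+///   from rclpy.experimental import EventsExecutor
+///   executor = EventsExecutor(context=rclpy.get_default_context())
+///   executor.add_node(node)
+///   executor.spin()
+///   executor.shutdown()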
+class EventsExecutor { + public: + /// @param context the rclpy Context object to operate on + explicit EventsExecutor(pybind11::object context); + + ~EventsExecutor(); + + // rclpy Executor API methods: + pybind11::object get_context() const { return rclpy_context_; } + pybind11::object create_task(pybind11::object callback, pybind11::args args, + const pybind11::kwargs& kwargs); + bool shutdown(std::optional timeout_sec = {}); + bool add_node(pybind11::object node); + void remove_node(pybind11::handle node); + void wake(); + void spin(std::optional timeout_sec = {}); + void spin_once(std::optional timeout_sec = {}); + void spin_until_future_complete(pybind11::handle future, + std::optional timeout_sec = {}); + void spin_once_until_future_complete(pybind11::handle future, + std::optional timeout_sec = {}); + + private: + // Structure to hold entities discovered underlying a Waitable object. + struct WaitableSubEntities { + std::vector subscriptions; + std::vector timers; // Can't be const + std::vector clients; + std::vector services; + std::vector events; + }; + + /// Updates the sets of known entities based on the currently tracked nodes. This is + /// not thread safe, so it must be posted to the io_context if the executor is + /// currently spinning. Expects the GIL to be held before calling. If @p shutdown is + /// true, a purge of all known nodes and entities is forced. + void UpdateEntitiesFromNodes(bool shutdown); + + /// Given an existing set of entities and a set with the desired new state, updates + /// the existing set and invokes callbacks on each added or removed entity. + void UpdateEntitySet(pybind11::set& entity_set, const pybind11::set& new_entity_set, + std::function added_entity_callback, + std::function removed_entity_callback); + + void HandleAddedSubscription(pybind11::handle); + void HandleRemovedSubscription(pybind11::handle); + void HandleSubscriptionReady(pybind11::handle, size_t number_of_events); + + void HandleAddedTimer(pybind11::handle); + void HandleRemovedTimer(pybind11::handle); + void HandleTimerReady(pybind11::handle); + + void HandleAddedClient(pybind11::handle); + void HandleRemovedClient(pybind11::handle); + void HandleClientReady(pybind11::handle, size_t number_of_events); + + void HandleAddedService(pybind11::handle); + void HandleRemovedService(pybind11::handle); + void HandleServiceReady(pybind11::handle, size_t number_of_events); + + void HandleAddedWaitable(pybind11::handle); + void HandleRemovedWaitable(pybind11::handle); + void HandleWaitableSubReady(pybind11::handle waitable, const rcl_subscription_t*, + pybind11::handle wait_set, size_t wait_set_sub_index, + std::shared_ptr with_waitset, + size_t number_of_events); + void HandleWaitableTimerReady(pybind11::handle waitable, const rcl_timer_t*, + pybind11::handle wait_set, size_t wait_set_timer_index, + std::shared_ptr with_waitable, + std::shared_ptr with_waitset); + void HandleWaitableClientReady(pybind11::handle waitable, const rcl_client_t*, + pybind11::handle wait_set, + size_t wait_set_client_index, + std::shared_ptr with_waitset, + size_t number_of_events); + void HandleWaitableServiceReady(pybind11::handle waitable, const rcl_service_t*, + pybind11::handle wait_set, + size_t wait_set_service_index, + std::shared_ptr with_waitset, + size_t number_of_events); + void HandleWaitableEventReady(pybind11::handle waitable, const rcl_event_t*, + pybind11::handle wait_set, size_t wait_set_event_index, + std::shared_ptr with_waitset, + size_t number_of_events); + void 
HandleWaitableReady(pybind11::handle waitable, pybind11::handle wait_set, + size_t number_of_events); + + /// Helper for create_task(). @p task needs to have had one reference manually added + /// to it. See create_task() implementation for details. + void IterateTask(pybind11::handle task); + + void HandleCallbackExceptionInNodeEntity(const pybind11::error_already_set&, + pybind11::handle entity, + const std::string& node_entity_attr); + void HandleCallbackExceptionWithLogger(const pybind11::error_already_set&, + pybind11::object logger, + const std::string& entity_type); + + const pybind11::object rclpy_context_; + + // Imported python objects we depend on + const pybind11::object asyncio_run_; + const pybind11::object rclpy_task_; + + boost::asio::io_context io_context_; + boost::asio::signal_set signals_; + + std::recursive_mutex nodes_mutex_; ///< Protects the node set + pybind11::set nodes_; ///< The set of all nodes we're executing + std::atomic wake_pending_{}; ///< An unhandled call to wake() has been made + std::timed_mutex spinning_mutex_; ///< Held while a thread is spinning + std::atomic sigint_pending_{}; + + // Collection of awaitable entities we're servicing + pybind11::set subscriptions_; + pybind11::set timers_; + pybind11::set clients_; + pybind11::set services_; + pybind11::set waitables_; + + /// Cache for rcl pointers underlying each waitables_ entry, because those are harder + /// to retrieve than the other entity types. + std::unordered_map + waitable_entities_; + + RclCallbackManager rcl_callback_manager_; + TimersManager timers_manager_; +}; + +void define_events_executor(pybind11::object module); + +} // namespace events_executor +} // namespace rclpy diff --git a/rclpy/src/rclpy/events_executor/python_hasher.hpp b/rclpy/src/rclpy/events_executor/python_hasher.hpp new file mode 100644 index 000000000..1ff9f8c2b --- /dev/null +++ b/rclpy/src/rclpy/events_executor/python_hasher.hpp @@ -0,0 +1,30 @@ +// Copyright 2024 Brad Martin +// Copyright 2024 Merlin Labs, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include + +namespace rclpy { +namespace events_executor { +/// This is intended to be used as the Hash template arg to STL containers using a +/// pybind11::handle as a Key. This is the same hash that a native Python dict or set +/// would use given the same key. +struct PythonHasher { + inline ssize_t operator()(const pybind11::handle& handle) const { + return pybind11::hash(handle); + } +}; +} // namespace events_executor +} // namespace rclpy diff --git a/rclpy/src/rclpy/events_executor/rcl_support.cpp b/rclpy/src/rclpy/events_executor/rcl_support.cpp new file mode 100644 index 000000000..1a53a9e7e --- /dev/null +++ b/rclpy/src/rclpy/events_executor/rcl_support.cpp @@ -0,0 +1,112 @@ +// Copyright 2024 Brad Martin +// Copyright 2024 Merlin Labs, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include "events_executor/rcl_support.hpp"
+
+#include <cstdlib>
+#include <functional>
+#include <memory>
+
+#include <pybind11/pybind11.h>
+
+namespace py = pybind11;
+
+namespace rclpy {
+namespace events_executor {
+
+extern "C" void RclEventCallbackTrampoline(const void* user_data,
+                                           size_t number_of_events) {
+  const auto cb = reinterpret_cast<const std::function<void(size_t)>*>(user_data);
+  (*cb)(number_of_events);
+}
+
+RclCallbackManager::RclCallbackManager(const boost::asio::any_io_executor& executor)
+    : executor_(executor) {}
+
+RclCallbackManager::~RclCallbackManager() {
+  // Should not still have any callbacks registered when we exit, because otherwise RCL
+  // can call pointers that will no longer be valid. We can't throw an exception here,
+  // but we can explode.
+  if (!owned_cbs_.empty()) {
+    py::gil_scoped_acquire gil_acquire;
+    py::print("Destroying callback manager with callbacks remaining");
+    ::abort();
+  }
+}
+
+const void* RclCallbackManager::MakeCallback(const void* key,
+                                             std::function<void(size_t)> callback,
+                                             std::shared_ptr<ScopedWith> with) {
+  // We don't support replacing an existing callback with a new one, because it gets
+  // tricky making sure we don't delete an old callback while the middleware still
+  // holds a pointer to it.
+  if (owned_cbs_.count(key) != 0) {
+    throw py::key_error("Attempt to replace existing callback");
+  }
+  CbEntry new_entry;
+  new_entry.cb = std::make_unique<std::function<void(size_t)>>(
+      [this, callback, key](size_t number_of_events) {
+        boost::asio::post(executor_, [this, callback, key, number_of_events]() {
+          if (!owned_cbs_.count(key)) {
+            // This callback has been removed, just drop it as the objects it may want
+            // to touch may no longer exist.
+            return;
+          }
+          callback(number_of_events);
+        });
+      });
+  new_entry.with = with;
+  const void* ret = new_entry.cb.get();
+  owned_cbs_[key] = std::move(new_entry);
+  return ret;
+}
+
+void RclCallbackManager::RemoveCallback(const void* key) {
+  if (!owned_cbs_.erase(key)) {
+    throw py::key_error("Attempt to remove nonexistent callback");
+  }
+}
+
+namespace {
+// This helper function is used for retrieving rcl pointers from _rclpy C++ objects.
+// Because _rclpy doesn't install its C++ headers for public use, it's difficult to use
+// the C++ classes directly. But, we can treat them as if they are Python objects using
+// their defined Python API. Unfortunately the python interfaces convert the returned
+// pointer to integers, so recovering that looks a little weird.
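+// E.g. for a subscription handle the round trip is, in effect (sketch):
+//
+//   auto raw = py::cast<size_t>(sub_handle.attr("pointer"));
+//   auto* rcl_sub = reinterpret_cast<rcl_subscription_t*>(raw);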
+template +RclT* GetRclPtr(py::handle py_ent_handle) { + return reinterpret_cast(py::cast(py_ent_handle.attr("pointer"))); +} +} // namespace + +rcl_subscription_t* GetRclSubscription(py::handle sub_handle) { + return GetRclPtr(sub_handle); +} + +rcl_timer_t* GetRclTimer(py::handle timer_handle) { + return GetRclPtr(timer_handle); +} + +rcl_client_t* GetRclClient(py::handle cl_handle) { + return GetRclPtr(cl_handle); +} + +rcl_service_t* GetRclService(py::handle srv_handle) { + return GetRclPtr(srv_handle); +} + +rcl_wait_set_t* GetRclWaitSet(py::handle ws) { return GetRclPtr(ws); } + +} // namespace events_executor +} // namespace rclpy diff --git a/rclpy/src/rclpy/events_executor/rcl_support.hpp b/rclpy/src/rclpy/events_executor/rcl_support.hpp new file mode 100644 index 000000000..982461bf1 --- /dev/null +++ b/rclpy/src/rclpy/events_executor/rcl_support.hpp @@ -0,0 +1,108 @@ +// Copyright 2024 Brad Martin +// Copyright 2024 Merlin Labs, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +#include + +#include "scoped_with.hpp" + +namespace rclpy { +namespace events_executor { + +/// Use this for all RCL event callbacks. Use the return value from +/// RclCallbackManager::MakeCallback() as the user data arg. +/// +/// Note that RCL callbacks are invoked in some arbitrary thread originating from the +/// middleware. Callbacks should process quickly to avoid blocking the middleware; i.e. +/// all actual work should be posted to an asio loop in another thread. +extern "C" void RclEventCallbackTrampoline(const void* user_data, + size_t number_of_events); + +/// Creates and maintains callback wrappers used with the RCL C library. +class RclCallbackManager { + public: + /// All user callbacks will be posted on the @p executor given to the constructor. + /// These callbacks will be invoked without the Python Global Interpreter Lock held, + /// so if they need to access Python at all make sure to acquire that explicitly. + explicit RclCallbackManager(const boost::asio::any_io_executor& executor); + ~RclCallbackManager(); + + /// Creates a callback wrapper to be passed to RCL C functions. @p key should be a + /// pointer to the rcl object that will be associated with the callback. @p with + /// protects the _rclpy object handle owning the RCL object, for the duration the + /// callback is established. + const void* MakeCallback(const void* key, std::function callback, + std::shared_ptr with); + + /// Discard a previously constructed callback. @p key should be the same value + /// provided to MakeCallback(). Caution: ensure that RCL is no longer using a + /// callback before invoking this. + void RemoveCallback(const void* key); + + private: + /// The C RCL interface deals in raw pointers, so someone needs to own the C++ + /// function objects we'll be calling into. We use unique pointers so the raw pointer + /// to the object remains stable while the map is manipulated. 
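+  /// E.g. the raw pointer handed to RCL is `entry.cb.get()`; because the function
+  /// object lives on the heap behind the unique_ptr, that address stays valid across
+  /// rehashes of owned_cbs_, as long as the entry itself is not erased prematurely.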
+  struct CbEntry {
+    std::unique_ptr<std::function<void(size_t)>> cb;
+    std::shared_ptr<ScopedWith> with;
+  };
+
+  boost::asio::any_io_executor executor_;
+
+  /// The map key is the raw pointer to the RCL entity object (subscription, etc)
+  /// associated with the callback.
+  std::unordered_map<const void*, CbEntry> owned_cbs_;
+};
+
+/// Returns the RCL subscription object pointer from a subscription handle (i.e. the
+/// handle attribute of an rclpy Subscription object, which is a _rclpy C++
+/// Subscription object). Assumes that a ScopedWith has already been entered on the
+/// given handle.
+rcl_subscription_t* GetRclSubscription(pybind11::handle);
+
+/// Returns the RCL timer object pointer from a timer handle (i.e. the handle attribute
+/// of an rclpy Timer object, which is a _rclpy C++ Timer object). Assumes that a
+/// ScopedWith has already been entered on the given handle.
+rcl_timer_t* GetRclTimer(pybind11::handle);
+
+/// Returns the RCL client object pointer from a client handle (i.e. the handle
+/// attribute of an rclpy Client object, which is a _rclpy C++ Client object). Assumes
+/// that a ScopedWith has already been entered on the given handle.
+rcl_client_t* GetRclClient(pybind11::handle);
+
+/// Returns the RCL service object pointer from a service handle (i.e. the handle
+/// attribute of an rclpy Service object, which is a _rclpy C++ Service object).
+/// Assumes that a ScopedWith has already been entered on the given handle.
+rcl_service_t* GetRclService(pybind11::handle);
+
+/// Returns the RCL wait set object pointer from a _rclpy C++ WaitSet object. Assumes
+/// that a ScopedWith has already been entered on the given object.
+rcl_wait_set_t* GetRclWaitSet(pybind11::handle);
+
+}  // namespace events_executor
+}  // namespace rclpy
diff --git a/rclpy/src/rclpy/events_executor/scoped_with.hpp b/rclpy/src/rclpy/events_executor/scoped_with.hpp
new file mode 100644
index 000000000..53e3e6daa
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/scoped_with.hpp
@@ -0,0 +1,39 @@
+// Copyright 2024 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#pragma once
+
+#include <pybind11/pybind11.h>
+
+namespace rclpy {
+namespace events_executor {
+
+/// Enters a python context manager for the scope of this object instance.
+class ScopedWith {
+ public:
+  explicit ScopedWith(pybind11::handle object)
+      : object_(pybind11::cast<pybind11::object>(object)) {
+    object_.attr("__enter__")();
+  }
+
+  ~ScopedWith() {
+    object_.attr("__exit__")(pybind11::none(), pybind11::none(), pybind11::none());
+  }
+
+ private:
+  pybind11::object object_;
+};
+
+}  // namespace events_executor
+}  // namespace rclpy
diff --git a/rclpy/src/rclpy/events_executor/timers_manager.cpp b/rclpy/src/rclpy/events_executor/timers_manager.cpp
new file mode 100644
index 000000000..2318754e5
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/timers_manager.cpp
@@ -0,0 +1,329 @@
+// Copyright 2024 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include "events_executor/timers_manager.hpp"
+
+#include <functional>
+#include <memory>
+#include <optional>
+#include <stdexcept>
+#include <tuple>
+
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/asio/error.hpp>
+#include <boost/asio/post.hpp>
+#include <boost/date_time/posix_time/posix_time_duration.hpp>
+
+#include <pybind11/pybind11.h>
+
+#include "rcl_support.hpp"
+
+namespace py = pybind11;
+
+namespace rclpy {
+namespace events_executor {
+
+namespace {
+
+// Implementation note: the original iRobot TimersManager associated with the rclcpp
+// EventsExecutor maintained a heap with all outstanding timers sorted by their next
+// expiration time. Here that approach will be skipped in favor of just looking at
+// every timer every update for the following reasons:
+//
+// * This approach is simpler
+// * In the applications this has been used in so far, no Node types exist that have a
+//   large number of timers outstanding at once. Assuming no more than a few timers
+//   exist in the whole process, the heap seems like overkill.
+// * Every time a timer ticks or is reset, the heap needs to be resorted anyway.
+// * The rcl timer interface doesn't expose any way to get timer expiration info in
+//   absolute terms; you can only find out 'time until next callback'. This means if
+//   you are trying to sort the list of timers, the 'value' of each entry in the heap
+//   changes depending on when you look at it during the process of sorting.
+//
+// We will however yell a bit if we ever see a large number of timers that disproves
+// this assumption, so we can reassess this decision.
+constexpr size_t WARN_TIMERS_COUNT = 8;
+
+typedef std::function<void(const rcl_time_jump_t*)> ClockJumpCallbackT;
+typedef std::function<void()> TimerResetCallbackT;
+
+extern "C" void RclClockJumpTrampoline(const rcl_time_jump_t* time_jump,
+                                       bool before_jump, void* user_data) {
+  // rcl calls this both before and after a clock change, and we never want the before
+  // callback, so let's just eliminate that case early.
+  if (before_jump) {
+    return;
+  }
+  auto cb = reinterpret_cast<ClockJumpCallbackT*>(user_data);
+  (*cb)(time_jump);
+}
+
+extern "C" void RclTimerResetTrampoline(const void* user_data, size_t) {
+  auto cb = reinterpret_cast<const TimerResetCallbackT*>(user_data);
+  (*cb)();
+}
+
+}  // namespace
+
+/// Manages a single clock source, and all timers operating on that source. All methods
+/// (including construction and destruction) are assumed to happen on the thread running
+/// the provided asio executor.
+class RclTimersManager::ClockManager {
+ public:
+  ClockManager(const boost::asio::any_io_executor& executor, rcl_clock_t* clock)
+      : executor_(executor), clock_(clock) {
+    // Need to establish a clock jump callback so we can tell when debug time is
+    // updated.
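+    // An rcl jump callback fires either when the time source type changes
+    // (on_clock_change) or when time jumps by at least min_forward/min_backward
+    // nanoseconds; thresholds of 1 and -1 request notification of any discrete jump
+    // in either direction.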
+    rcl_jump_threshold_t threshold{
+        .on_clock_change = true, .min_forward = {1}, .min_backward = {-1}};
+    // Note, this callback could happen on any thread
+    jump_cb_ = [this](const rcl_time_jump_t* time_jump) {
+      bool on_debug{};
+      switch (time_jump->clock_change) {
+        case RCL_ROS_TIME_NO_CHANGE:
+        case RCL_ROS_TIME_ACTIVATED:
+          on_debug = true;
+          break;
+        case RCL_ROS_TIME_DEACTIVATED:
+        case RCL_SYSTEM_TIME_NO_CHANGE:
+          on_debug = false;
+          break;
+      }
+      boost::asio::post(executor_,
+                        std::bind(&ClockManager::HandleJump, this, on_debug));
+    };
+    if (RCL_RET_OK != rcl_clock_add_jump_callback(clock_, threshold,
+                                                  RclClockJumpTrampoline, &jump_cb_)) {
+      throw std::runtime_error(std::string("Failed to set RCL clock jump callback: ") +
+                               rcl_get_error_string().str);
+    }
+
+    // This isn't necessary yet but every timer will eventually depend on it. Again,
+    // this could happen on any thread.
+    reset_cb_ = [this]() {
+      boost::asio::post(executor_, std::bind(&ClockManager::UpdateTimers, this));
+    };
+
+    // Initialize which timebase we're on
+    if (clock_->type == RCL_ROS_TIME) {
+      if (RCL_RET_OK != rcl_is_enabled_ros_time_override(clock_, &on_debug_time_)) {
+        throw std::runtime_error(
+            std::string("Failed to get RCL clock override state: ") +
+            rcl_get_error_string().str);
+      }
+    }
+  }
+
+  ~ClockManager() {
+    if (RCL_RET_OK !=
+        rcl_clock_remove_jump_callback(clock_, RclClockJumpTrampoline, &jump_cb_)) {
+      py::gil_scoped_acquire gil_acquire;
+      py::print(std::string("Failed to remove RCL clock jump callback: ") +
+                rcl_get_error_string().str);
+    }
+    while (!timers_.empty()) {
+      RemoveTimer(timers_.begin()->first);
+    }
+  }
+
+  bool empty() const { return timers_.empty(); }
+
+  void AddTimer(rcl_timer_t* timer, std::function<void()> ready_callback) {
+    // All timers have the same reset callback
+    if (RCL_RET_OK !=
+        rcl_timer_set_on_reset_callback(timer, RclTimerResetTrampoline, &reset_cb_)) {
+      throw std::runtime_error(std::string("Failed to set timer reset callback: ") +
+                               rcl_get_error_string().str);
+    }
+    timers_[timer] = ready_callback;
+    if (timers_.size() == WARN_TIMERS_COUNT) {
+      py::print("Warning, the number of timers associated with this clock is large.");
+      py::print("Management of this number of timers may be inefficient.");
+    }
+    UpdateTimers();
+  }
+
+  void RemoveTimer(rcl_timer_t* timer) {
+    auto it = timers_.find(timer);
+    if (it == timers_.end()) {
+      throw py::key_error("Attempt to remove unmanaged timer");
+    }
+
+    if (RCL_RET_OK != rcl_timer_set_on_reset_callback(timer, nullptr, nullptr)) {
+      throw std::runtime_error(std::string("Failed to clear timer reset callback: ") +
+                               rcl_get_error_string().str);
+    }
+    timers_.erase(it);
+    // We could re-evaluate how long we need to block for now that a timer has been
+    // removed; but, there's no real harm in one extra wakeup that then decides it
+    // doesn't need to do anything, and this timer might not even be the next to fire,
+    // so we won't bother.
+  }
+
+ private:
+  void HandleJump(bool on_debug_time) {
+    on_debug_time_ = on_debug_time;
+    UpdateTimers();
+  }
+
+  void UpdateTimers() {
+    // First, evaluate all of our timers and dispatch any that are ready now. While
+    // we're at it, keep track of the earliest next timer callback that is due.
+    std::optional<int64_t> next_ready_ns;
+    for (const auto& timer_cb_pair : timers_) {
+      auto next_call_ns = GetNextCallNanoseconds(timer_cb_pair.first);
+      if (next_call_ns <= 0) {
+        // This just notifies RCL that we're considering the timer triggered, for the
+        // purposes of updating the next trigger time.
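+        // rcl_timer_call() marks the timer as called and recomputes its next expiry,
+        // so the follow-up GetNextCallNanoseconds() query below reports the *next*
+        // period; it returns RCL_RET_TIMER_CANCELED if the timer has been canceled.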
+        const auto ret = rcl_timer_call(timer_cb_pair.first);
+        switch (ret) {
+          case RCL_RET_OK:
+            break;
+          case RCL_RET_TIMER_CANCELED:
+            // Someone apparently canceled the timer *after* we just queried the next
+            // call time? Nevermind, then...
+            rcl_reset_error();
+            continue;
+          default:
+            throw std::runtime_error(std::string("Failed to call RCL timer: ") +
+                                     rcl_get_error_string().str);
+        }
+
+        // Post the user callback to be invoked later once timing-sensitive code is
+        // done.
+        boost::asio::post(executor_, timer_cb_pair.second);
+
+        // Update time until *next* call.
+        next_call_ns = GetNextCallNanoseconds(timer_cb_pair.first);
+      }
+      if (!next_ready_ns || (next_call_ns < *next_ready_ns)) {
+        next_ready_ns = next_call_ns;
+      }
+    }
+
+    // If we're not on debug time, we should schedule another wakeup when we
+    // anticipate the next timer being ready. If we are, we'll just re-check
+    // everything at the next jump callback.
+    if (!on_debug_time_ && next_ready_ns) {
+      // Boost doesn't support nanoseconds by default. Round the conversion up so we
+      // don't wake early and find nothing to do.
+      const int64_t sleep_us = (*next_ready_ns + 999) / 1000;
+      next_update_wait_.expires_from_now(boost::posix_time::microseconds(sleep_us));
+      next_update_wait_.async_wait([this](const boost::system::error_code& ec) {
+        if (!ec) {
+          UpdateTimers();
+        } else if (ec != boost::asio::error::operation_aborted) {
+          throw std::runtime_error("Error waiting for next timer: " + ec.message());
+        }
+      });
+    } else {
+      next_update_wait_.cancel();
+    }
+  }
+
+  /// Returns the number of nanoseconds until the next callback on the given timer is
+  /// due. Value may be negative or zero if callback time has already been reached.
+  /// Returns std::nullopt if the timer is canceled.
+  static std::optional<int64_t> GetNextCallNanoseconds(const rcl_timer_t* rcl_timer) {
+    int64_t time_until_next_call{};
+    const rcl_ret_t ret =
+        rcl_timer_get_time_until_next_call(rcl_timer, &time_until_next_call);
+    switch (ret) {
+      case RCL_RET_OK:
+        return time_until_next_call;
+      case RCL_RET_TIMER_CANCELED:
+        return {};
+      default:
+        throw std::runtime_error(std::string("Failed to fetch timer ready time: ") +
+                                 rcl_get_error_string().str);
+    }
+  }
+
+  boost::asio::any_io_executor executor_;
+  rcl_clock_t* const clock_;
+  ClockJumpCallbackT jump_cb_;
+  TimerResetCallbackT reset_cb_;
+  bool on_debug_time_{};
+
+  std::unordered_map<rcl_timer_t*, std::function<void()>> timers_;
+  boost::asio::deadline_timer next_update_wait_{executor_};
+};
+
+RclTimersManager::RclTimersManager(const boost::asio::any_io_executor& executor)
+    : executor_(executor) {}
+
+RclTimersManager::~RclTimersManager() {}
+
+namespace {
+rcl_clock_t* GetTimerClock(rcl_timer_t* timer) {
+  rcl_clock_t* clock{};
+  if (RCL_RET_OK != rcl_timer_clock(timer, &clock)) {
+    throw std::runtime_error(std::string("Failed to determine clock for timer: ") +
+                             rcl_get_error_string().str);
+  }
+  return clock;
+}
+}  // namespace
+
+void RclTimersManager::AddTimer(rcl_timer_t* timer,
+                                std::function<void()> ready_callback) {
+  // Figure out the clock this timer is using, make sure a manager exists for that
+  // clock, then forward the timer to that clock's manager.
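+  // A ClockManager is created lazily the first time a timer on a new clock shows up,
+  // and torn down again in RemoveTimer() once the last timer on that clock is gone.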
+  rcl_clock_t* clock = GetTimerClock(timer);
+  auto it = clock_managers_.find(clock);
+  if (it == clock_managers_.end()) {
+    std::tie(it, std::ignore) = clock_managers_.insert(
+        std::make_pair(clock, std::make_unique<ClockManager>(executor_, clock)));
+  }
+  it->second->AddTimer(timer, ready_callback);
+}
+
+void RclTimersManager::RemoveTimer(rcl_timer_t* timer) {
+  rcl_clock_t* clock = GetTimerClock(timer);
+  auto it = clock_managers_.find(clock);
+  if (it == clock_managers_.end()) {
+    throw py::key_error("Attempt to remove timer from unmanaged clock");
+  }
+  it->second->RemoveTimer(timer);
+  if (it->second->empty()) {
+    clock_managers_.erase(it);
+  }
+}
+
+TimersManager::TimersManager(const boost::asio::any_io_executor& executor,
+                             std::function<void(py::handle)> timer_ready_callback)
+    : rcl_manager_(executor), ready_callback_(timer_ready_callback) {}
+
+TimersManager::~TimersManager() {}
+
+void TimersManager::AddTimer(py::handle timer) {
+  PyRclMapping mapping;
+  py::handle handle = timer.attr("handle");
+  mapping.with = std::make_unique<ScopedWith>(handle);
+  mapping.rcl_ptr = GetRclTimer(handle);
+  rcl_manager_.AddTimer(mapping.rcl_ptr, std::bind(ready_callback_, timer));
+  timer_mappings_[timer] = std::move(mapping);
+}
+
+void TimersManager::RemoveTimer(py::handle timer) {
+  const auto it = timer_mappings_.find(timer);
+  if (it == timer_mappings_.end()) {
+    throw py::key_error("Attempt to remove unmanaged timer");
+  }
+  rcl_manager_.RemoveTimer(it->second.rcl_ptr);
+  timer_mappings_.erase(it);
+}
+
+}  // namespace events_executor
+}  // namespace rclpy
diff --git a/rclpy/src/rclpy/events_executor/timers_manager.hpp b/rclpy/src/rclpy/events_executor/timers_manager.hpp
new file mode 100644
index 000000000..314dac9b4
--- /dev/null
+++ b/rclpy/src/rclpy/events_executor/timers_manager.hpp
@@ -0,0 +1,84 @@
+// Copyright 2024 Brad Martin
+// Copyright 2024 Merlin Labs, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <unordered_map>
+
+#include <boost/asio/any_io_executor.hpp>
+
+#include <pybind11/pybind11.h>
+
+#include <rcl/clock.h>
+#include <rcl/timer.h>
+
+#include "events_executor/python_hasher.hpp"
+#include "events_executor/scoped_with.hpp"
+
+namespace rclpy {
+namespace events_executor {
+
+/// This class manages low-level rcl timers in the system on behalf of EventsExecutor.
+class RclTimersManager {
+ public:
+  explicit RclTimersManager(const boost::asio::any_io_executor&);
+  ~RclTimersManager();
+
+  void AddTimer(rcl_timer_t*, std::function<void()> ready_callback);
+  void RemoveTimer(rcl_timer_t*);
+
+ private:
+  boost::asio::any_io_executor executor_;
+
+  class ClockManager;
+  /// Handlers for each distinct clock source in the system.
+  std::unordered_map<rcl_clock_t*, std::unique_ptr<ClockManager>> clock_managers_;
+};
+
+/// This class manages rclpy.Timer Python objects on behalf of EventsExecutor.
+class TimersManager {
+ public:
+  /// @p timer_ready_callback will be invoked with the timer handle whenever a managed
+  /// timer is ready for servicing.
+  TimersManager(const boost::asio::any_io_executor&,
+                std::function<void(pybind11::handle)> timer_ready_callback);
+  ~TimersManager();
+
+  /// Accessor for underlying rcl timer manager, for management of non-Python timers.
+  RclTimersManager& rcl_manager() { return rcl_manager_; }
+
+  // Both of these methods expect the GIL to be held when they are called.
+  void AddTimer(pybind11::handle timer);
+  void RemoveTimer(pybind11::handle timer);
+
+ private:
+  struct PyRclMapping {
+    /// Marks the corresponding Python object as in-use for as long as we're using the
+    /// rcl pointer derived from it.
+    std::unique_ptr<ScopedWith> with;
+
+    /// The underlying rcl object
+    rcl_timer_t* rcl_ptr{};
+  };
+
+  RclTimersManager rcl_manager_;
+  const std::function<void(pybind11::handle)> ready_callback_;
+
+  std::unordered_map<pybind11::handle, PyRclMapping, PythonHasher> timer_mappings_;
+};
+
+}  // namespace events_executor
+}  // namespace rclpy
diff --git a/rclpy/test/test_events_executor.py b/rclpy/test/test_events_executor.py
new file mode 100644
index 000000000..8addb8ba1
--- /dev/null
+++ b/rclpy/test/test_events_executor.py
@@ -0,0 +1,659 @@
+# Copyright 2024 Brad Martin
+# Copyright 2024 Merlin Labs, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import typing
+import unittest
+
+import action_msgs.msg
+import rclpy.action
+import rclpy.duration
+import rclpy.event_handler
+import rclpy.executors
+import rclpy.experimental
+import rclpy.node
+import rclpy.qos
+import rclpy.time
+import rosgraph_msgs.msg
+import test_msgs.action
+import test_msgs.msg
+import test_msgs.srv
+
+
+def _get_pub_sub_qos(transient_local: bool) -> rclpy.qos.QoSProfile:
+    if not transient_local:
+        return rclpy.qos.QoSProfile(history=rclpy.qos.HistoryPolicy.KEEP_ALL)
+    # For test purposes we deliberately want a TRANSIENT_LOCAL QoS with KEEP_ALL
+    # history.
+    return rclpy.qos.QoSProfile(
+        history=rclpy.qos.HistoryPolicy.KEEP_ALL,
+        durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL,
+    )
+
+
+class SubTestNode(rclpy.node.Node):
+    """Node to test subscriptions and subscription-related events."""
+
+    def __init__(self, *, transient_local: bool = False):
+        super().__init__("test_sub_node")
+        self._new_pub_future: rclpy.Future[
+            rclpy.event_handler.QoSSubscriptionMatchedInfo
+        ] | None = None
+        self._received_future: rclpy.Future[test_msgs.msg.BasicTypes] | None = None
+        self.create_subscription(
+            test_msgs.msg.BasicTypes,
+            # This node seems to get stale discovery data and then complain about QoS
+            # changes if we reuse the same topic name.
+ "test_topic" + ("_transient_local" if transient_local else ""), + self._handle_sub, + _get_pub_sub_qos(transient_local), + event_callbacks=rclpy.event_handler.SubscriptionEventCallbacks( + matched=self._handle_matched_sub + ), + ) + + def expect_pub_info( + self, + ) -> rclpy.Future[rclpy.event_handler.QoSSubscriptionMatchedInfo]: + self._new_pub_future = rclpy.Future() + return self._new_pub_future + + def expect_message(self) -> rclpy.Future[test_msgs.msg.BasicTypes]: + self._received_future = rclpy.Future() + return self._received_future + + def _handle_sub(self, msg: test_msgs.msg.BasicTypes) -> None: + if self._received_future is not None: + future = self._received_future + self._received_future = None + future.set_result(msg) + + def _handle_matched_sub( + self, info: rclpy.event_handler.QoSSubscriptionMatchedInfo + ) -> None: + """Handle a new publisher being matched to our subscription.""" + if self._new_pub_future is not None: + self._new_pub_future.set_result(info) + self._new_pub_future = None + + +class PubTestNode(rclpy.node.Node): + """Node to test publications and publication-related events.""" + + def __init__(self, *, transient_local: bool = False): + super().__init__("test_pub_node") + self._new_sub_future: rclpy.Future[ + rclpy.event_handler.QoSPublisherMatchedInfo + ] | None = None + self._pub = self.create_publisher( + test_msgs.msg.BasicTypes, + "test_topic" + ("_transient_local" if transient_local else ""), + _get_pub_sub_qos(transient_local), + event_callbacks=rclpy.event_handler.PublisherEventCallbacks( + matched=self._handle_matched_pub + ), + ) + + def expect_sub_info( + self, + ) -> rclpy.Future[rclpy.event_handler.QoSPublisherMatchedInfo]: + self._new_sub_future = rclpy.Future() + return self._new_sub_future + + def publish(self, value: float) -> None: + self._pub.publish(test_msgs.msg.BasicTypes(float32_value=value)) + + def _handle_matched_pub( + self, info: rclpy.event_handler.QoSPublisherMatchedInfo + ) -> None: + """Handle a new subscriber being matched to our publication.""" + if self._new_sub_future is not None: + self._new_sub_future.set_result(info) + self._new_sub_future = None + + +class ServiceServerTestNode(rclpy.node.Node): + """Node to test service server-side operation.""" + + def __init__(self): + super().__init__("test_service_server_node") + self._got_request_future: rclpy.Future[ + test_msgs.srv.BasicTypes.Request + ] | None = None + self._pending_response: test_msgs.srv.BasicTypes.Response | None = None + self.create_service( + test_msgs.srv.BasicTypes, "test_service", self._handle_request + ) + + def expect_request( + self, success: bool, error_msg: str + ) -> rclpy.Future[test_msgs.srv.BasicTypes.Request]: + """Expect an incoming request. + + The arguments are used to compose the response. 
+ """ + self._got_request_future = rclpy.Future() + self._pending_response = test_msgs.srv.BasicTypes.Response( + bool_value=success, string_value=error_msg + ) + return self._got_request_future + + def _handle_request( + self, + req: test_msgs.srv.BasicTypes.Request, + res: test_msgs.srv.BasicTypes.Response, + ) -> test_msgs.srv.BasicTypes.Response: + if self._got_request_future is not None: + self._got_request_future.set_result(req) + self._got_request_future = None + if self._pending_response is not None: + res = self._pending_response + self._pending_response = None + return res + + +class ServiceClientTestNode(rclpy.node.Node): + """Node to test service client-side operation.""" + + def __init__(self): + super().__init__("test_service_client_node") + self._client = self.create_client(test_msgs.srv.BasicTypes, "test_service") + + def issue_request( + self, value: float + ) -> rclpy.Future[test_msgs.srv.BasicTypes.Response]: + req = test_msgs.srv.BasicTypes.Request(float32_value=value) + return self._client.call_async(req) + + +class TimerTestNode(rclpy.node.Node): + """Node to test timer operation.""" + + def __init__( + self, index: int = 0, parameter_overrides: list[rclpy.Parameter] | None = None + ): + super().__init__(f"test_timer{index}", parameter_overrides=parameter_overrides) + self._timer_events = 0 + self._tick_future: rclpy.Future[None] | None = None + self._timer = self.create_timer(0.1, self._handle_timer) + + @property + def timer_events(self) -> int: + return self._timer_events + + def expect_tick(self) -> rclpy.Future[None]: + self._tick_future = rclpy.Future() + return self._tick_future + + def _handle_timer(self) -> None: + self._timer_events += 1 + if self._tick_future is not None: + self._tick_future.set_result(None) + self._tick_future = None + + +class ClockPublisherNode(rclpy.node.Node): + """Node to publish rostime clock updates.""" + + def __init__(self): + super().__init__("clock_node") + self._now = rclpy.time.Time() + self._pub = self.create_publisher( + rosgraph_msgs.msg.Clock, + "/clock", + rclpy.qos.QoSProfile( + depth=1, reliability=rclpy.qos.ReliabilityPolicy.BEST_EFFORT + ), + ) + + def advance_time(self, millisec: int) -> None: + self._now += rclpy.duration.Duration(nanoseconds=millisec * 1000000) + self._pub.publish(rosgraph_msgs.msg.Clock(clock=self._now.to_msg())) + + +class ActionServerTestNode(rclpy.node.Node): + """Node to test action server-side operation.""" + + def __init__(self): + super().__init__( + "test_action_server_node", + parameter_overrides=[rclpy.Parameter("use_sim_time", value=True)], + ) + self._got_goal_future: rclpy.Future[ + test_msgs.action.Fibonacci.Goal + ] | None = None + self._srv = rclpy.action.ActionServer( + self, + test_msgs.action.Fibonacci, + "test_action", + self._handle_execute, + handle_accepted_callback=self._handle_accepted, + result_timeout=10, + ) + self._goal_handle: rclpy.action.server.ServerGoalHandle | None = None + self._sequence: list[int] = [] + + def expect_goal(self) -> rclpy.Future[test_msgs.action.Fibonacci.Goal]: + self._goal_handle = None + self._got_goal_future = rclpy.Future() + return self._got_goal_future + + def _handle_accepted( + self, goal_handle: rclpy.action.server.ServerGoalHandle + ) -> None: + self._goal_handle = goal_handle + self._sequence = [0, 1] + if self._got_goal_future is not None: + self._got_goal_future.set_result(goal_handle.request) + self._got_goal_future = None + # Wait to finish until instructed by test + + def advance_feedback(self) -> list[int] | None: + """Add an 
entry to the result in progress and send a feedback message.
+
+        Returns the current sequence in progress if incomplete, or None if the sequence
+        is complete and it's time to complete the operation instead.
+
+        """
+        assert self._goal_handle is not None
+        n = self._goal_handle.request.order + 1
+        if len(self._sequence) < n:
+            self._sequence.append(self._sequence[-2] + self._sequence[-1])
+        if len(self._sequence) >= n:
+            return None
+
+        # FYI normally feedback would be sent from the execute handler, but we've tied
+        # it to its own public method for testing
+        fb = test_msgs.action.Fibonacci.Feedback()
+        fb.sequence = self._sequence
+        self._goal_handle.publish_feedback(fb)
+        return self._sequence
+
+    def execute(self) -> rclpy.action.server.ServerGoalHandle:
+        """Complete the action in progress.
+
+        Returns the handle to the goal executed.
+
+        """
+        handle = self._goal_handle
+        self._goal_handle = None
+        assert handle is not None
+        handle.execute()
+        return handle
+
+    def _handle_execute(
+        self, goal_handle: rclpy.action.server.ServerGoalHandle
+    ) -> test_msgs.action.Fibonacci.Result:
+        goal_handle.succeed()
+        result = test_msgs.action.Fibonacci.Result()
+        result.sequence = self._sequence
+        return result
+
+
+class ActionClientTestNode(rclpy.node.Node):
+    """Node to test action client-side operation."""
+
+    def __init__(self):
+        super().__init__("test_action_client_node")
+        self._client = rclpy.action.ActionClient(
+            self, test_msgs.action.Fibonacci, "test_action"
+        )
+        self._feedback_future: rclpy.Future[
+            test_msgs.action.Fibonacci.Feedback
+        ] | None = None
+        self._result_future: rclpy.Future[
+            test_msgs.action.Fibonacci.Result
+        ] | None = None
+
+    def send_goal(
+        self, order: int
+    ) -> rclpy.Future[rclpy.action.client.ClientGoalHandle]:
+        """Send a new goal.
+
+        The future will contain the goal handle when the goal submission response has
+        been received.
+
+        """
+        self._client.wait_for_server()
+        goal_ack_future = self._client.send_goal_async(
+            test_msgs.action.Fibonacci.Goal(order=order),
+            feedback_callback=self._handle_feedback,
+        )
+        goal_ack_future.add_done_callback(self._handle_goal_ack)
+        return goal_ack_future
+
+    def _handle_goal_ack(
+        self, future: rclpy.Future[rclpy.action.client.ClientGoalHandle]
+    ) -> None:
+        handle = future.result()
+        assert handle is not None
+        result_future = handle.get_result_async()
+        result_future.add_done_callback(self._handle_result_response)
+
+    def expect_feedback(self) -> rclpy.Future[test_msgs.action.Fibonacci.Feedback]:
+        self._feedback_future = rclpy.Future()
+        return self._feedback_future
+
+    def _handle_feedback(
+        self,
+        # If this is a private 'Impl' detail, why is rclpy handing this out??
+        fb_msg: test_msgs.action.Fibonacci.Impl.FeedbackMessage,
+    ) -> None:
+        if self._feedback_future is not None:
+            self._feedback_future.set_result(fb_msg.feedback)
+            self._feedback_future = None
+
+    def expect_result(
+        self,
+    ) -> rclpy.Future[test_msgs.action.Fibonacci.Result]:
+        self._result_future = rclpy.Future()
+        return self._result_future
+
+    def _handle_result_response(
+        self, future: rclpy.Future[test_msgs.action.Fibonacci_GetResult_Response]
+    ) -> None:
+        response: test_msgs.action.Fibonacci_GetResult_Response | None = future.result()
+        assert response is not None
+        assert self._result_future is not None
+        result: test_msgs.action.Fibonacci.Result = response.result
+        self._result_future.set_result(result)
+        self._result_future = None
+
+
+class TestEventsExecutor(unittest.TestCase):
+    def setUp(self) -> None:
+        # Prevent nodes under test from discovering other random stuff to talk to
+        os.environ["ROS_AUTOMATIC_DISCOVERY_RANGE"] = "OFF"
+        rclpy.init()
+
+        self.executor = rclpy.experimental.EventsExecutor()
+
+    def tearDown(self) -> None:
+        rclpy.shutdown()
+
+    def _expect_future_done(self, future: rclpy.Future[typing.Any]) -> None:
+        # Use a moderately long timeout with the expectation that we shouldn't often
+        # need the whole duration.
+        self.executor.spin_until_future_complete(future, 1.0)
+        self.assertTrue(future.done())
+
+    def _expect_future_not_done(self, future: rclpy.Future[typing.Any]) -> None:
+        # Use a short timeout to give the future some time to complete if we are going
+        # to fail, but not very long because we'll be waiting the full duration every
+        # time during successful tests. It's ok if the timeout is a bit short and the
+        # failure isn't 100% deterministic.
+        self.executor.spin_until_future_complete(future, 0.2)
+        self.assertFalse(future.done())
+
+    def _spin_for(self, sec: float) -> None:
+        """Spin the executor for the given number of realtime seconds."""
+        # Note that this roundabout approach of waiting on a future that will never
+        # finish with a timeout seems to be the only way with the rclpy.Executor API to
+        # spin for a fixed time.
+        self.executor.spin_until_future_complete(rclpy.Future(), sec)
+
+    def _check_match_event_future(
+        self,
+        future: rclpy.Future[
+            rclpy.event_handler.QoSSubscriptionMatchedInfo
+            | rclpy.event_handler.QoSPublisherMatchedInfo
+        ],
+        total_count: int,
+        current_count: int,
+    ) -> None:
+        # NOTE: fastdds appears to be buggy and reports a change in total_count with
+        # total_count_change staying zero. cyclonedds works as expected. Rather than
+        # have this test be sensitive to which RMW is selected, let's just avoid testing
+        # the change fields altogether.
+ self._expect_future_done(future) + info: ( # These two python types are both actually rmw_matched_status_t + rclpy.event_handler.QoSSubscriptionMatchedInfo + | rclpy.event_handler.QoSPublisherMatchedInfo + | None + ) = future.result() + assert info is not None + self.assertEqual(info.total_count, total_count) + self.assertEqual(info.current_count, current_count) + + def _check_message_future( + self, future: rclpy.Future[test_msgs.msg.BasicTypes], value: float + ) -> None: + self._expect_future_done(future) + msg: test_msgs.msg.BasicTypes | None = future.result() + assert msg is not None + self.assertAlmostEqual(msg.float32_value, value, places=5) + + def _check_service_request_future( + self, future: rclpy.Future[test_msgs.srv.BasicTypes.Request], value: float + ) -> None: + self._expect_future_done(future) + req: test_msgs.srv.BasicTypes.Request | None = future.result() + assert req is not None + self.assertAlmostEqual(req.float32_value, value, places=5) + + def _check_service_response_future( + self, + future: rclpy.Future[test_msgs.srv.BasicTypes.Response], + success: bool, + error_msg: str, + ) -> None: + self._expect_future_done(future) + res: test_msgs.srv.BasicTypes.Response | None = future.result() + assert res is not None + self.assertEqual(res.bool_value, success) + self.assertEqual(res.string_value, error_msg) + + def test_pub_sub(self) -> None: + sub_node = SubTestNode() + new_pub_future = sub_node.expect_pub_info() + received_future = sub_node.expect_message() + self.executor.add_node(sub_node) + + # With subscriber node alone, should be no publisher or messages + self._expect_future_not_done(new_pub_future) + self.assertFalse(received_future.done()) # Already waited a bit + + pub_node = PubTestNode() + new_sub_future = pub_node.expect_sub_info() + self.executor.add_node(pub_node) + + # Publisher and subscriber should find each other but no messages should be + # exchanged yet + self._check_match_event_future(new_pub_future, 1, 1) + new_pub_future = sub_node.expect_pub_info() + self._check_match_event_future(new_sub_future, 1, 1) + new_sub_future = pub_node.expect_sub_info() + self._expect_future_not_done(received_future) + + # Send messages and make sure they're received. + for i in range(300): + pub_node.publish(0.1 * i) + self._check_message_future(received_future, 0.1 * i) + received_future = sub_node.expect_message() + + # Destroy the subscriber node, make sure the publisher is notified + self.executor.remove_node(sub_node) + sub_node.destroy_node() + self._check_match_event_future(new_sub_future, 1, 0) + + # Publish another message to ensure all subscriber callbacks got cleaned up + pub_node.publish(4.7) + self._expect_future_not_done(new_pub_future) + self.assertFalse(received_future.done()) # Already waited a bit + + def test_pub_sub_multi_message(self) -> None: + # Creates a transient local publisher and queues multiple messages on it. Then + # creates a subscriber and makes sure all sent messages get delivered when it + # comes up. 
+ pub_node = PubTestNode(transient_local=True) + self.executor.add_node(pub_node) + for i in range(5): + pub_node.publish(0.1 * i) + + sub_node = SubTestNode(transient_local=True) + received_future = sub_node.expect_message() + received_messages: list[test_msgs.msg.BasicTypes] = [] + + def handle_message(future: rclpy.Future[test_msgs.msg.BasicTypes]) -> None: + nonlocal received_future + msg = future.result() + assert msg is not None + received_messages.append(msg) + received_future = sub_node.expect_message() + received_future.add_done_callback(handle_message) + + received_future.add_done_callback(handle_message) + self._expect_future_not_done(received_future) + self.executor.add_node(sub_node) + while len(received_messages) < 5: + self._expect_future_done(received_future) + self.assertEqual(len(received_messages), 5) + for i in range(5): + self.assertAlmostEqual( + received_messages[i].float32_value, 0.1 * i, places=5 + ) + self._expect_future_not_done(received_future) + + pub_node.publish(0.5) + self._check_message_future(received_future, 0.5) + + def test_service(self) -> None: + server_node = ServiceServerTestNode() + got_request_future = server_node.expect_request(True, "test response 0") + self.executor.add_node(server_node) + self._expect_future_not_done(got_request_future) + + client_node = ServiceClientTestNode() + self.executor.add_node(client_node) + self._expect_future_not_done(got_request_future) + for i in range(300): + got_response_future = client_node.issue_request(7.1) + self._check_service_request_future(got_request_future, 7.1) + got_request_future = server_node.expect_request( + True, f"test response {i + 1}" + ) + self._check_service_response_future( + got_response_future, True, f"test response {i}" + ) + + # Destroy server node and retry issuing a request + self.executor.remove_node(server_node) + server_node.destroy_node() + self._expect_future_not_done(got_request_future) + got_response_future = client_node.issue_request(5.0) + self._expect_future_not_done(got_request_future) + self.assertFalse(got_response_future.done()) # Already waited a bit + + def test_timers(self) -> None: + realtime_node = TimerTestNode(index=0) + rostime_node = TimerTestNode( + index=1, parameter_overrides=[rclpy.Parameter("use_sim_time", value=True)] + ) + clock_node = ClockPublisherNode() + for node in [realtime_node, rostime_node, clock_node]: + self.executor.add_node(node) + + # Wait a bit, and make sure the realtime timer ticks, and the rostime one does + # not. Since this is based on wall time, be very flexible on tolerances here. + self._spin_for(1.0) + realtime_ticks = realtime_node.timer_events + self.assertGreater(realtime_ticks, 1) + self.assertLess(realtime_ticks, 50) + self.assertEqual(rostime_node.timer_events, 0) + + # Manually tick the rostime timer by less than a full interval. + rostime_tick_future = rostime_node.expect_tick() + for _ in range(99): + clock_node.advance_time(1) + self._expect_future_not_done(rostime_tick_future) + clock_node.advance_time(1) + self._expect_future_done(rostime_tick_future) + # Now tick by a bunch of full intervals. + for _ in range(300): + rostime_tick_future = rostime_node.expect_tick() + clock_node.advance_time(100) + self._expect_future_done(rostime_tick_future) + + # Ensure the realtime timer ticked much less than the rostime one. 
+ self.assertLess(realtime_node.timer_events, rostime_node.timer_events) + + def test_action(self) -> None: + clock_node = ClockPublisherNode() + self.executor.add_node(clock_node) + + server_node = ActionServerTestNode() + got_goal_future = server_node.expect_goal() + self.executor.add_node(server_node) + clock_node.advance_time(0) + self._expect_future_not_done(got_goal_future) + + client_node = ActionClientTestNode() + self.executor.add_node(client_node) + self._expect_future_not_done(got_goal_future) + for i in range(300): + order = (i % 40) + 1 # Don't want sequence to get too big + goal_acknowledged_future = client_node.send_goal(order) + + self._expect_future_done(got_goal_future) + self._expect_future_done(goal_acknowledged_future) + req: test_msgs.action.Fibonacci.Goal | None = got_goal_future.result() + assert req is not None + self.assertEqual(req.order, order) + result_future = client_node.expect_result() + + while True: + got_feedback_future = client_node.expect_feedback() + seq = server_node.advance_feedback() + if seq is None: + break + self._expect_future_done(got_feedback_future) + feedback = got_feedback_future.result() + assert feedback is not None + self.assertEqual(len(feedback.sequence), len(seq)) + + last_handle = server_node.execute() + self._expect_future_done(result_future) + self.assertFalse(got_feedback_future.done()) + + res: test_msgs.action.Fibonacci.Result | None = result_future.result() + assert res is not None + self.assertEqual(len(res.sequence), order + 1) + + got_goal_future = server_node.expect_goal() + + # Test completed goal expiration by time + self.assertEqual( + last_handle.status, action_msgs.msg.GoalStatus.STATUS_SUCCEEDED + ) + clock_node.advance_time(9999) + self._spin_for(0.2) + self.assertEqual( + last_handle.status, action_msgs.msg.GoalStatus.STATUS_SUCCEEDED + ) + clock_node.advance_time(2) + self._spin_for(0.2) + self.assertEqual(last_handle.status, action_msgs.msg.GoalStatus.STATUS_UNKNOWN) + + # Destroy server node and retry issuing a goal + self.executor.remove_node(server_node) + server_node.destroy_node() + self._expect_future_not_done(got_goal_future) + goal_acknowledged_future = client_node.send_goal(5) + self._expect_future_not_done(got_goal_future) + self.assertFalse(goal_acknowledged_future.done()) # Already waited a bit + + +if __name__ == "__main__": + unittest.main()