From f858f736d2bc160bb49bb578322ccfc0d3ce25e6 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Mon, 2 Dec 2024 17:53:09 +0100 Subject: [PATCH 1/2] feat: Add ClockWaiter and ClockConditionalVariable Added two new synchronization primitives to perform waits using an rclcpp::Clock. Signed-off-by: Janosch Machowinski --- rclcpp/include/rclcpp/clock.hpp | 111 +++++++++++++++ rclcpp/src/rclcpp/clock.cpp | 236 ++++++++++++++++++++++++++++++++ 2 files changed, 347 insertions(+) diff --git a/rclcpp/include/rclcpp/clock.hpp b/rclcpp/include/rclcpp/clock.hpp index 2050bf53e6..6cbbed5d18 100644 --- a/rclcpp/include/rclcpp/clock.hpp +++ b/rclcpp/include/rclcpp/clock.hpp @@ -260,6 +260,117 @@ class Clock std::shared_ptr impl_; }; +/** + * A synchronization primitive, equal to std::conditional_variable, + * that works with the rclcpp::Clock. + * + * For more information on the API see https://en.cppreference.com/w/cpp/thread/condition_variable. + * + * Note, this class does not handle shutdowns, if you want to + * haven them handles as well, use ClockConditionalVariable. + */ +class ClockWaiter +{ +private: + class ClockWaiterImpl; + std::unique_ptr impl_; + +public: + RCLCPP_SMART_PTR_DEFINITIONS(ClockWaiter) + + RCLCPP_PUBLIC + explicit ClockWaiter(const rclcpp::Clock::SharedPtr & clock); + + RCLCPP_PUBLIC + ~ClockWaiter(); + + /** + * Calling this function will block the current thread, until abs_time is reached, + * or pred returns true. + * @param lock A locked lock. The lock must be locked at call time, or this method will throw. + * The lock will be atomically released and this thread will blocked. + * @param abs_time The time until which this thread shall be blocked. + * @param pred may be called in cased of spurious wakeups, but must be called every time + * notify_one() was called. During the call to pred, the given lock will be locked. + * This method will return, if pred returns true. + */ + RCLCPP_PUBLIC + bool + wait_until( + std::unique_lock & lock, + const rclcpp::Time & abs_time, const std::function & pred); + + /** + * Notify the blocked thread, that it should reevaluate the wakeup condition. + * The given pred function in wait_until will be reevaluated and wait_until + * will return if it evaluates to true. + */ + RCLCPP_PUBLIC + void + notify_one(); +}; + + +/** + * A synchronization primitive, similar to std::conditional_variable, + * that works with the rclcpp::Clock. + * + * For more information on the API see https://en.cppreference.com/w/cpp/thread/condition_variable. + * + * This primitive will wake up if the context was shut down. + */ +class ClockConditionalVariable +{ + class Impl; + std::unique_ptr impl_; + +public: + RCLCPP_SMART_PTR_DEFINITIONS(ClockConditionalVariable) + + RCLCPP_PUBLIC + ClockConditionalVariable( + rclcpp::Clock::SharedPtr & clock, + rclcpp::Context::SharedPtr context = rclcpp::contexts::get_global_default_context()); + RCLCPP_PUBLIC + ~ClockConditionalVariable(); + + /** + * Calling this function will block the current thread, until abs_time is reached, + * or pred returns true. + * @param lock A locked lock. The lock must be locked at call time, or this method will throw. + * The lock will be atomically released and this thread will blocked. + * The given lock must be created using the mutex returned my mutex(). + * @param abs_time The time until which this thread shall be blocked. + * @param pred may be called in cased of spurious wakeups, but must be called every time + * notify_one() was called. During the call to pred, the given lock will be locked. + * This method will return, if pred returns true. + * + * @return true if until was reached. + */ + RCLCPP_PUBLIC + bool + wait_until( + std::unique_lock & lock, rclcpp::Time until, + const std::function & pred); + + /** + * Notify the blocked thread, that is should reevaluate the wakeup condition. + * E.g. the given pred function in wait_until shall be reevaluated. + */ + RCLCPP_PUBLIC + void + notify_one(); + + /** + * Returns the internal mutex. In order to be race free with the context shutdown, + * this mutex must be used for the wait_until call. + */ + RCLCPP_PUBLIC + std::mutex & + mutex(); +}; + + } // namespace rclcpp #endif // RCLCPP__CLOCK_HPP_ diff --git a/rclcpp/src/rclcpp/clock.cpp b/rclcpp/src/rclcpp/clock.cpp index 5c13f19d13..085469f726 100644 --- a/rclcpp/src/rclcpp/clock.cpp +++ b/rclcpp/src/rclcpp/clock.cpp @@ -367,4 +367,240 @@ Clock::create_jump_callback( // *INDENT-ON* } +class ClockWaiter::ClockWaiterImpl +{ +private: + std::condition_variable cv_; + + rclcpp::Clock::SharedPtr clock_; + bool time_source_changed_ = false; + std::function post_time_jump_callback; + + bool + wait_until_system_time( + std::unique_lock & lock, + const rclcpp::Time & abs_time, const std::function & pred) + { + auto system_time = std::chrono::system_clock::time_point( + // Cast because system clock resolution is too big for nanoseconds on some systems + std::chrono::duration_cast( + std::chrono::nanoseconds(abs_time.nanoseconds()))); + + return cv_.wait_until(lock, system_time, pred); + } + + bool + wait_until_steady_time( + std::unique_lock & lock, + const rclcpp::Time & abs_time, const std::function & pred) + { + // Synchronize because RCL steady clock epoch might differ from chrono::steady_clock epoch + const rclcpp::Time rcl_entry = clock_->now(); + const std::chrono::steady_clock::time_point chrono_entry = std::chrono::steady_clock::now(); + const rclcpp::Duration delta_t = abs_time - rcl_entry; + const std::chrono::steady_clock::time_point chrono_until = + chrono_entry + std::chrono::nanoseconds(delta_t.nanoseconds()); + + return cv_.wait_until(lock, chrono_until, pred); + } + + + bool + wait_until_ros_time( + std::unique_lock & lock, + const rclcpp::Time & abs_time, const std::function & pred) + { + // Install jump handler for any amount of time change, for two purposes: + // - if ROS time is active, check if time reached on each new clock sample + // - Trigger via on_clock_change to detect if time source changes, to invalidate sleep + rcl_jump_threshold_t threshold; + threshold.on_clock_change = true; + // 0 is disable, so -1 and 1 are smallest possible time changes + threshold.min_backward.nanoseconds = -1; + threshold.min_forward.nanoseconds = 1; + + time_source_changed_ = false; + + post_time_jump_callback = [this, &lock] (const rcl_time_jump_t & jump) + { + if (jump.clock_change != RCL_ROS_TIME_NO_CHANGE) { + std::lock_guard lk(*lock.mutex()); + time_source_changed_ = true; + } + cv_.notify_one(); + }; + + // Note this is a trade-off. Adding the callback for every call + // is expensive for high frequency calls. For low frequency waits + // its more overhead to have the callback being called all the time. + // As we expect the use case to be low frequency calls to wait_until + // with relative big pauses between the calls, we install it on demand. + auto clock_handler = clock_->create_jump_callback( + nullptr, + post_time_jump_callback, + threshold); + + if (!clock_->ros_time_is_active()) { + auto system_time = std::chrono::system_clock::time_point( + // Cast because system clock resolution is too big for nanoseconds on some systems + std::chrono::duration_cast( + std::chrono::nanoseconds(abs_time.nanoseconds()))); + + return cv_.wait_until(lock, system_time, [this, &pred] () { + return time_source_changed_ || pred(); + }); + } + + // RCL_ROS_TIME with ros_time_is_active. + // Just wait without "until" because installed + // jump callbacks wake the cv on every new sample. + cv_.wait(lock, [this, &pred, &abs_time] () { + return clock_->now() >= abs_time || time_source_changed_ || pred(); + }); + + return clock_->now() < abs_time; + } + +public: + explicit ClockWaiterImpl(const rclcpp::Clock::SharedPtr & clock) + :clock_(clock) + { + } + + bool + wait_until( + std::unique_lock & lock, + const rclcpp::Time & abs_time, const std::function & pred) + { + switch(clock_->get_clock_type()) { + case RCL_CLOCK_UNINITIALIZED: + throw std::runtime_error("Error, wait on uninitialized clock called"); + case RCL_ROS_TIME: + return wait_until_ros_time(lock, abs_time, pred); + break; + case RCL_STEADY_TIME: + return wait_until_steady_time(lock, abs_time, pred); + break; + case RCL_SYSTEM_TIME: + return wait_until_system_time(lock, abs_time, pred); + break; + } + + return false; + } + + void + notify_one() + { + cv_.notify_one(); + } +}; + +ClockWaiter::ClockWaiter(const rclcpp::Clock::SharedPtr & clock) +:impl_(std::make_unique(clock)) +{ +} + +ClockWaiter::~ClockWaiter() = default; + +bool +ClockWaiter::wait_until( + std::unique_lock & lock, + const rclcpp::Time & abs_time, const std::function & pred) +{ + return impl_->wait_until(lock, abs_time, pred); +} + +void +ClockWaiter::notify_one() +{ + impl_->notify_one(); +} + +class ClockConditionalVariable::Impl +{ + std::mutex pred_mutex_; + bool shutdown_ = false; + rclcpp::Context::SharedPtr context_; + rclcpp::OnShutdownCallbackHandle shutdown_cb_handle_; + ClockWaiter::UniquePtr clock_; + +public: + Impl(rclcpp::Clock::SharedPtr & clock, rclcpp::Context::SharedPtr context) + :context_(context), + clock_(std::make_unique(clock)) + { + if (!context_ || !context_->is_valid()) { + throw std::runtime_error("context cannot be slept with because it's invalid"); + } + // Wake this thread if the context is shutdown + shutdown_cb_handle_ = context_->add_on_shutdown_callback( + [this]() { + { + std::unique_lock lock(pred_mutex_); + shutdown_ = true; + } + clock_->notify_one(); + }); + } + + bool + wait_until( + std::unique_lock & lock, rclcpp::Time until, + const std::function & pred) + { + if(lock.mutex() != &pred_mutex_) { + throw std::runtime_error( + "ClockConditionalVariable::wait_until: Internal error, given lock does not use" + " mutex returned by this->mutex()"); + } + + clock_->wait_until(lock, until, [this, &pred] () -> bool { + return shutdown_ || pred(); + }); + return true; + } + + void + notify_one() + { + clock_->notify_one(); + } + + std::mutex & + mutex() + { + return pred_mutex_; + } +}; + +ClockConditionalVariable::ClockConditionalVariable( + rclcpp::Clock::SharedPtr & clock, + rclcpp::Context::SharedPtr context) +:impl_(std::make_unique(clock, context)) +{ +} + +ClockConditionalVariable::~ClockConditionalVariable() = default; + +void +ClockConditionalVariable::notify_one() +{ + impl_->notify_one(); +} + +bool +ClockConditionalVariable::wait_until( + std::unique_lock & lock, rclcpp::Time until, + const std::function & pred) +{ + return impl_->wait_until(lock, until, pred); +} + +std::mutex & +ClockConditionalVariable::mutex() +{ + return impl_->mutex(); +} + } // namespace rclcpp From e16b8f9d8e6283ce5433c6ed900af0a3959a0e4c Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Mon, 2 Dec 2024 17:53:32 +0100 Subject: [PATCH 2/2] fix: Deprecate broken API Signed-off-by: Janosch Machowinski --- rclcpp/include/rclcpp/clock.hpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rclcpp/include/rclcpp/clock.hpp b/rclcpp/include/rclcpp/clock.hpp index 6cbbed5d18..874ac04a56 100644 --- a/rclcpp/include/rclcpp/clock.hpp +++ b/rclcpp/include/rclcpp/clock.hpp @@ -194,12 +194,17 @@ class Clock ros_time_is_active(); /** + * Deprecated. This API is broken, as there is no way to get a deep + * copy of a clock. Therefore one can experience spurious wakeups triggered + * by some other instance of a clock. + * * Cancels an ongoing or future sleep operation of one thread. * * This function can be used by one thread, to wakeup another thread that is * blocked using any of the sleep_ or wait_ methods of this class. */ RCLCPP_PUBLIC + [[deprecated("Use ClockConditionalVariable")]] void cancel_sleep_or_wait();