Skip to content

Commit

Permalink
feat: Add ClockWaiter and ClockConditionalVariable
Browse files Browse the repository at this point in the history
Added two new synchronization primitives to perform waits
using an rclcpp::Clock.

Signed-off-by: Janosch Machowinski <[email protected]>
  • Loading branch information
Janosch Machowinski authored and Janosch Machowinski committed Dec 2, 2024
1 parent 785a70d commit a1f5ae7
Show file tree
Hide file tree
Showing 2 changed files with 305 additions and 0 deletions.
102 changes: 102 additions & 0 deletions rclcpp/include/rclcpp/clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,108 @@ class Clock
std::shared_ptr<Impl> impl_;
};

/**
* A synchronization primitive, equal to std::conditional_variable,
* that works with the rclcpp::Clock.
*
* For more information on the API see https://en.cppreference.com/w/cpp/thread/condition_variable.
*
* Note, this class does not handle shutdowns, if you want to
* haven them handles as well, use ClockConditionalVariable.
*/
class ClockWaiter
{
private:
class ClockWaiterImpl;
std::unique_ptr<ClockWaiterImpl> impl_;

public:
RCLCPP_SMART_PTR_DEFINITIONS(ClockWaiter)

RCLCPP_PUBLIC
ClockWaiter(const rclcpp::Clock::SharedPtr &clock);

RCLCPP_PUBLIC
~ClockWaiter();

/**
* Calling this function will block the current thread, until abs_time is reached,
* or pred returns true.
* @param lock A locked lock. The lock must be locked at call time, or this method will throw.
* The lock will be atomically released and this thread will blocked.
* @param abs_time The time until which this thread shall be blocked.
* @param pred may be called in cased of spurious wakeups, but must be called every time
* notify_one() was called. During the call to pred, the given lock will be locked.
* This method will return, if pred returns true.
*/
RCLCPP_PUBLIC
bool
wait_until(std::unique_lock<std::mutex>& lock,
const rclcpp::Time& abs_time, const std::function<bool ()> &pred );

/**
* Notify the blocked thread, that is should reevaluate the wakeup condition.
* E.g. the given pred function in wait_until shall be reevaluated.
*/
RCLCPP_PUBLIC
void notify_one();
};


/**
* A synchronization primitive, similar to std::conditional_variable,
* that works with the rclcpp::Clock.
*
* For more information on the API see https://en.cppreference.com/w/cpp/thread/condition_variable.
*
* This primitive will wake up if the context was shut down.
*/
class ClockConditionalVariable
{
class Impl;
std::unique_ptr<Impl> impl_;

public:
RCLCPP_SMART_PTR_DEFINITIONS(ClockConditionalVariable)

RCLCPP_PUBLIC
ClockConditionalVariable(rclcpp::Clock::SharedPtr &clock, rclcpp::Context::SharedPtr context = rclcpp::contexts::get_global_default_context());
RCLCPP_PUBLIC
~ClockConditionalVariable();

/**
* Calling this function will block the current thread, until abs_time is reached,
* or pred returns true.
* @param lock A locked lock. The lock must be locked at call time, or this method will throw.
* The lock will be atomically released and this thread will blocked.
* The given lock must be created using the mutex returned my mutex().
* @param abs_time The time until which this thread shall be blocked.
* @param pred may be called in cased of spurious wakeups, but must be called every time
* notify_one() was called. During the call to pred, the given lock will be locked.
* This method will return, if pred returns true.
*
* @return true if until was reached.
*/
RCLCPP_PUBLIC
bool wait_until(std::unique_lock<std::mutex>& lock, rclcpp::Time until, const std::function<bool ()> &pred);

/**
* Notify the blocked thread, that is should reevaluate the wakeup condition.
* E.g. the given pred function in wait_until shall be reevaluated.
*/
RCLCPP_PUBLIC
void notify_one();

/**
* Returns the internal mutex. In order to be race free with the context shutdown,
* this mutex must be used for the wait_until call.
*/
RCLCPP_PUBLIC
std::mutex &mutex();
};



} // namespace rclcpp

#endif // RCLCPP__CLOCK_HPP_
203 changes: 203 additions & 0 deletions rclcpp/src/rclcpp/clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,4 +367,207 @@ Clock::create_jump_callback(
// *INDENT-ON*
}

class ClockWaiter::ClockWaiterImpl
{
private:
std::condition_variable cv_;

rclcpp::Clock::SharedPtr clock_;
bool time_source_changed_ = false;
std::function<void (const rcl_time_jump_t &)> post_time_jump_callback;

bool
wait_until_system_time(std::unique_lock<std::mutex>& lock,
const rclcpp::Time& abs_time, const std::function<bool ()> &pred )
{
auto system_time = std::chrono::system_clock::time_point(
// Cast because system clock resolution is too big for nanoseconds on some systems
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(abs_time.nanoseconds())));

return cv_.wait_until(lock, system_time, pred);
}

bool
wait_until_steady_time(std::unique_lock<std::mutex>& lock,
const rclcpp::Time& abs_time, const std::function<bool ()> &pred )
{
// Synchronize because RCL steady clock epoch might differ from chrono::steady_clock epoch
const rclcpp::Time rcl_entry = clock_->now();
const std::chrono::steady_clock::time_point chrono_entry = std::chrono::steady_clock::now();
const rclcpp::Duration delta_t = abs_time - rcl_entry;
const std::chrono::steady_clock::time_point chrono_until =
chrono_entry + std::chrono::nanoseconds(delta_t.nanoseconds());

return cv_.wait_until(lock, chrono_until, pred);
}


bool
wait_until_ros_time(std::unique_lock<std::mutex>& lock,
const rclcpp::Time& abs_time, const std::function<bool ()> &pred )
{
// Install jump handler for any amount of time change, for two purposes:
// - if ROS time is active, check if time reached on each new clock sample
// - Trigger via on_clock_change to detect if time source changes, to invalidate sleep
rcl_jump_threshold_t threshold;
threshold.on_clock_change = true;
// 0 is disable, so -1 and 1 are smallest possible time changes
threshold.min_backward.nanoseconds = -1;
threshold.min_forward.nanoseconds = 1;

time_source_changed_ = false;

auto clock_handler = clock_->create_jump_callback(
nullptr,
post_time_jump_callback,
threshold);

if (!clock_->ros_time_is_active()) {
auto system_time = std::chrono::system_clock::time_point(
// Cast because system clock resolution is too big for nanoseconds on some systems
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(abs_time.nanoseconds())));

return cv_.wait_until(lock, system_time, [this, &pred] () {
return time_source_changed_ || pred();
});
}


// RCL_ROS_TIME with ros_time_is_active.
// Just wait without "until" because installed
// jump callbacks wake the cv on every new sample.
cv_.wait(lock, [this, &pred, &abs_time] () {
return clock_->now() >= abs_time || time_source_changed_ || pred();
});

return clock_->now() < abs_time;
}


public:
ClockWaiterImpl(const rclcpp::Clock::SharedPtr &clock) :
clock_(clock)
{
}

bool
wait_until(std::unique_lock<std::mutex>& lock,
const rclcpp::Time& abs_time, const std::function<bool ()> &pred )
{
switch(clock_->get_clock_type())
{
case RCL_CLOCK_UNINITIALIZED:
throw std::runtime_error("Error, wait on uninitialized clock called");
case RCL_ROS_TIME:
return wait_until_ros_time(lock, abs_time, pred);
break;
case RCL_STEADY_TIME:
return wait_until_steady_time(lock, abs_time, pred);
break;
case RCL_SYSTEM_TIME:
return wait_until_system_time(lock, abs_time, pred);
break;
}

return false;
}

void notify_one()
{
cv_.notify_one();
}
};

ClockWaiter::ClockWaiter(const rclcpp::Clock::SharedPtr &clock) :
impl_(std::make_unique<ClockWaiterImpl>(clock))
{
}

ClockWaiter::~ClockWaiter() = default;

bool ClockWaiter::wait_until(std::unique_lock<std::mutex>& lock,
const rclcpp::Time& abs_time, const std::function<bool ()> &pred )
{
return impl_->wait_until(lock, abs_time, pred);
}

void ClockWaiter::notify_one()
{
impl_->notify_one();
}

class ClockConditionalVariable::Impl
{
std::mutex pred_mutex_;
bool shutdown_ = false;
rclcpp::Context::SharedPtr context_;
rclcpp::OnShutdownCallbackHandle shutdown_cb_handle_;
ClockWaiter::UniquePtr clock_;
public:

Impl(rclcpp::Clock::SharedPtr &clock, rclcpp::Context::SharedPtr context) :
context_(context),
clock_(std::make_unique<ClockWaiter>(clock))
{
if (!context_ || !context_->is_valid()) {
throw std::runtime_error("context cannot be slept with because it's invalid");
}
// Wake this thread if the context is shutdown
shutdown_cb_handle_ = context_->add_on_shutdown_callback(
[this]() {
{
std::unique_lock lock(pred_mutex_);
shutdown_ = true;
}
clock_->notify_one();
});
}

bool wait_until(std::unique_lock<std::mutex>& lock, rclcpp::Time until, const std::function<bool ()> &pred)
{
if(lock.mutex() != &pred_mutex_)
{
throw std::runtime_error("ClockConditionalVariable::wait_until: Internal error, given lock does not use mutex returned by this->mutex()");
}

clock_->wait_until(lock, until, [this, &pred] () -> bool {
return shutdown_ || pred(); });
return true;
}

void notify_one()
{
clock_->notify_one();
}

std::mutex &mutex()
{
return pred_mutex_;
};
};

ClockConditionalVariable::ClockConditionalVariable(rclcpp::Clock::SharedPtr &clock, rclcpp::Context::SharedPtr context) :
impl_(std::make_unique<Impl>(clock, context))
{
}

ClockConditionalVariable::~ClockConditionalVariable() = default;

void ClockConditionalVariable::notify_one()
{
impl_->notify_one();
}

bool ClockConditionalVariable::wait_until(std::unique_lock<std::mutex>& lock, rclcpp::Time until, const std::function<bool ()> &pred)
{
return impl_->wait_until(lock, until, pred);
}

std::mutex &ClockConditionalVariable::mutex()
{
return impl_->mutex();
}

} // namespace rclcpp

0 comments on commit a1f5ae7

Please sign in to comment.