diff --git a/rclrs/Cargo.toml b/rclrs/Cargo.toml index f489ea3f0..ecd5ecdcd 100644 --- a/rclrs/Cargo.toml +++ b/rclrs/Cargo.toml @@ -49,6 +49,7 @@ dyn_msg = ["ament_rs", "libloading"] # This feature is solely for the purpose of being able to generate documetation without a ROS installation # The only intended usage of this feature is for docs.rs builders to work, and is not intended to be used by end users generate_docs = ["rosidl_runtime_rs/generate_docs"] +serde = [] [package.metadata.docs.rs] features = ["generate_docs"] diff --git a/rclrs/src/clock.rs b/rclrs/src/clock.rs index f7c085e14..311eed9ee 100644 --- a/rclrs/src/clock.rs +++ b/rclrs/src/clock.rs @@ -28,7 +28,7 @@ impl From for rcl_clock_type_t { #[derive(Clone, Debug)] pub struct Clock { kind: ClockType, - rcl_clock: Arc>, + pub(crate) rcl_clock: Arc>, // TODO(luca) Implement jump callbacks } diff --git a/rclrs/src/executor.rs b/rclrs/src/executor.rs index 37c43a68e..530bb51db 100644 --- a/rclrs/src/executor.rs +++ b/rclrs/src/executor.rs @@ -61,6 +61,10 @@ impl SingleThreadedExecutor { for ready_service in ready_entities.services { ready_service.execute()?; } + + for ready_timer in ready_entities.timers { + ready_timer.lock().unwrap().execute()?; + } } Ok(()) @@ -82,3 +86,28 @@ impl SingleThreadedExecutor { Ok(()) } } + +#[cfg(test)] +mod tests { + use crate::{spin_once, Context}; + + use super::*; + + #[test] + fn spin_once_fires_timer() -> Result<(), RclrsError> { + let context = Context::new([])?; + let node = Node::new(&context, "test_spin_timer")?; + + let callback_triggered = Arc::new(Mutex::new(0)); + let callback_flag = Arc::clone(&callback_triggered); + + let _timer = node.create_timer(Duration::from_secs(0), move |_| { + *callback_flag.lock().unwrap() += 1; + })?; + + spin_once(node, Some(Duration::ZERO))?; + + assert_eq!(*callback_triggered.lock().unwrap(), 1); + Ok(()) + } +} diff --git a/rclrs/src/lib.rs b/rclrs/src/lib.rs index 3a22c6da8..ddb3e9c3f 100644 --- a/rclrs/src/lib.rs +++ b/rclrs/src/lib.rs @@ -20,6 +20,7 @@ mod service; mod subscription; mod time; mod time_source; +mod timer; mod vendor; mod wait; @@ -49,6 +50,7 @@ pub use service::*; pub use subscription::*; pub use time::*; use time_source::*; +pub use timer::*; pub use wait::*; /// Polls the node for new messages and executes the corresponding callbacks. diff --git a/rclrs/src/logging.rs b/rclrs/src/logging.rs index 3943183c7..36e8554ae 100644 --- a/rclrs/src/logging.rs +++ b/rclrs/src/logging.rs @@ -120,6 +120,12 @@ pub struct LogConditions { pub log_if_true: bool, } +impl Default for LogConditions { + fn default() -> Self { + Self::new() + } +} + impl LogConditions { /// Default construct an instance pub fn new() -> Self { diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index 394c5f740..5a4aa10d8 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -6,6 +6,7 @@ use std::{ fmt, os::raw::c_char, sync::{atomic::AtomicBool, Arc, Mutex, Weak}, + time::Duration, vec::Vec, }; @@ -16,7 +17,7 @@ use crate::{ rcl_bindings::*, Client, ClientBase, Clock, Context, ContextHandle, GuardCondition, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Publisher, QoSProfile, RclrsError, Service, ServiceBase, Subscription, SubscriptionBase, SubscriptionCallback, - TimeSource, ENTITY_LIFECYCLE_MUTEX, + TimeSource, Timer, TimerBase, TimerCallback, ENTITY_LIFECYCLE_MUTEX, }; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread @@ -47,7 +48,7 @@ unsafe impl Send for rcl_node_t {} /// The namespace and name given when creating the node can be overridden through the command line. /// In that sense, the parameters to the node creation functions are only the _default_ namespace and /// name. -/// See also the [official tutorial][1] on the command line arguments for ROS nodes, and the +/// See also the [official tutorial][2] on the command line arguments for ROS nodes, and the /// [`Node::namespace()`] and [`Node::name()`] functions for examples. /// /// ## Rules for valid names @@ -63,6 +64,7 @@ pub struct Node { pub(crate) guard_conditions_mtx: Mutex>>, pub(crate) services_mtx: Mutex>>, pub(crate) subscriptions_mtx: Mutex>>, + pub(crate) timers_mtx: Mutex>>>, time_source: TimeSource, parameter: ParameterInterface, pub(crate) handle: Arc, @@ -339,6 +341,35 @@ impl Node { Ok(subscription) } + /// Create a [`Timer`][1] that will use the node's clock. + /// + /// A Timer may be modified via the `Arc` returned by this function or from + /// within its callback. + /// A weak reference counter to the `Timer` is stored within this node. + /// + /// [1]: crate::Timer + pub fn create_timer( + &self, + period: Duration, + callback: F, + ) -> Result>, RclrsError> + where + F: TimerCallback + 'static, + { + let timer = Arc::new(Mutex::new(Timer::new_with_context_handle( + Arc::clone(&self.handle.context_handle), + self.get_clock(), + period, + callback, + )?)); + + self.timers_mtx + .lock() + .unwrap() + .push(Arc::downgrade(&timer) as Weak>); + Ok(timer) + } + /// Returns the subscriptions that have not been dropped yet. pub(crate) fn live_subscriptions(&self) -> Vec> { { self.subscriptions_mtx.lock().unwrap() } @@ -368,6 +399,15 @@ impl Node { .collect() } + pub(crate) fn live_timers(&self) -> Vec>> { + self.timers_mtx + .lock() + .unwrap() + .iter() + .filter_map(Weak::upgrade) + .collect() + } + /// Returns the ROS domain ID that the node is using. /// /// The domain ID controls which nodes can send messages to each other, see the [ROS 2 concept article][1]. diff --git a/rclrs/src/node/builder.rs b/rclrs/src/node/builder.rs index f8df82101..a5e76eba6 100644 --- a/rclrs/src/node/builder.rs +++ b/rclrs/src/node/builder.rs @@ -319,6 +319,7 @@ impl NodeBuilder { guard_conditions_mtx: Mutex::new(vec![]), services_mtx: Mutex::new(vec![]), subscriptions_mtx: Mutex::new(vec![]), + timers_mtx: Mutex::new(vec![]), time_source: TimeSource::builder(self.clock_type) .clock_qos(self.clock_qos) .build(), diff --git a/rclrs/src/publisher.rs b/rclrs/src/publisher.rs index 2935ca322..adf449fee 100644 --- a/rclrs/src/publisher.rs +++ b/rclrs/src/publisher.rs @@ -108,7 +108,7 @@ where // * The rcl_node is kept alive by the NodeHandle because it is a dependency of the publisher. // * The topic name and the options are copied by this function, so they can be dropped afterwards. // * The entity lifecycle mutex is locked to protect against the risk of global - // variables in the rmw implementation being unsafely modified during cleanup. + // variables in the rmw implementation being unsafely modified during initialization. rcl_publisher_init( &mut rcl_publisher, &*rcl_node, diff --git a/rclrs/src/subscription.rs b/rclrs/src/subscription.rs index 05b01beb5..3ecd7513a 100644 --- a/rclrs/src/subscription.rs +++ b/rclrs/src/subscription.rs @@ -126,7 +126,7 @@ where // * The rcl_node is kept alive by the NodeHandle because it is a dependency of the subscription. // * The topic name and the options are copied by this function, so they can be dropped afterwards. // * The entity lifecycle mutex is locked to protect against the risk of global - // variables in the rmw implementation being unsafely modified during cleanup. + // variables in the rmw implementation being unsafely modified during initialization. rcl_subscription_init( &mut rcl_subscription, &*rcl_node, diff --git a/rclrs/src/timer.rs b/rclrs/src/timer.rs new file mode 100644 index 000000000..aceeb1daf --- /dev/null +++ b/rclrs/src/timer.rs @@ -0,0 +1,421 @@ +use crate::{ + clock::Clock, + rcl_bindings::{ + rcl_get_zero_initialized_timer, rcl_timer_call, rcl_timer_cancel, + rcl_timer_exchange_period, rcl_timer_fini, rcl_timer_get_period, + rcl_timer_get_time_since_last_call, rcl_timer_get_time_until_next_call, rcl_timer_init, + rcl_timer_is_canceled, rcl_timer_is_ready, rcl_timer_reset, rcl_timer_t, + rcutils_get_default_allocator, + }, + Context, ContextHandle, RclReturnCode, RclrsError, ToResult, ENTITY_LIFECYCLE_MUTEX, +}; +use std::{ + sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard}, + time::Duration, +}; + +/// A trait for the callback function of a timer. +pub trait TimerCallback: FnMut(&mut Timer) + Send + Sync {} + +// Blanket implementation of TimerCallback for all types that implement the necessary traits. +impl TimerCallback for T {} + +// SAFETY: The functions accessing this type, including drop(), shouldn't care +// about the thread they are running in (partly because they're protected by mutex). +// Therefore, this type can be safely sent to another thread. +unsafe impl Send for rcl_timer_t {} + +/// Manage the lifecycle of an `rcl_timer_t` and its dependency on +/// `rcl_clock_t` and `rcl_context_t` by ensuring that these dependencies are +/// [dropped after][1] the `rcl_timer_t`. +/// +/// [1]: +pub struct TimerHandle { + rcl_timer: Mutex, + _clock: Clock, + _context_handle: Arc, + pub(crate) in_use_by_wait_set: Arc, +} + +impl TimerHandle { + pub(crate) fn lock(&self) -> MutexGuard { + self.rcl_timer.lock().unwrap() + } +} + +impl Drop for TimerHandle { + fn drop(&mut self) { + let rcl_timer = self.rcl_timer.get_mut().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + // SAFETY: The entity lifecycle mutex is locked to protect against the risk of + // global variables in the rmw implementation being unsafely modified during cleanup. + unsafe { + rcl_timer_fini(rcl_timer); + } + } +} + +/// Trait to be implemented by concrete [`Timer`]s. +pub trait TimerBase: Send + Sync { + /// Internal function to get a reference to the `rcl` handle. + fn handle(&self) -> &TimerHandle; + /// Tries to call the timer and run the associated callback. + /// Timers are allowed to modify themselves while being executed. + fn execute(&mut self) -> Result<(), RclrsError>; +} + +/// A struct for triggering a callback at regular intervals. +/// +/// If created via [`Node::create_timer()`][1], calling [`spin_once`][2] or [`spin`][3] +/// on the timer's node will wait until the configured period of time has passed since +/// the timer was last called (or since it was created) before triggering the timer's callback. +/// +/// If created via [`Timer::new`], [`is_ready`][4] must be polled until the timer has +/// expired, after which [`execute`][5] must be called to trigger the timer's callback. +/// The timer can also be added to a [`WaitSet`][6] to block until it is ready. +/// +/// [1]: crate::Node::create_timer +/// [2]: crate::spin_once +/// [3]: crate::spin +/// [4]: crate::Timer::is_ready +/// [5]: crate::Timer::execute +/// [6]: crate::WaitSet +pub struct Timer { + callback: Arc>, + handle: TimerHandle, +} + +impl Timer { + /// Creates a new `Timer` with the given period and callback. + /// Periods greater than i64::MAX nanoseconds will saturate to i64::MAX. + /// + /// Note that most of the time [`Node::create_timer`][1] is the better way to make + /// a new timer, as that will allow the timer to be triggered by spinning the node. + /// Timers created with [`Timer::new`] must be checked and executed by the user. + /// + /// [1]: crate::Node::create_timer + pub fn new( + context: &Context, + clock: Clock, + period: Duration, + callback: F, + ) -> Result + where + F: TimerCallback + 'static, + { + Timer::new_with_context_handle(Arc::clone(&context.handle), clock, period, callback) + } + + /// Version of [`Timer::new`] that takes a context handle directly. + pub(crate) fn new_with_context_handle( + context_handle: Arc, + clock: Clock, + period: Duration, + callback: F, + ) -> Result + where + F: TimerCallback + 'static, + { + let callback = Arc::new(Mutex::new(callback)); + + // SAFETY: Getting a zero-initialized value is always safe. + let mut rcl_timer = unsafe { rcl_get_zero_initialized_timer() }; + + let clock_clone = clock.rcl_clock.clone(); + let mut rcl_clock = clock_clone.lock().unwrap(); + + let context_handle_clone = context_handle.clone(); + let mut rcl_context = context_handle_clone.rcl_context.lock().unwrap(); + + // core::time::Duration will always be >= 0, so no need to check for negatives. + let period_nanos = i64::try_from(period.as_nanos()).unwrap_or(i64::MAX); + + // SAFETY: No preconditions for this function. + let allocator = unsafe { rcutils_get_default_allocator() }; + { + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + unsafe { + // SAFETY: + // * The rcl_timer is zero-initialized as mandated by this function. + // * The rcl_clock is kept alive by the Clock within TimerHandle because it is + // a dependency of the timer. + // * The rcl_context is kept alive by the ContextHandle within TimerHandle because + // it is a dependency of the timer. + // * The period is copied into this function so it can be dropped afterwards. + // * The callback is None / nullptr so doesn't need to be kept alive. + // * The entity lifecycle mutex is locked to protect against the risk of global + // variables in the rmw implementation being unsafely modified during initialization. + rcl_timer_init( + &mut rcl_timer, + &mut *rcl_clock, + &mut *rcl_context, + period_nanos, + None, + allocator, + ) + .ok()?; + } + } + + Ok(Self { + callback, + handle: TimerHandle { + rcl_timer: Mutex::new(rcl_timer), + _clock: clock, + _context_handle: context_handle, + in_use_by_wait_set: Arc::new(AtomicBool::new(false)), + }, + }) + } + + /// Calculates if the timer is ready to be called. + /// Returns true if the timer is due or past due to be called. + /// Returns false if the timer is not yet due or has been canceled. + pub fn is_ready(&self) -> bool { + let timer = self.handle.lock(); + let mut is_ready = false; + // SAFETY: + // * The timer is initialized, which is guaranteed by the constructor. + // * The is_ready pointer is allocated on the stack and is valid for the duration of this function. + let ret = unsafe { rcl_timer_is_ready(&*timer, &mut is_ready) }; + + // rcl_timer_is_ready should only error if incorrect arguments are given or something isn't initialised, + // both of which we control in this function. + debug_assert_eq!(RclReturnCode::try_from(ret).unwrap(), RclReturnCode::Ok); + + is_ready + } + + /// Get the time until the next call of the timer is due. Saturates to 0 if the timer is ready. + /// Returns [`RclReturnCode::TimerCanceled`] as an error if the timer has already been canceled. + pub fn time_until_next_call(&self) -> Result { + let timer = self.handle.lock(); + let mut remaining_time = 0; + // SAFETY: + // * The timer is initialized, which is guaranteed by the constructor. + // * The remaining_time pointer is allocated on the stack and is valid for the duration of this function. + unsafe { + rcl_timer_get_time_until_next_call(&*timer, &mut remaining_time).ok()?; + } + Ok(Duration::from_nanos( + u64::try_from(remaining_time).unwrap_or(0), + )) + } + + /// Get the time since the last call of the timer. + /// Calling this function within a callback will not return the time since the + /// previous call but instead the time since the current callback was called. + /// Saturates to 0 if the timer was last called in the future (i.e. the clock jumped). + pub fn time_since_last_call(&self) -> Duration { + let timer = self.handle.lock(); + let mut elapsed_time = 0; + // SAFETY: + // * The timer is initialized, which is guaranteed by the constructor. + // * The elapsed_time pointer is allocated on the stack and is valid for the duration of this function. + let ret = unsafe { rcl_timer_get_time_since_last_call(&*timer, &mut elapsed_time) }; + + // rcl_timer_get_time_since_last_call should only error if incorrect arguments are given + // or something isn't initialised, both of which we control in this function. + debug_assert_eq!(RclReturnCode::try_from(ret).unwrap(), RclReturnCode::Ok); + + Duration::from_nanos(u64::try_from(elapsed_time).unwrap_or(0)) + } + + /// Get the period of the timer. + pub fn get_period(&self) -> Duration { + let timer = self.handle.lock(); + let mut period = 0; + // SAFETY: + // * The timer is initialized, which is guaranteed by the constructor. + // * The period pointer is allocated on the stack and is valid for the duration of this function. + let ret = unsafe { rcl_timer_get_period(&*timer, &mut period) }; + + // rcl_timer_get_period should only error if incorrect arguments are given + // or something isn't initialised, both of which we control in this function. + debug_assert_eq!(RclReturnCode::try_from(ret).unwrap(), RclReturnCode::Ok); + + // The period should never be negative as we only expose (unsigned) Duration structs + // for setting, but if it is, saturate to 0. + Duration::from_nanos(u64::try_from(period).unwrap_or(0)) + } + + /// Set the period of the timer. Periods greater than i64::MAX nanoseconds will saturate to i64::MAX. + /// + /// The updated period will not take affect until either [`reset`][1] is called + /// or the timer next expires, whichever comes first. + /// + /// [1]: crate::Timer::reset + pub fn set_period(&self, period: Duration) { + let timer = self.handle.lock(); + let new_period = i64::try_from(period.as_nanos()).unwrap_or(i64::MAX); + let mut old_period = 0; + // SAFETY: + // * The timer is initialized, which is guaranteed by the constructor. + // * The new_period is copied into this function so it can be dropped afterwards. + // * The old_period pointer is allocated on the stack and is valid for the duration of this function. + let ret = unsafe { rcl_timer_exchange_period(&*timer, new_period, &mut old_period) }; + + // rcl_timer_exchange_period should only error if incorrect arguments are given + // or something isn't initialised, both of which we control in this function. + debug_assert_eq!(RclReturnCode::try_from(ret).unwrap(), RclReturnCode::Ok); + } + + /// Cancel the timer so it will no longer return ready. Can be restarted with [`reset`][1]. + /// + /// [1]: crate::timer::Timer::reset + pub fn cancel(&self) { + let mut timer = self.handle.lock(); + // SAFETY: The timer is initialized, which is guaranteed by the constructor. + let ret = unsafe { rcl_timer_cancel(&mut *timer) }; + + // rcl_timer_cancel should only error if incorrect arguments are given + // or something isn't initialised, both of which we control in this function. + debug_assert_eq!(RclReturnCode::try_from(ret).unwrap(), RclReturnCode::Ok); + } + + /// Check if the timer has been canceled. + pub fn is_canceled(&self) -> bool { + let timer = self.handle.lock(); + let mut canceled = false; + // SAFETY: + // * The timer is initialized, which is guaranteed by the constructor. + // * The canceled pointer is allocated on the stack and is valid for the duration of this function. + let ret = unsafe { rcl_timer_is_canceled(&*timer, &mut canceled) }; + + // rcl_timer_is_canceled should only error if incorrect arguments are given + // or something isn't initialised, both of which we control in this function. + debug_assert_eq!(RclReturnCode::try_from(ret).unwrap(), RclReturnCode::Ok); + + canceled + } + + /// Set the timer's last call time to now. Additionally marks canceled timers as not-canceled. + pub fn reset(&self) { + let mut timer = self.handle.lock(); + // SAFETY: The timer is initialized, which is guaranteed by the constructor. + let ret = unsafe { rcl_timer_reset(&mut *timer) }; + + // rcl_timer_reset should only error if incorrect arguments are given + // or something isn't initialised, both of which we control in this function. + debug_assert_eq!(RclReturnCode::try_from(ret).unwrap(), RclReturnCode::Ok); + } + + /// Internal function to check the timer is still valid and set the last call time in rcl. + /// Returns [`RclReturnCode::TimerCanceled`] as an error if the timer has already been canceled. + fn call_rcl(&self) -> Result<(), RclrsError> { + let mut timer = self.handle.lock(); + // SAFETY: Safe if the timer is initialized, which is guaranteed by the constructor. + unsafe { + rcl_timer_call(&mut *timer).ok()?; + } + Ok(()) + } +} + +impl TimerBase for Timer { + fn handle(&self) -> &TimerHandle { + &self.handle + } + + fn execute(&mut self) -> Result<(), RclrsError> { + // Timer still needs to be called within RCL, even though we handle the callback ourselves. + self.call_rcl()?; + + let callback = self.callback.clone(); + (*callback.lock().unwrap())(self); + + Ok(()) + } +} + +// Timer.rs does very little other than call rcl functions. +// To keep these tests easy to maintain, most of them just check the rcl functions +// can be called without returning errors. +#[cfg(test)] +mod tests { + use std::time::Duration; + + use crate::{Clock, Context}; + + use super::Timer; + + fn new_timer() -> Timer { + let context = Context::new([]).unwrap(); + + // This is technically a wall clock, but we have a period of 0 so it won't slow down unit testing. + let clock = Clock::system(); + + let timer = Timer::new(&context, clock, Duration::from_secs(0), |_| {}); + + timer.expect("Timer::new should not return an error") + } + + #[test] + fn creation() { + let _ = new_timer(); + } + + #[test] + fn is_ready() { + let timer = new_timer(); + + // Calling is_ready will trigger the debug_assert check on the rcl return value. + timer.is_ready(); + } + + #[test] + fn time_until_next_call() { + let timer = new_timer(); + + timer + .time_until_next_call() + .expect("Calling Timer::time_until_next_call on a non-canceled timer should not error"); + } + + #[test] + fn time_since_last_call() { + let timer = new_timer(); + + // Calling time_since_last_call will trigger the debug_assert check on the rcl return value. + timer.time_since_last_call(); + } + + #[test] + fn update_period() { + let timer = new_timer(); + + let new_period = Duration::from_millis(100); + + // Calling set_period will trigger the debug_assert check on the rcl return value. + timer.set_period(new_period); + + // Calling get_period will trigger the debug_assert check on the rcl return value. + let retrieved_period = timer.get_period(); + + assert_eq!(new_period, retrieved_period); + } + + #[test] + fn cancel_timer() { + let timer = new_timer(); + + // Calling is_canceled will trigger the debug_assert check on the rcl return value. + assert!(!timer.is_canceled()); + + // Calling cancel will trigger the debug_assert check on the rcl return value. + timer.cancel(); + + assert!(timer.is_canceled()); + } + + #[test] + fn reset_canceled_timer() { + let timer = new_timer(); + timer.cancel(); + + // Calling reset will trigger the debug_assert check on the rcl return value. + timer.reset(); + + assert!(!timer.is_canceled()); + } +} diff --git a/rclrs/src/wait.rs b/rclrs/src/wait.rs index 2ef99c026..3d4995087 100644 --- a/rclrs/src/wait.rs +++ b/rclrs/src/wait.rs @@ -15,12 +15,16 @@ // DISTRIBUTION A. Approved for public release; distribution unlimited. // OPSEC #4584. -use std::{sync::Arc, time::Duration, vec::Vec}; +use std::{ + sync::{Arc, Mutex}, + time::Duration, + vec::Vec, +}; use crate::{ error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult}, rcl_bindings::*, - ClientBase, Context, ContextHandle, Node, ServiceBase, SubscriptionBase, + ClientBase, Context, ContextHandle, Node, ServiceBase, SubscriptionBase, TimerBase, }; mod exclusivity_guard; @@ -50,6 +54,8 @@ pub struct WaitSet { // The guard conditions that are currently registered in the wait set. guard_conditions: Vec>>, services: Vec>>, + // Timers need interior mutability to modify themselves within their callback. + timers: Vec>>>, handle: WaitSetHandle, } @@ -63,6 +69,9 @@ pub struct ReadyEntities { pub guard_conditions: Vec>, /// A list of services that have potentially received requests. pub services: Vec>, + /// A list of timers that are ready to be called. + // Timers need interior mutability to modify themselves within their callback. + pub timers: Vec>>, } impl Drop for rcl_wait_set_t { @@ -123,6 +132,7 @@ impl WaitSet { guard_conditions: Vec::new(), clients: Vec::new(), services: Vec::new(), + timers: Vec::new(), handle: WaitSetHandle { rcl_wait_set, context_handle: Arc::clone(&context.handle), @@ -138,13 +148,14 @@ impl WaitSet { let live_clients = node.live_clients(); let live_guard_conditions = node.live_guard_conditions(); let live_services = node.live_services(); + let live_timers = node.live_timers(); let ctx = Context { handle: Arc::clone(&node.handle.context_handle), }; let mut wait_set = WaitSet::new( live_subscriptions.len(), live_guard_conditions.len(), - 0, + live_timers.len(), live_clients.len(), live_services.len(), 0, @@ -166,6 +177,10 @@ impl WaitSet { for live_service in &live_services { wait_set.add_service(live_service.clone())?; } + + for live_timer in &live_timers { + wait_set.add_timer(live_timer.clone())?; + } Ok(wait_set) } @@ -178,6 +193,7 @@ impl WaitSet { self.guard_conditions.clear(); self.clients.clear(); self.services.clear(); + self.timers.clear(); // This cannot fail – the rcl_wait_set_clear function only checks that the input handle is // valid, which it always is in our case. Hence, only debug_assert instead of returning // Result. @@ -311,6 +327,39 @@ impl WaitSet { Ok(()) } + /// Adds a timer to the wait set. + /// + /// # Errors + /// - If the timer was already added to this wait set or another one, + /// [`AlreadyAddedToWaitSet`][1] will be returned + /// - If the number of timers in the wait set is larger than the + /// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned + /// + /// [1]: crate::RclrsError + /// [2]: crate::RclReturnCode + pub fn add_timer(&mut self, timer: Arc>) -> Result<(), RclrsError> { + let exclusive_timer = ExclusivityGuard::new( + Arc::clone(&timer), + Arc::clone(&timer.lock().unwrap().handle().in_use_by_wait_set), + )?; + unsafe { + // SAFETY: + // * The WaitSet is initialized, which is guaranteed by the constructor. + // * Timer pointer will remain valid for as long as the wait set exists, + // because it's stored in self.timers. + // * Null pointer for `index` is explicitly allowed and doesn't need + // to be kept alive. + rcl_wait_set_add_timer( + &mut self.handle.rcl_wait_set, + &*timer.lock().unwrap().handle().lock(), + core::ptr::null_mut(), + ) + } + .ok()?; + self.timers.push(exclusive_timer); + Ok(()) + } + /// Blocks until the wait set is ready, or until the timeout has been exceeded. /// /// If the timeout is `None` then this function will block indefinitely until @@ -365,6 +414,7 @@ impl WaitSet { clients: Vec::new(), guard_conditions: Vec::new(), services: Vec::new(), + timers: Vec::new(), }; for (i, subscription) in self.subscriptions.iter().enumerate() { // SAFETY: The `subscriptions` entry is an array of pointers, and this dereferencing is @@ -409,12 +459,24 @@ impl WaitSet { ready_entities.services.push(Arc::clone(&service.waitable)); } } + + for (i, timer) in self.timers.iter().enumerate() { + // SAFETY: The `timers` entry is an array of pointers, and this dereferencing is + // equivalent to + // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419 + let wait_set_entry = unsafe { *self.handle.rcl_wait_set.timers.add(i) }; + if !wait_set_entry.is_null() { + ready_entities.timers.push(Arc::clone(&timer.waitable)); + } + } Ok(ready_entities) } } #[cfg(test)] mod tests { + use crate::{Clock, Timer}; + use super::*; #[test] @@ -440,4 +502,28 @@ mod tests { Ok(()) } + + #[test] + fn timer_in_wait_set_readies() -> Result<(), RclrsError> { + let context = Context::new([])?; + // This is technically a wall clock, but we have a period of 0 so it won't slow down unit testing. + let clock = Clock::system(); + + let timer: Arc> = Arc::new(Mutex::new(Timer::new( + &context, + clock, + Duration::from_secs(0), + |_| {}, + )?)); + let mut wait_set = WaitSet::new(0, 0, 1, 0, 0, 0, &context)?; + + assert!(wait_set.timers.is_empty()); + wait_set.add_timer(Arc::clone(&timer))?; + + let readies = wait_set.wait(Some(std::time::Duration::from_millis(10)))?; + + assert_eq!(readies.timers.len(), 1); + + Ok(()) + } }