From 142012e34e8d9328103ee809f0ac315b056a8ec5 Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 10 Apr 2025 15:46:57 -0600 Subject: [PATCH 01/15] zephyr: embassy: Decouple embassy-time and Zephyr-time Don't require that the two time bases be the same. This allows applications to work using the default embassy-time base of 1MHz. There is a performance cost to the conversion (which depends on the exact ratios). If the time bases are the same (which would be common for an application build for a single target), then no conversion is needed. Signed-off-by: David Brown --- samples/embassy/Cargo.toml | 9 +++-- zephyr/src/embassy/time_driver.rs | 60 +++++++++++++++++++++++++++---- 2 files changed, 57 insertions(+), 12 deletions(-) diff --git a/samples/embassy/Cargo.toml b/samples/embassy/Cargo.toml index 6301463e..beeee526 100644 --- a/samples/embassy/Cargo.toml +++ b/samples/embassy/Cargo.toml @@ -28,17 +28,16 @@ features = [ [dependencies.embassy-futures] version = "0.1.1" -# path = "../../embassy/embassy-futures" [dependencies.embassy-sync] version = "0.6.2" -# path = "../../embassy/embassy-sync" [dependencies.embassy-time] version = "0.4.0" -# path = "../../embassy/embassy-time" -# This is board specific. -features = ["tick-hz-10_000"] +# For real builds, you should figure out your target's tick rate and set the appropriate feature, +# like in these examples. Without this, embassy-time will assume a 1Mhz tick rate, and every time +# operation will involve a conversion. +#features = ["tick-hz-10_000"] [dependencies.critical-section] version = "1.2" diff --git a/zephyr/src/embassy/time_driver.rs b/zephyr/src/embassy/time_driver.rs index c9e73247..f5616137 100644 --- a/zephyr/src/embassy/time_driver.rs +++ b/zephyr/src/embassy/time_driver.rs @@ -14,6 +14,18 @@ use embassy_time_queue_utils::Queue; use crate::raw::{k_timeout_t, k_timer, k_timer_init, k_timer_start}; use crate::sys::K_FOREVER; +/// The time base configured into Zephyr. +pub const ZEPHYR_TICK_HZ: u64 = crate::time::SYS_FREQUENCY as u64; + +/// The configured Embassy time tick rate. +pub const EMBASSY_TICK_HZ: u64 = embassy_time_driver::TICK_HZ; + +/// When the zephyr and embassy rates differ, use this intermediate type. This can be selected by +/// feature. At the worst case, with Embassy's tick at 1Mhz, and Zephyr's at 50k, it is a little +/// over 11 years. Higher of either will reduce that further. But, 128-bit arithmetic is fairly +/// inefficient. +type InterTime = u128; + embassy_time_driver::time_driver_impl!(static DRIVER: ZephyrTimeDriver = ZephyrTimeDriver { queue: Mutex::new(RefCell::new(Queue::new())), timer: Mutex::new(RefCell::new(unsafe { mem::zeroed() })), @@ -63,9 +75,40 @@ impl ZTimer { } } +/// Convert from a zephyr tick count, to an embassy tick count. +/// +/// This is done using an intermediate type defined above. +/// This conversion truncates. +fn zephyr_to_embassy(ticks: u64) -> u64 { + if ZEPHYR_TICK_HZ == EMBASSY_TICK_HZ { + // This should happen at compile time. + return ticks; + } + + // Otherwise do the intermediate conversion. + let prod = (ticks as InterTime) * (EMBASSY_TICK_HZ as InterTime); + (prod / (ZEPHYR_TICK_HZ as InterTime)) as u64 +} + +/// Convert from an embassy tick count to a zephyr. +/// +/// This conversion use ceil so that values are always large enough. 
+fn embassy_to_zephyr(ticks: u64) -> u64 { + if ZEPHYR_TICK_HZ == EMBASSY_TICK_HZ { + return ticks; + } + + let prod = (ticks as InterTime) * (ZEPHYR_TICK_HZ as InterTime); + prod.div_ceil(EMBASSY_TICK_HZ as InterTime) as u64 +} + +fn zephyr_now() -> u64 { + crate::time::now().ticks() +} + impl Driver for ZephyrTimeDriver { fn now(&self) -> u64 { - crate::time::now().ticks() + zephyr_to_embassy(zephyr_now()) } fn schedule_wake(&self, at: u64, waker: &core::task::Waker) { @@ -73,10 +116,13 @@ impl Driver for ZephyrTimeDriver { let mut queue = self.queue.borrow(cs).borrow_mut(); let mut timer = self.timer.borrow(cs).borrow_mut(); + // All times below are in Zephyr units. + let at = embassy_to_zephyr(at); + if queue.schedule_wake(at, waker) { - let mut next = queue.next_expiration(self.now()); - while !timer.set_alarm(next, self.now()) { - next = queue.next_expiration(self.now()); + let mut next = queue.next_expiration(zephyr_now()); + while !timer.set_alarm(next, zephyr_now()) { + next = queue.next_expiration(zephyr_now()); } } }) @@ -89,9 +135,9 @@ impl ZephyrTimeDriver { let mut queue = self.queue.borrow(cs).borrow_mut(); let mut timer = self.timer.borrow(cs).borrow_mut(); - let mut next = queue.next_expiration(self.now()); - while !timer.set_alarm(next, self.now()) { - next = queue.next_expiration(self.now()); + let mut next = queue.next_expiration(zephyr_now()); + while !timer.set_alarm(next, zephyr_now()) { + next = queue.next_expiration(zephyr_now()); } }) } From 8379fec933d850a496a931f24901b41d0427f789 Mon Sep 17 00:00:00 2001 From: David Brown Date: Mon, 24 Mar 2025 15:53:22 -0600 Subject: [PATCH 02/15] zephyr-build: Only generate nodes that are enabled Don't generate instance access code for DT nodes that aren't actually enabled. Signed-off-by: David Brown --- zephyr-build/src/devicetree/augment.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/zephyr-build/src/devicetree/augment.rs b/zephyr-build/src/devicetree/augment.rs index 2881da08..7b333a4c 100644 --- a/zephyr-build/src/devicetree/augment.rs +++ b/zephyr-build/src/devicetree/augment.rs @@ -28,6 +28,12 @@ pub trait Augment { /// The default implementation checks if this node matches and calls a generator if it does, or /// does nothing if not. fn augment(&self, node: &Node, tree: &DeviceTree) -> TokenStream { + // If there is a status field present, and it is not set to "okay", don't augment this node. + if let Some(status) = node.get_single_string("status") { + if status != "okay" { + return TokenStream::new(); + } + } if self.is_compatible(node) { self.generate(node, tree) } else { From a7dcabe83d95ff4c98b711fa6e7c8c345f97228f Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 10 Apr 2025 15:56:03 -0600 Subject: [PATCH 03/15] tests: drivers: gpio-async: Add small readme Add a short readme to explain some of the difficulties with level triggered interrupts (notably, most STM32 devices do not support level triggered interrupts). Signed-off-by: David Brown --- tests/drivers/gpio-async/README.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 tests/drivers/gpio-async/README.md diff --git a/tests/drivers/gpio-async/README.md b/tests/drivers/gpio-async/README.md new file mode 100644 index 00000000..de7a3836 --- /dev/null +++ b/tests/drivers/gpio-async/README.md @@ -0,0 +1,6 @@ +# Async gpio + +This demo makes use of the GPIO `wait_for_high()` and `wait_for_low()` async operations. + +Unfortunately, not all GPIO controllers support level triggered interrupts. 
Notably, most of the +stm32 line does not support level triggered interrupts. From 6610c7b5f63b3542ad66983f95f07e97f926aad8 Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 10 Apr 2025 16:01:22 -0600 Subject: [PATCH 04/15] zephyr-macros: Fix dependency version A version mismatch here causes compilation errors due to other crates depending on this specific version. Signed-off-by: David Brown --- zephyr-macros/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zephyr-macros/Cargo.toml b/zephyr-macros/Cargo.toml index a9c386b0..72c2a1b6 100644 --- a/zephyr-macros/Cargo.toml +++ b/zephyr-macros/Cargo.toml @@ -9,7 +9,7 @@ descriptions = "Macros for managing tasks and work queues in Zephyr" proc-macro = true [dependencies] -syn = { version = "2.0.85", features = ["full", "visit"] } +syn = { version = "2.0.79", features = ["full", "visit"] } quote = "1.0.37" proc-macro2 = "1.0.86" darling = "0.20.1" From 457fc3cf841037b48b715539beb1d136e3cf95fc Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 10 Apr 2025 16:02:10 -0600 Subject: [PATCH 05/15] zephyr: Change from Arc to declaration on Work Queues Instead of allocating work queues, and hoping they don't get freed, instead move to a static declaration. This is facilitated by a macro `define_work_queue` that makes it easy to declare these at the top level. Signed-off-by: David Brown --- samples/bench/src/lib.rs | 44 ++--- zephyr/src/lib.rs | 2 +- zephyr/src/work.rs | 343 +++++++++++++++++++++------------------ 3 files changed, 202 insertions(+), 187 deletions(-) diff --git a/samples/bench/src/lib.rs b/samples/bench/src/lib.rs index 432384fe..5a8c7f45 100644 --- a/samples/bench/src/lib.rs +++ b/samples/bench/src/lib.rs @@ -16,11 +16,11 @@ use alloc::collections::vec_deque::VecDeque; use alloc::vec; use executor::AsyncTests; use static_cell::StaticCell; -use zephyr::kobj_define; +use zephyr::define_work_queue; use zephyr::raw::k_yield; use zephyr::sync::{PinWeak, SpinMutex}; use zephyr::time::NoWait; -use zephyr::work::{SimpleAction, Work, WorkQueueBuilder}; +use zephyr::work::{SimpleAction, Work}; use zephyr::{ kconfig::CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC, printkln, @@ -80,7 +80,7 @@ extern "C" fn rust_main() { spin_bench(); sem_bench(); - let simple = Simple::new(tester.workq.clone()); + let simple = Simple::new(tester.workq); let mut num = 6; while num < 250 { simple.run(num, TOTAL_ITERS / num); @@ -147,7 +147,7 @@ struct ThreadTests { high_command: Sender, /// A work queue for the main runners. - workq: Arc, + workq: &'static WorkQueue, /// The test also all return their result to the main. The threads Send, the main running /// receives. @@ -163,15 +163,7 @@ impl ThreadTests { let (low_send, low_recv) = bounded(1); let (high_send, high_recv) = bounded(1); - let workq = Arc::new( - WorkQueueBuilder::new() - .set_priority(5) - .set_no_yield(true) - .start(WORK_STACK.init_once(()).unwrap()), - ); - - // Leak the workqueue so it doesn't get dropped. - let _ = Arc::into_raw(workq.clone()); + let workq = WORKQ.start(); let mut result = Self { sems: &SEMS, @@ -581,20 +573,20 @@ enum TestResult { /// The Simple test just does a ping pong test using manually submitted work. 
struct Simple { - workq: Arc, + workq: &'static WorkQueue, } impl Simple { - fn new(workq: Arc) -> Self { + fn new(workq: &'static WorkQueue) -> Self { Self { workq } } fn run(&self, workers: usize, iterations: usize) { // printkln!("Running Simple"); - let main = Work::new(SimpleMain::new(workers * iterations, self.workq.clone())); + let main = Work::new(SimpleMain::new(workers * iterations, self.workq)); let children: VecDeque<_> = (0..workers) - .map(|n| Work::new(SimpleWorker::new(main.clone(), self.workq.clone(), n))) + .map(|n| Work::new(SimpleWorker::new(main.clone(), self.workq, n))) .collect(); let mut locked = main.action().locked.lock().unwrap(); @@ -603,7 +595,7 @@ impl Simple { let start = now(); // Fire off main, which will run everything. - Work::submit_to_queue(main.clone(), &self.workq).unwrap(); + Work::submit_to_queue(main.clone(), self.workq).unwrap(); // And wait for the completion semaphore. main.action().done.take(Forever).unwrap(); @@ -642,12 +634,12 @@ impl Simple { /// A simple worker. When run, it submits the main worker to do the next work. struct SimpleWorker { main: PinWeak>, - workq: Arc, + workq: &'static WorkQueue, _id: usize, } impl SimpleWorker { - fn new(main: Pin>>, workq: Arc, id: usize) -> Self { + fn new(main: Pin>>, workq: &'static WorkQueue, id: usize) -> Self { Self { main: PinWeak::downgrade(main), workq, @@ -660,7 +652,7 @@ impl SimpleAction for SimpleWorker { fn act(self: Pin<&Self>) { // Each time we are run, fire the main worker back up. let main = self.main.upgrade().unwrap(); - Work::submit_to_queue(main.clone(), &self.workq).unwrap(); + Work::submit_to_queue(main.clone(), self.workq).unwrap(); } } @@ -670,7 +662,7 @@ impl SimpleAction for SimpleWorker { struct SimpleMain { /// All of the work items. locked: SpinMutex, - workq: Arc, + workq: &'static WorkQueue, done: Semaphore, } @@ -690,12 +682,12 @@ impl SimpleAction for SimpleMain { lock.count -= 1; drop(lock); - Work::submit_to_queue(worker.clone(), &self.workq).unwrap(); + Work::submit_to_queue(worker.clone(), self.workq).unwrap(); } } impl SimpleMain { - fn new(count: usize, workq: Arc) -> Self { + fn new(count: usize, workq: &'static WorkQueue) -> Self { Self { locked: SpinMutex::new(Locked::new(count)), done: Semaphore::new(0, 1), @@ -812,9 +804,7 @@ impl<'a> BenchTimer<'a> { } } -kobj_define! { - static WORK_STACK: ThreadStack; -} +define_work_queue!(WORKQ, WORK_STACK_SIZE, priority = 5, no_yield = true); static SEMS: [Semaphore; NUM_THREADS] = [const { Semaphore::new(0, u32::MAX) }; NUM_THREADS]; static BACK_SEMS: [Semaphore; NUM_THREADS] = [const { Semaphore::new(0, u32::MAX) }; NUM_THREADS]; diff --git a/zephyr/src/lib.rs b/zephyr/src/lib.rs index cccdfaf6..2764e1e7 100644 --- a/zephyr/src/lib.rs +++ b/zephyr/src/lib.rs @@ -39,7 +39,7 @@ //! level operation that is still quite useful in regular code. //! - [`timer`]: Rust interfaces to Zephyr timers. These timers can be used either by registering a //! callback, or polled or waited for for an elapsed time. -//! - [`work`]: Zephyr work queues for Rust. The [`work::WorkQueueBuilder`] and resulting +//! - [`work`]: Zephyr work queues for Rust. The [`define_work_queue`] macro and resulting //! [`work::WorkQueue`] allow creation of Zephyr work queues to be used from Rust. The //! [`work::Work`] item had an action that will be invoked by the work queue, and can be manually //! submitted when needed. 
diff --git a/zephyr/src/work.rs b/zephyr/src/work.rs index c3ed8d20..af0dd8be 100644 --- a/zephyr/src/work.rs +++ b/zephyr/src/work.rs @@ -51,193 +51,145 @@ //! //! ## The work queues themselves //! -//! Workqueues themselves are built using [`WorkQueueBuilder`]. This needs a statically defined -//! stack. Typical usage will be along the lines of: -//! ```rust -//! kobj_define! { -//! WORKER_STACK: ThreadStack<2048>; -//! } -//! // ... -//! let main_worker = Box::new( -//! WorkQueueBuilder::new() -//! .set_priority(2). -//! .set_name(c"mainloop") -//! .set_no_yield(true) -//! .start(MAIN_LOOP_STACK.init_once(()).unwrap()) -//! ); -//! -//! let _ = zephyr::kio::spawn( -//! mainloop(), // Async or function returning Future. -//! &main_worker, -//! c"w:mainloop", -//! ); +//! Work Queues should be declared with the `define_work_queue!` macro, this macro requires the name +//! of the symbol for the work queue, the stack size, and then zero or more optional arguments, +//! defined by the fields in the [`WorkQueueDeclArgs`] struct. For example: //! -//! ... -//! -//! // Leak the Box so that the worker is never freed. -//! let _ = Box::leak(main_worker); +//! ```rust +//! define_work_queue!(MY_WORKQ, 2048, no_yield = true, priority = 2); //! ``` //! -//! It is important that WorkQueues never be dropped. It has a Drop implementation that invokes -//! panic. Zephyr provides no mechanism to stop work queue threads, so dropping would result in -//! undefined behavior. -//! -//! # Current Status -//! -//! Although Zephyr has 3 types of work queues, the `k_work_poll` is sufficient to implement all of -//! the behavior, and this implementation only implements this type. Non Future work could be built -//! around the other work types. -//! -//! As such, this means that manually constructed work is still built using `Future`. The `_async` -//! primitives throughout this crate can be used just as readily by hand-written Futures as by async -//! code. Notable, the use of [`Signal`] will likely be common, along with possible timeouts. -//! -//! [`sys::sync::Semaphore`]: crate::sys::sync::Semaphore -//! [`sync::channel`]: crate::sync::channel -//! [`sync::Mutex`]: crate::sync::Mutex -//! [`join`]: futures::JoinHandle::join -//! [`join_async`]: futures::JoinHandle::join_async +//! Then, in code, the work queue can be started, and used to issue work. +//! ```rust +//! let my_workq = MY_WORKQ.start(); +//! let action = Work::new(action_item); +//! action.submit(my_workq); +//! ``` extern crate alloc; use core::{ - cell::UnsafeCell, - ffi::{c_int, c_uint, CStr}, + cell::{RefCell, UnsafeCell}, + ffi::{c_char, c_int, c_uint}, mem, pin::Pin, - ptr, + sync::atomic::Ordering, }; +use critical_section::Mutex; +use portable_atomic::AtomicBool; +use portable_atomic_util::Arc; use zephyr_sys::{ k_poll_signal, k_poll_signal_check, k_poll_signal_init, k_poll_signal_raise, k_poll_signal_reset, k_work, k_work_init, k_work_q, k_work_queue_config, k_work_queue_init, - k_work_queue_start, k_work_submit, k_work_submit_to_queue, + k_work_queue_start, k_work_submit, k_work_submit_to_queue, z_thread_stack_element, }; -use crate::{ - error::to_result_void, - object::Fixed, - simpletls::SimpleTls, - sync::{Arc, Mutex}, - sys::thread::ThreadStack, -}; +use crate::{error::to_result_void, object::Fixed, simpletls::SimpleTls}; -/// A builder for work queues themselves. -/// -/// A work queue is a Zephyr thread that instead of directly running a piece of code, manages a work -/// queue. 
Various types of `Work` can be submitted to these queues, along with various types of -/// triggering conditions. -pub struct WorkQueueBuilder { - /// The "config" value passed in. - config: k_work_queue_config, - /// Priority for the work queue thread. - priority: c_int, +/// The WorkQueue decl args as a struct, so we can have a default, and the macro can fill in those +/// specified by the user. +pub struct WorkQueueDeclArgs { + /// Should this work queue call yield after each queued item. + pub no_yield: bool, + /// Is this work queue thread "essential". + /// + /// Threads marked essential will panic if they stop running. + pub essential: bool, + /// Zephyr thread priority for the work queue thread. + pub priority: c_int, } -impl WorkQueueBuilder { - /// Construct a new WorkQueueBuilder with default values. - pub fn new() -> Self { +impl WorkQueueDeclArgs { + /// Like `Default::default`, but const. + pub const fn default_values() -> Self { Self { - config: k_work_queue_config { - name: ptr::null(), - no_yield: false, - essential: false, - }, + no_yield: false, + essential: false, priority: 0, } } +} - /// Set the name for the WorkQueue thread. - /// - /// This name shows up in debuggers and some analysis tools. - pub fn set_name(&mut self, name: &'static CStr) -> &mut Self { - self.config.name = name.as_ptr(); - self - } - - /// Set the "no yield" flag for the created worker. - /// - /// If this is not set, the work queue will call `k_yield` between each enqueued work item. For - /// non-preemptible threads, this will allow other threads to run. For preemptible threads, - /// this will allow other threads at the same priority to run. - /// - /// This method has a negative in the name, which goes against typical conventions. This is - /// done to match the field in the Zephyr config. - pub fn set_no_yield(&mut self, value: bool) -> &mut Self { - self.config.no_yield = value; - self - } - - /// Set the "essential" flag for the created worker. - /// - /// This sets the essential flag on the running thread. The system considers the termination of - /// an essential thread to be a fatal error. - pub fn set_essential(&mut self, value: bool) -> &mut Self { - self.config.essential = value; - self - } - - /// Set the priority for the worker thread. - /// - /// See the Zephyr docs for the meaning of priority. - pub fn set_priority(&mut self, value: c_int) -> &mut Self { - self.priority = value; - self - } +/// A static declaration of a work-queue. This associates a work queue, with a stack, and an atomic +/// to determine if it has been initialized. +// TODO: Remove the pub on the fields, and make a constructor. +pub struct WorkQueueDecl { + queue: WorkQueue, + stack: &'static crate::thread::ThreadStack, + config: k_work_queue_config, + priority: c_int, + started: AtomicBool, +} - /// Start the given work queue thread. - /// - /// TODO: Implement a 'start' that works from a static work queue. - pub fn start(&self, stack: ThreadStack) -> WorkQueue { - let item: Fixed = Fixed::new(unsafe { mem::zeroed() }); - unsafe { - // SAFETY: Initialize zeroed memory. - k_work_queue_init(item.get()); - - // SAFETY: This associates the workqueue with the thread ID that runs it. The thread is - // a pointer into this work item, which will not move, because of the Fixed. - let this = &mut *item.get(); - WORK_QUEUES - .lock() - .unwrap() - .insert(&this.thread, WorkQueueRef(item.get())); - - // SAFETY: Start work queue thread. 
The main issue here is that the work queue cannot - // be deallocated once the thread has started. We enforce this by making Drop panic. - k_work_queue_start( - item.get(), - stack.base, - stack.size, - self.priority, - &self.config, - ); +// SAFETY: Sync is needed here to make a static declaration, despite the `*const i8` that is burried +// in the config. +unsafe impl Sync for WorkQueueDecl {} + +impl WorkQueueDecl { + /// Static constructor. Mostly for use by the macro. + pub const fn new( + stack: &'static crate::thread::ThreadStack, + name: *const c_char, + args: WorkQueueDeclArgs, + ) -> Self { + Self { + queue: unsafe { mem::zeroed() }, + stack, + config: k_work_queue_config { + name, + no_yield: args.no_yield, + essential: args.essential, + }, + priority: args.priority, + started: AtomicBool::new(false), } + } - WorkQueue { item } + /// Start the work queue thread, if needed, and return a reference to it. + pub fn start(&'static self) -> &'static WorkQueue { + critical_section::with(|cs| { + if self.started.load(Ordering::Relaxed) { + // Already started, just return it. + return &self.queue; + } + + // SAFETY: Starting is coordinated by the atomic, as well as being protected in a + // critical section. + unsafe { + let this = &mut *self.queue.item.get(); + + k_work_queue_init(self.queue.item.get()); + + // Add to the WORK_QUEUES data. That needs to be changed to a critical + // section Mutex from a Zephyr Mutex, as that would deadlock if called while in a + // critrical section. + let mut tls = WORK_QUEUES.borrow_ref_mut(cs); + tls.insert(&this.thread, WorkQueueRef(self.queue.item.get())); + + // Start the work queue thread. + k_work_queue_start( + self.queue.item.get(), + self.stack.data.get() as *mut z_thread_stack_element, + self.stack.size(), + self.priority, + &self.config, + ); + } + + &self.queue + }) } } /// A running work queue thread. /// -/// # Panic -/// -/// Allowing a work queue to drop will result in a panic. There are two ways to handle this, -/// depending on whether the WorkQueue is in a Box, or an Arc: -/// ``` -/// // Leak a work queue in an Arc. -/// let wq = Arc::new(WorkQueueBuilder::new().start(...)); -/// // If the Arc is used after this: -/// let _ = Arc::into_raw(wq.clone()); -/// // If the Arc is no longer needed: -/// let _ = Arc::into_raw(wq); -/// -/// // Leak a work queue in a Box. -/// let wq = Box::new(WorkQueueBuilder::new().start(...)); -/// let _ = Box::leak(wq); -/// ``` +/// This must be declared statically, and initialized once. Please see the macro +/// [`define_work_queue`] which declares this with a [`StaticWorkQueue`] to help with the +/// association with a stack, and making sure the queue is only started once. pub struct WorkQueue { #[allow(dead_code)] - item: Fixed, + item: UnsafeCell, } /// Work queues can be referenced from multiple threads, and thus are Send and Sync. @@ -265,7 +217,8 @@ impl Drop for WorkQueue { /// /// This is a little bit messy as we don't have a lazy mechanism, so we have to handle this a bit /// manually right now. -static WORK_QUEUES: Mutex> = Mutex::new(SimpleTls::new()); +static WORK_QUEUES: Mutex>> = + Mutex::new(RefCell::new(SimpleTls::new())); /// For the queue mapping, we need a simple wrapper around the underlying pointer, one that doesn't /// implement stop. @@ -278,7 +231,7 @@ unsafe impl Sync for WorkQueueRef {} /// Retrieve the current work queue, if we are running within one. 
pub fn get_current_workq() -> Option<*mut k_work_q> { - WORK_QUEUES.lock().unwrap().get().map(|wq| wq.0) + critical_section::with(|cs| WORK_QUEUES.borrow_ref(cs).get().map(|wq| wq.0)) } /// A Rust wrapper for `k_poll_signal`. @@ -408,6 +361,24 @@ impl SubmitResult { } } +/* +pub trait Queueable: Send + Sync { + fn as_ptr(&self) -> *const (); +} + +impl Queueable for Arc { + fn as_ptr(&self) -> *const () { + todo!() + } +} + +impl Queueable for &'static T { + fn as_ptr(&self) -> *const () { + todo!() + } +} +*/ + /// A simple action that just does something with its data. /// /// This is similar to a Future, except there is no concept of it completing. It manages its @@ -480,17 +451,24 @@ impl Work { // SAFETY: C the code does not perform moves on the data, and the `from_raw` below puts it // back into a Pin when it reconstructs the Arc. let this = unsafe { Pin::into_inner_unchecked(this) }; - let _ = Arc::into_raw(this); + let _ = Arc::into_raw(this.clone()); // SAFETY: The Pin ensures this will not move. Our implementation of drop ensures that the // work item is no longer queued when the data is dropped. - SubmitResult::to_result(unsafe { k_work_submit(work) }) + let result = SubmitResult::to_result(unsafe { k_work_submit(work) }); + + Self::check_drop(work, &result); + + result } /// Submit this work to a specified work queue. /// /// TODO: Change when we have better wrappers for work queues. - pub fn submit_to_queue(this: Pin>, queue: &WorkQueue) -> crate::Result { + pub fn submit_to_queue( + this: Pin>, + queue: &'static WorkQueue, + ) -> crate::Result { let work = this.work.get(); // "leak" the arc to give to C. We'll reconstruct it in the handler. @@ -501,7 +479,12 @@ impl Work { // SAFETY: The Pin ensures this will not move. Our implementation of drop ensures that the // work item is no longer queued when the data is dropped. - SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), work) }) + let result = + SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), work) }); + + Self::check_drop(work, &result); + + result } /// Callback, through C, but bound by a specific type. @@ -541,8 +524,50 @@ impl Work { Pin::new_unchecked(this) } + /// Determine if this work was submitted, and cause a drop of the Arc to happen if it was not. + pub fn check_drop(work: *const k_work, result: &crate::Result) { + if matches!(result, Ok(SubmitResult::AlreadySubmitted) | Err(_)) { + // SAFETY: If the above submit indicates that it was already running, the work will not + // be submitted (no additional handle will be called). "un leak" the work so that it + // will be dropped. Also, any error indicates that the work did not enqueue. + unsafe { + let this = Self::from_raw(work); + drop(this); + } + } + } + /// Access the inner action. pub fn action(&self) -> &T { &self.action } } + +/// Declare a static work queue. +/// +/// This declares a static work queue (of type [`WorkQueueDecl`]). This will have a single method +/// `.start()` which can be used to start the work queue, as well as return the persistent handle +/// that can be used to enqueue to it. +#[macro_export] +macro_rules! define_work_queue { + ($name:ident, $stack_size:expr) => { + $crate::define_work_queue!($name, $stack_size,); + }; + ($name:ident, $stack_size:expr, $($key:ident = $value:expr),* $(,)?) 
=> { + static $name: $crate::work::WorkQueueDecl<$stack_size> = { + #[link_section = concat!(".noinit.workq.", stringify!($name))] + static _ZEPHYR_STACK: $crate::thread::ThreadStack<$stack_size> = + $crate::thread::ThreadStack::new(); + const _ZEPHYR_C_NAME: &[u8] = concat!(stringify!($name), "\0").as_bytes(); + const _ZEPHYR_ARGS: $crate::work::WorkQueueDeclArgs = $crate::work::WorkQueueDeclArgs { + $($key: $value,)* + ..$crate::work::WorkQueueDeclArgs::default_values() + }; + $crate::work::WorkQueueDecl::new( + &_ZEPHYR_STACK, + _ZEPHYR_C_NAME.as_ptr() as *const ::core::ffi::c_char, + _ZEPHYR_ARGS, + ) + }; + }; +} From 820b0a095e204e9900576059335060dfc5f2c0b3 Mon Sep 17 00:00:00 2001 From: David Brown Date: Fri, 11 Apr 2025 11:10:41 -0600 Subject: [PATCH 06/15] ci: Workaround for docgen dependency problem The docgen target seems to try building the docs before the generated headers are present. Work around this by making a full build first, and then generating the docs. Signed-off-by: David Brown --- .github/workflows/docs.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 0e65bd73..26316fe1 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -45,7 +45,8 @@ jobs: working-directory: zephyr-lang-rust run: | # Note that the above build doesn't set Zephyrbase, so we'll need to do that here. - west build -t rustdoc -b qemu_cortex_m3 docgen + west build -b qemu_cortex_m3 docgen + west build -t rustdoc mkdir rustdocs mv build/rust/target/thumbv7m-none-eabi/doc rustdocs/nostd From 95c5bcd4ee97287563b9a095ceb05e0bf0917d66 Mon Sep 17 00:00:00 2001 From: David Brown Date: Fri, 11 Apr 2025 11:38:35 -0600 Subject: [PATCH 07/15] zephyr: work: Remove documentation for removed code The work-queue-based Future has been removed, so also removed the documentation that was describing it. Signed-off-by: David Brown --- zephyr/src/work.rs | 38 ++++---------------------------------- 1 file changed, 4 insertions(+), 34 deletions(-) diff --git a/zephyr/src/work.rs b/zephyr/src/work.rs index af0dd8be..21ca411c 100644 --- a/zephyr/src/work.rs +++ b/zephyr/src/work.rs @@ -17,39 +17,7 @@ //! having the `k_work` embedded in their structure, and Zephyr schedules the work when the given //! reason happens. //! -//! At this time, only the basic work queue type is supported. -//! -//! Zephyr's work queues can be used in different ways: -//! -//! - Work can be scheduled as needed. For example, an IRQ handler can queue a work item to process -//! data it has received from a device. -//! - Work can be scheduled periodically. -//! -//! As most C use of Zephyr statically allocates things like work, these are typically rescheduled -//! when the work is complete. The work queue scheduling functions are designed, and intended, for -//! a given work item to be able to reschedule itself, and such usage is common. -//! -//! ## Ownership -//! -//! The remaining challenge with implementing `k_work` for Rust is that of ownership. The model -//! taken here is that the work items are held in a `Box` that is effectively owned by the work -//! itself. When the work item is scheduled to Zephyr, ownership of that box is effectively handed -//! off to C, and then when the work item is called, the Box re-constructed. This repeats until the -//! work is no longer needed, at which point the work will be dropped. -//! -//! There are two common ways the lifecycle of work can be managed in an embedded system: -//! -//! 
- A set of `Future`'s are allocated once at the start, and these never return a value. Work -//! Futures inside of this (which correspond to `.await` in async code) can have lives and return -//! values, but the main loops will not return values, or be dropped. Embedded Futures will -//! typically not be boxed. -//! -//! One consequence of the ownership being passed through to C code is that if the work cancellation -//! mechanism is used on a work queue, the work items themselves will be leaked. -//! -//! These work items are also `Pin`, to ensure that the work actions are not moved. -//! -//! ## The work queues themselves +//! At this point, this code supports the simple work queues, with [`Work`] items. //! //! Work Queues should be declared with the `define_work_queue!` macro, this macro requires the name //! of the symbol for the work queue, the stack size, and then zero or more optional arguments, @@ -185,8 +153,10 @@ impl WorkQueueDecl { /// A running work queue thread. /// /// This must be declared statically, and initialized once. Please see the macro -/// [`define_work_queue`] which declares this with a [`StaticWorkQueue`] to help with the +/// [`define_work_queue`] which declares this with a [`WorkQueue`] to help with the /// association with a stack, and making sure the queue is only started once. +/// +/// [`define_work_queue`]: crate::define_work_queue pub struct WorkQueue { #[allow(dead_code)] item: UnsafeCell, From 913c1a374a7a8930bcdf76ca9dec65eb715d2aa3 Mon Sep 17 00:00:00 2001 From: David Brown Date: Fri, 11 Apr 2025 11:39:13 -0600 Subject: [PATCH 08/15] zephyr: Documentation fixes Fix various broken links in documentation comments. Signed-off-by: David Brown --- zephyr/src/embassy.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zephyr/src/embassy.rs b/zephyr/src/embassy.rs index 8bbc2546..d924ae95 100644 --- a/zephyr/src/embassy.rs +++ b/zephyr/src/embassy.rs @@ -71,7 +71,7 @@ //! //! ## Caveats //! -//! [`Semaphore::take_async`]: crate::sys::sync::Semaphore::take_async +//! The executor currently doesn't support async waits on Zephyr primitives, such as Semaphore. #[cfg(feature = "time-driver")] mod time_driver; From 7fb01e155d3e68e42827af9d6c17b3da981cbce4 Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 22 Apr 2025 15:36:58 -0600 Subject: [PATCH 09/15] zephyr: sync: Add methods to PinWeak Add some missing methods to `PinWeak`, notably forwarding `strong_count`, `weak_count`, and `ptr_eq` through to the inner type. Signed-off-by: David Brown --- zephyr/src/sync.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/zephyr/src/sync.rs b/zephyr/src/sync.rs index 001ea8bc..6924f83e 100644 --- a/zephyr/src/sync.rs +++ b/zephyr/src/sync.rs @@ -57,6 +57,21 @@ mod pinweak { .upgrade() .map(|arc| unsafe { Pin::new_unchecked(arc) }) } + + /// Equivalent to [`Weak::strong_count`] + pub fn strong_count(&self) -> usize { + self.0.strong_count() + } + + /// Equivalent to [`Weak::weak_count`] + pub fn weak_count(&self) -> usize { + self.0.weak_count() + } + + /// Equivalent to [`Weak::ptr_eq`] + pub fn ptr_eq(&self, other: &Self) -> bool { + self.0.ptr_eq(&other.0) + } } } From 510b4ec79c72f18e3259e72a36b826c3924dff5f Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 22 Apr 2025 15:38:08 -0600 Subject: [PATCH 10/15] samples: bench: Check for Arc leaks After running the work queue benchmarks, ensure that we haven't leaked any Arcs. 
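In sketch form, the check added at the end of the run looks like this
(condensed from the diff below; `main` is the benchmark's main work item and
`locked.works` holds the worker `Arc`s):

    // Every worker should be down to a single strong reference once the
    // run completes; anything higher would mean a submit path leaked an Arc.
    while let Some(worker) = locked.works.pop_front() {
        let worker = Pin::into_inner(worker);
        assert_eq!(Arc::strong_count(&worker), 1);
    }
    let main = Pin::into_inner(main);
    assert_eq!(Arc::strong_count(&main), 1);
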
Signed-off-by: David Brown --- samples/bench/src/lib.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/samples/bench/src/lib.rs b/samples/bench/src/lib.rs index 5a8c7f45..23c19857 100644 --- a/samples/bench/src/lib.rs +++ b/samples/bench/src/lib.rs @@ -619,15 +619,16 @@ impl Simple { ); // Before we go away, make sure that there aren't any leaked workers. - /* let mut locked = main.action().locked.lock().unwrap(); while let Some(other) = locked.works.pop_front() { - // Portable atomic's Arc seems to be a problem here. - let other = unsafe { Pin::into_inner_unchecked(other) }; + let other = Pin::into_inner(other); assert_eq!(Arc::strong_count(&other), 1); - // printkln!("Child: {} refs", Arc::strong_count(&other)); } - */ + drop(locked); + + // And nothing has leaked main, either. + let main = Pin::into_inner(main); + assert_eq!(Arc::strong_count(&main), 1); } } From 930242d90e1887fead7c69703e2f1d9faf9db077 Mon Sep 17 00:00:00 2001 From: David Brown Date: Fri, 2 May 2025 10:27:24 -0600 Subject: [PATCH 11/15] zephyr: work: Generalize Work pointer type First step in generalizing the worker type. Place the inner pointer type into a trait, and allow the work methods to work on pointers of that type. This continues to work the same as before with `Pin>>`. It needs a bit more massaging to be able to work with static pointers, namely that the constructor has to be able to be static, and as such, the initialization can't happen until it has been pinned down. This will likely change the "new" API. Signed-off-by: David Brown --- samples/bench/src/lib.rs | 2 +- zephyr/src/work.rs | 151 ++++++++++++++++++++++----------------- 2 files changed, 85 insertions(+), 68 deletions(-) diff --git a/samples/bench/src/lib.rs b/samples/bench/src/lib.rs index 23c19857..bc40e632 100644 --- a/samples/bench/src/lib.rs +++ b/samples/bench/src/lib.rs @@ -583,7 +583,7 @@ impl Simple { fn run(&self, workers: usize, iterations: usize) { // printkln!("Running Simple"); - let main = Work::new(SimpleMain::new(workers * iterations, self.workq)); + let main: Pin>> = Work::new(SimpleMain::new(workers * iterations, self.workq)); let children: VecDeque<_> = (0..workers) .map(|n| Work::new(SimpleWorker::new(main.clone(), self.workq, n))) diff --git a/zephyr/src/work.rs b/zephyr/src/work.rs index 21ca411c..829ecacc 100644 --- a/zephyr/src/work.rs +++ b/zephyr/src/work.rs @@ -331,24 +331,6 @@ impl SubmitResult { } } -/* -pub trait Queueable: Send + Sync { - fn as_ptr(&self) -> *const (); -} - -impl Queueable for Arc { - fn as_ptr(&self) -> *const () { - todo!() - } -} - -impl Queueable for &'static T { - fn as_ptr(&self) -> *const () { - todo!() - } -} -*/ - /// A simple action that just does something with its data. /// /// This is similar to a Future, except there is no concept of it completing. It manages its @@ -394,16 +376,15 @@ impl Work { /// inter-thread sharing mechanisms are needed. /// /// TODO: Can we come up with a way to allow sharing on the same worker using Rc instead of Arc? - pub fn new(action: T) -> Pin> { - let this = Arc::pin(Self { - // SAFETY: will be initialized below, after this is pinned. - work: unsafe { mem::zeroed() }, - action, - }); + pub fn new
<P>
(action: T) -> P + where + P: SubmittablePointer, + { + let this: P = unsafe { SubmittablePointer::new_ptr(action) }; - // SAFETY: Initializes above zero-initialized struct. + // SAFETY: Initializes the above zero-initialized struct. unsafe { - k_work_init(this.work.get(), Some(Self::handler)); + k_work_init(this.get_work(), Some(P::handler)); } this @@ -413,21 +394,25 @@ impl Work { /// /// This can return several possible `Ok` results. See the docs on [`SubmitResult`] for an /// explanation of them. - pub fn submit(this: Pin>) -> crate::Result { + pub fn submit
<P>
(this: P) -> crate::Result + where + P: SubmittablePointer, + { // We "leak" the arc, so that when the handler runs, it can be safely turned back into an // Arc, and the drop on the arc will then run. - let work = this.work.get(); + let work = this.get_work(); // SAFETY: C the code does not perform moves on the data, and the `from_raw` below puts it // back into a Pin when it reconstructs the Arc. - let this = unsafe { Pin::into_inner_unchecked(this) }; - let _ = Arc::into_raw(this.clone()); + unsafe { + P::into_raw(this); + } // SAFETY: The Pin ensures this will not move. Our implementation of drop ensures that the // work item is no longer queued when the data is dropped. let result = SubmitResult::to_result(unsafe { k_work_submit(work) }); - Self::check_drop(work, &result); + P::check_drop(work, &result); result } @@ -435,67 +420,91 @@ impl Work { /// Submit this work to a specified work queue. /// /// TODO: Change when we have better wrappers for work queues. - pub fn submit_to_queue( - this: Pin>, - queue: &'static WorkQueue, - ) -> crate::Result { - let work = this.work.get(); + pub fn submit_to_queue
<P>
(this: P, queue: &'static WorkQueue) -> crate::Result + where + P: SubmittablePointer, + { + let work = this.get_work(); // "leak" the arc to give to C. We'll reconstruct it in the handler. // SAFETY: The C code does not perform moves on the data, and the `from_raw` below puts it // back into a Pin when it reconstructs the Arc. - let this = unsafe { Pin::into_inner_unchecked(this) }; - let _ = Arc::into_raw(this); + unsafe { + P::into_raw(this); + } // SAFETY: The Pin ensures this will not move. Our implementation of drop ensures that the // work item is no longer queued when the data is dropped. let result = SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), work) }); - Self::check_drop(work, &result); + P::check_drop(work, &result); result } - /// Callback, through C, but bound by a specific type. - extern "C" fn handler(work: *mut k_work) { - // We want to avoid needing a `repr(C)` on our struct, so the `k_work` pointer is not - // necessarily at the beginning of the struct. - // SAFETY: Converts raw pointer to work back into the box. - let this = unsafe { Self::from_raw(work) }; + /// Access the inner action. + pub fn action(&self) -> &T { + &self.action + } +} - // Access the action within, still pinned. - // SAFETY: It is safe to keep the pin on the interior. - let action = unsafe { this.as_ref().map_unchecked(|p| &p.action) }; +/// Capture the kinds of pointers that are safe to submit to work queues. +pub trait SubmittablePointer { + /// Create a new version of a pointer for this particular type. The pointer should be pinned + /// after this call, and can then be initialized and used by C code. + unsafe fn new_ptr(action: T) -> Self; - action.act(); + /// Given a raw pointer to the work_q burried within, recover the Self pointer containing our + /// data. + unsafe fn from_raw(ptr: *const k_work) -> Self; + + /// Given our Self, indicate that this reference is now owned by the C code. For something like + /// Arc, this should leak a reference, and is the opposite of from_raw. + unsafe fn into_raw(self); + + /// Determine from the submitted work if this work has been enqueued, and if not, cause a "drop" + /// to happen on the Self pointer type. + fn check_drop(work: *const k_work, result: &crate::Result); + + /// Get the inner work pointer. + fn get_work(&self) -> *mut k_work; + + /// The low-level handler for this specific type. + extern "C" fn handler(work: *mut k_work); +} + +impl SubmittablePointer for Pin>> { + unsafe fn new_ptr(action: T) -> Self { + Arc::pin(Work { + work: unsafe { mem::zeroed() }, + action, + }) } - /* - /// Consume this Arc, returning the internal pointer. Needs to have a complementary `from_raw` - /// called to avoid leaking the item. - fn into_raw(this: Pin>) -> *const Self { - // SAFETY: This removes the Pin guarantee, but is given as a raw pointer to C, which doesn't - // generally use move. - let this = unsafe { Pin::into_inner_unchecked(this) }; - Arc::into_raw(this) + fn get_work(&self) -> *mut k_work { + self.work.get() } - */ - /// Given a pointer to the work_q burried within, recover the Pinned Box containing our data. - unsafe fn from_raw(ptr: *const k_work) -> Pin> { + unsafe fn from_raw(ptr: *const k_work) -> Self { // SAFETY: This fixes the pointer back to the beginning of Self. This also assumes the // pointer is valid. 
let ptr = ptr .cast::() - .sub(mem::offset_of!(Self, work)) - .cast::(); + .sub(mem::offset_of!(Work, work)) + .cast::>(); let this = Arc::from_raw(ptr); Pin::new_unchecked(this) } - /// Determine if this work was submitted, and cause a drop of the Arc to happen if it was not. - pub fn check_drop(work: *const k_work, result: &crate::Result) { + unsafe fn into_raw(self) { + // SAFETY: The C code does not perform moves on the data, and the `from_raw` that gets back + // our Arc puts it back into the pin when it reconstructs the Arc. + let this = unsafe { Pin::into_inner_unchecked(self) }; + let _ = Arc::into_raw(this.clone()); + } + + fn check_drop(work: *const k_work, result: &crate::Result) { if matches!(result, Ok(SubmitResult::AlreadySubmitted) | Err(_)) { // SAFETY: If the above submit indicates that it was already running, the work will not // be submitted (no additional handle will be called). "un leak" the work so that it @@ -507,9 +516,17 @@ impl Work { } } - /// Access the inner action. - pub fn action(&self) -> &T { - &self.action + extern "C" fn handler(work: *mut k_work) { + // We want to avoid needing a `repr(C)` on our struct, so the `k_work` pointer is not + // necessarily at the beginning of the struct. + // SAFETY: Converts raw pointer to work back into the box. + let this = unsafe { Self::from_raw(work) }; + + // Access the action within, still pinned. + // SAFETY: It is safe to keep the pin on the interior. + let action = unsafe { this.as_ref().map_unchecked(|p| &p.action) }; + + action.act(); } } From f936c708cfb4a941db2e8cd1de840a4554246fa8 Mon Sep 17 00:00:00 2001 From: David Brown Date: Fri, 2 May 2025 12:37:27 -0600 Subject: [PATCH 12/15] zephyr: work: Replace UnsafeCell with ZephyrObject The ZephyrObject pairs the UnsafeCell with an atomic for initialization that allows for const constructors. Signed-off-by: David Brown --- zephyr/src/work.rs | 53 +++++++++++++++++++++++++++++++--------------- 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/zephyr/src/work.rs b/zephyr/src/work.rs index 829ecacc..0a7e8325 100644 --- a/zephyr/src/work.rs +++ b/zephyr/src/work.rs @@ -49,11 +49,16 @@ use portable_atomic::AtomicBool; use portable_atomic_util::Arc; use zephyr_sys::{ k_poll_signal, k_poll_signal_check, k_poll_signal_init, k_poll_signal_raise, - k_poll_signal_reset, k_work, k_work_init, k_work_q, k_work_queue_config, k_work_queue_init, - k_work_queue_start, k_work_submit, k_work_submit_to_queue, z_thread_stack_element, + k_poll_signal_reset, k_work, k_work_handler_t, k_work_init, k_work_q, k_work_queue_config, + k_work_queue_init, k_work_queue_start, k_work_submit, k_work_submit_to_queue, + z_thread_stack_element, }; -use crate::{error::to_result_void, object::Fixed, simpletls::SimpleTls}; +use crate::{ + error::to_result_void, + object::{Fixed, ObjectInit, ZephyrObject}, + simpletls::SimpleTls, +}; /// The WorkQueue decl args as a struct, so we can have a default, and the macro can fill in those /// specified by the user. @@ -348,7 +353,7 @@ pub trait SimpleAction { /// Holds a `k_work`, along with the data associated with that work. When the work is queued, the /// `act` method will be called on the provided `SimpleAction`. pub struct Work { - work: UnsafeCell, + work: ZephyrObject, action: T, } @@ -380,12 +385,9 @@ impl Work { where P: SubmittablePointer, { - let this: P = unsafe { SubmittablePointer::new_ptr(action) }; - - // SAFETY: Initializes the above zero-initialized struct. 
- unsafe { - k_work_init(this.get_work(), Some(P::handler)); - } + // SAFETY: Initializes the above zero-initialized struct. Initialization once is handled by + // ZephyrObject. + let this: P = unsafe { SubmittablePointer::new_ptr(action, Some(P::handler)) }; this } @@ -453,7 +455,7 @@ impl Work { pub trait SubmittablePointer { /// Create a new version of a pointer for this particular type. The pointer should be pinned /// after this call, and can then be initialized and used by C code. - unsafe fn new_ptr(action: T) -> Self; + unsafe fn new_ptr(action: T, handler: k_work_handler_t) -> Self; /// Given a raw pointer to the work_q burried within, recover the Self pointer containing our /// data. @@ -475,15 +477,21 @@ pub trait SubmittablePointer { } impl SubmittablePointer for Pin>> { - unsafe fn new_ptr(action: T) -> Self { - Arc::pin(Work { - work: unsafe { mem::zeroed() }, - action, - }) + unsafe fn new_ptr(action: T, handler: k_work_handler_t) -> Self { + let work = >::new_raw(); + + unsafe { + let addr = work.get_uninit(); + (*addr).handler = handler; + } + + Arc::pin(Work { work, action }) } fn get_work(&self) -> *mut k_work { - self.work.get() + // SAFETY: The `get` method takes care of initialization as well as ensuring that the value + // is not moved since the first initialization. + unsafe { self.work.get() } } unsafe fn from_raw(ptr: *const k_work) -> Self { @@ -530,6 +538,17 @@ impl SubmittablePointer for Pin>> { } } +impl ObjectInit for ZephyrObject { + fn init(item: *mut k_work) { + // SAFETY: The handler was stashed in this field when constructing. At this point, the item + // will be pinned. + unsafe { + let handler = (*item).handler; + k_work_init(item, handler); + } + } +} + /// Declare a static work queue. /// /// This declares a static work queue (of type [`WorkQueueDecl`]). This will have a single method From 46d3d0fac1f08dfa64286141e07d1f729acadc5a Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 6 May 2025 15:38:43 -0600 Subject: [PATCH 13/15] work: Specialize Arc-based Work Instead of hard-coding `Pin>>`, have a constructor that wraps the `Arc>` in a new type `ArcWork`. This has the methods for submitting work on this particular pointer type. This leaves room for `StaticWork` which will have a static reference. Removes all references to Pin. Now that the Work queue itself is stored in a ZephyrObject, we have a mechanism to panic on move, which although a runtime check, does catch issues where values have been moved. 
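In sketch form, Arc-held work is now created and submitted through the
wrapper like this (the `Log` action and the `MY_WORKQ` queue are only for
illustration and are not part of this patch):

    struct Log;
    impl SimpleAction for Log {
        fn act(self: &Self) {
            printkln!("tick");
        }
    }

    // `new_arc` returns an `ArcWork<Log>`; cloning it only clones the inner
    // Arc, and submitting hands one reference to the C side until the
    // handler has run.
    let work = Work::new_arc(Log);
    work.clone().submit_to_queue(MY_WORKQ.start()).unwrap();
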
Signed-off-by: David Brown --- samples/bench/src/lib.rs | 39 ++++--- zephyr/src/work.rs | 225 ++++++++++++++++++--------------------- 2 files changed, 120 insertions(+), 144 deletions(-) diff --git a/samples/bench/src/lib.rs b/samples/bench/src/lib.rs index bc40e632..de02d7ba 100644 --- a/samples/bench/src/lib.rs +++ b/samples/bench/src/lib.rs @@ -10,7 +10,6 @@ extern crate alloc; use core::mem; -use core::pin::Pin; use alloc::collections::vec_deque::VecDeque; use alloc::vec; @@ -18,9 +17,9 @@ use executor::AsyncTests; use static_cell::StaticCell; use zephyr::define_work_queue; use zephyr::raw::k_yield; -use zephyr::sync::{PinWeak, SpinMutex}; +use zephyr::sync::{SpinMutex, Weak}; use zephyr::time::NoWait; -use zephyr::work::{SimpleAction, Work}; +use zephyr::work::{ArcWork, SimpleAction, Work}; use zephyr::{ kconfig::CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC, printkln, @@ -583,22 +582,22 @@ impl Simple { fn run(&self, workers: usize, iterations: usize) { // printkln!("Running Simple"); - let main: Pin>> = Work::new(SimpleMain::new(workers * iterations, self.workq)); + let main = Work::new_arc(SimpleMain::new(workers * iterations, self.workq)); let children: VecDeque<_> = (0..workers) - .map(|n| Work::new(SimpleWorker::new(main.clone(), self.workq, n))) + .map(|n| Work::new_arc(SimpleWorker::new(main.0.clone(), self.workq, n)).0) .collect(); - let mut locked = main.action().locked.lock().unwrap(); + let mut locked = main.0.action().locked.lock().unwrap(); let _ = mem::replace(&mut locked.works, children); drop(locked); let start = now(); // Fire off main, which will run everything. - Work::submit_to_queue(main.clone(), self.workq).unwrap(); + main.clone().submit_to_queue(self.workq).unwrap(); // And wait for the completion semaphore. - main.action().done.take(Forever).unwrap(); + main.0.action().done.take(Forever).unwrap(); let stop = now(); let time = (stop - start) as f64 / (CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC as f64) * 1000.0; @@ -619,30 +618,28 @@ impl Simple { ); // Before we go away, make sure that there aren't any leaked workers. - let mut locked = main.action().locked.lock().unwrap(); + let mut locked = main.0.action().locked.lock().unwrap(); while let Some(other) = locked.works.pop_front() { - let other = Pin::into_inner(other); assert_eq!(Arc::strong_count(&other), 1); } drop(locked); // And nothing has leaked main, either. - let main = Pin::into_inner(main); - assert_eq!(Arc::strong_count(&main), 1); + assert_eq!(Arc::strong_count(&main.0), 1); } } /// A simple worker. When run, it submits the main worker to do the next work. struct SimpleWorker { - main: PinWeak>, + main: Weak>, workq: &'static WorkQueue, _id: usize, } impl SimpleWorker { - fn new(main: Pin>>, workq: &'static WorkQueue, id: usize) -> Self { + fn new(main: Arc>, workq: &'static WorkQueue, id: usize) -> Self { Self { - main: PinWeak::downgrade(main), + main: Arc::downgrade(&main), workq, _id: id, } @@ -650,10 +647,10 @@ impl SimpleWorker { } impl SimpleAction for SimpleWorker { - fn act(self: Pin<&Self>) { + fn act(self: &Self) { // Each time we are run, fire the main worker back up. - let main = self.main.upgrade().unwrap(); - Work::submit_to_queue(main.clone(), self.workq).unwrap(); + let main = ArcWork(self.main.upgrade().unwrap()); + main.clone().submit_to_queue(self.workq).unwrap(); } } @@ -668,7 +665,7 @@ struct SimpleMain { } impl SimpleAction for SimpleMain { - fn act(self: Pin<&Self>) { + fn act(self: &Self) { // Each time, take a worker from the queue, and submit it. 
let mut lock = self.locked.lock().unwrap(); @@ -683,7 +680,7 @@ impl SimpleAction for SimpleMain { lock.count -= 1; drop(lock); - Work::submit_to_queue(worker.clone(), self.workq).unwrap(); + ArcWork(worker.clone()).submit_to_queue(self.workq).unwrap(); } } @@ -698,7 +695,7 @@ impl SimpleMain { } struct Locked { - works: VecDeque>>>, + works: VecDeque>>, count: usize, } diff --git a/zephyr/src/work.rs b/zephyr/src/work.rs index 0a7e8325..5b14cefe 100644 --- a/zephyr/src/work.rs +++ b/zephyr/src/work.rs @@ -40,7 +40,6 @@ use core::{ cell::{RefCell, UnsafeCell}, ffi::{c_char, c_int, c_uint}, mem, - pin::Pin, sync::atomic::Ordering, }; @@ -49,7 +48,7 @@ use portable_atomic::AtomicBool; use portable_atomic_util::Arc; use zephyr_sys::{ k_poll_signal, k_poll_signal_check, k_poll_signal_init, k_poll_signal_raise, - k_poll_signal_reset, k_work, k_work_handler_t, k_work_init, k_work_q, k_work_queue_config, + k_poll_signal_reset, k_work, k_work_init, k_work_q, k_work_queue_config, k_work_queue_init, k_work_queue_start, k_work_submit, k_work_submit_to_queue, z_thread_stack_element, }; @@ -345,7 +344,7 @@ impl SubmitResult { /// below uses an Arc, so this data can be shared. pub trait SimpleAction { /// Perform the action. - fn act(self: Pin<&Self>); + fn act(self: &Self); } /// A basic Zephyr work item. @@ -373,150 +372,81 @@ where { } -impl Work { - /// Construct a new Work from the given action. - /// - /// Note that the data will be moved into the pinned Work. The data is internal, and only - /// accessible to the work thread (the `act` method). If shared data is needed, normal - /// inter-thread sharing mechanisms are needed. - /// - /// TODO: Can we come up with a way to allow sharing on the same worker using Rc instead of Arc? - pub fn new
<P>
(action: T) -> P - where - P: SubmittablePointer, - { - // SAFETY: Initializes the above zero-initialized struct. Initialization once is handled by - // ZephyrObject. - let this: P = unsafe { SubmittablePointer::new_ptr(action, Some(P::handler)) }; - - this +/// Arc held work. +/// +/// Because C code takes ownership of the work, we only support submitting work with very specific +/// pointer types. This wraps an Arc holding work to allow Work held in an Arc to be queued. +/// Earlier versions of this required the work to be `Pin>`, however, we use +/// [`ZephyrObject`] to hold work items, anyway, and this already does a runtime check to prevents +/// moves, so we can safely avoid needing to use `Pin`. However, note that if the work is moved +/// between it's first use, and subsequent use, it will panic. +pub struct ArcWork(pub Arc>); + +/// Clone just passes the clone to the arc. +impl Clone for ArcWork { + fn clone(&self) -> Self { + ArcWork(self.0.clone()) } +} +impl ArcWork { /// Submit this work to the system work queue. /// /// This can return several possible `Ok` results. See the docs on [`SubmitResult`] for an /// explanation of them. - pub fn submit
<P>
(this: P) -> crate::Result - where - P: SubmittablePointer, - { - // We "leak" the arc, so that when the handler runs, it can be safely turned back into an - // Arc, and the drop on the arc will then run. - let work = this.get_work(); - - // SAFETY: C the code does not perform moves on the data, and the `from_raw` below puts it - // back into a Pin when it reconstructs the Arc. - unsafe { - P::into_raw(this); - } + pub fn submit(self) -> crate::Result { + // Leak the arc, so that when the handler runs, it can be safely turned back into an Arc, + // and then the drop on the Arc will run. + // SAFETY: As we are leaking the pointer until the C code is done with it, it is safe to get + // the pointer to the raw work. + let work = unsafe { self.0.work.get() }; + let _ = Arc::into_raw(self.0); - // SAFETY: The Pin ensures this will not move. Our implementation of drop ensures that the - // work item is no longer queued when the data is dropped. let result = SubmitResult::to_result(unsafe { k_work_submit(work) }); - P::check_drop(work, &result); + Self::check_drop(work, &result); result } - /// Submit this work to a specified work queue. + /// Submit this work to the given work queue. /// - /// TODO: Change when we have better wrappers for work queues. - pub fn submit_to_queue
<P>
(this: P, queue: &'static WorkQueue) -> crate::Result - where - P: SubmittablePointer, - { - let work = this.get_work(); - - // "leak" the arc to give to C. We'll reconstruct it in the handler. - // SAFETY: The C code does not perform moves on the data, and the `from_raw` below puts it - // back into a Pin when it reconstructs the Arc. - unsafe { - P::into_raw(this); - } + /// This can return several possible `Ok` results. See the docs on [`SubmitResult`] for an + /// explanation of them. + pub fn submit_to_queue(self, queue: &'static WorkQueue) -> crate::Result { + // Leak the arc, so that when the handler runs, it can be safely turned back into an Arc, + // and then the drop on the Arc will run. + // SAFETY: As we are leaking the pointer until the C code is done with it, it is safe to get + // the pointer to the raw work. + let work = unsafe { self.0.work.get() }; + let _ = Arc::into_raw(self.0); - // SAFETY: The Pin ensures this will not move. Our implementation of drop ensures that the - // work item is no longer queued when the data is dropped. - let result = - SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), work) }); + let result = SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), work) }); - P::check_drop(work, &result); + Self::check_drop(work, &result); result } - /// Access the inner action. - pub fn action(&self) -> &T { - &self.action - } -} - -/// Capture the kinds of pointers that are safe to submit to work queues. -pub trait SubmittablePointer { - /// Create a new version of a pointer for this particular type. The pointer should be pinned - /// after this call, and can then be initialized and used by C code. - unsafe fn new_ptr(action: T, handler: k_work_handler_t) -> Self; - - /// Given a raw pointer to the work_q burried within, recover the Self pointer containing our - /// data. - unsafe fn from_raw(ptr: *const k_work) -> Self; - - /// Given our Self, indicate that this reference is now owned by the C code. For something like - /// Arc, this should leak a reference, and is the opposite of from_raw. - unsafe fn into_raw(self); - - /// Determine from the submitted work if this work has been enqueued, and if not, cause a "drop" - /// to happen on the Self pointer type. - fn check_drop(work: *const k_work, result: &crate::Result); - - /// Get the inner work pointer. - fn get_work(&self) -> *mut k_work; - - /// The low-level handler for this specific type. - extern "C" fn handler(work: *mut k_work); -} - -impl SubmittablePointer for Pin>> { - unsafe fn new_ptr(action: T, handler: k_work_handler_t) -> Self { - let work = >::new_raw(); - - unsafe { - let addr = work.get_uninit(); - (*addr).handler = handler; - } - - Arc::pin(Work { work, action }) - } - - fn get_work(&self) -> *mut k_work { - // SAFETY: The `get` method takes care of initialization as well as ensuring that the value - // is not moved since the first initialization. - unsafe { self.work.get() } - } - + /// Given the raw "C" work pointer, get a pointer back to our work item. unsafe fn from_raw(ptr: *const k_work) -> Self { - // SAFETY: This fixes the pointer back to the beginning of Self. This also assumes the - // pointer is valid. let ptr = ptr .cast::() .sub(mem::offset_of!(Work, work)) .cast::>(); let this = Arc::from_raw(ptr); - Pin::new_unchecked(this) - } - - unsafe fn into_raw(self) { - // SAFETY: The C code does not perform moves on the data, and the `from_raw` that gets back - // our Arc puts it back into the pin when it reconstructs the Arc. 
-        let this = unsafe { Pin::into_inner_unchecked(self) };
-        let _ = Arc::into_raw(this.clone());
+        Self(this)
     }
 
+    /// Check if the C code has "dropped" its reference, and drop our Arc reference as well. This
+    /// should detect the case where the work was not queued, and no callback, of this ownership,
+    /// will be called.
     fn check_drop(work: *const k_work, result: &crate::Result) {
         if matches!(result, Ok(SubmitResult::AlreadySubmitted) | Err(_)) {
-            // SAFETY: If the above submit indicates that it was already running, the work will not
-            // be submitted (no additional handle will be called). "un leak" the work so that it
-            // will be dropped. Also, any error indicates that the work did not enqueue.
+            // SAFETY: If the above matches, it indicates this work was already running, and someone
+            // other than the work itself is trying to submit it. In this case, there will be no
+            // callback that belongs to this particular context. Err also indicates that the work
+            // was not enqueued.
             unsafe {
                 let this = Self::from_raw(work);
                 drop(this);
@@ -524,17 +454,66 @@ impl SubmittablePointer for Pin<Arc<Work<T>>> {
         }
     }
 
+    /// The handler for Arc based work.
     extern "C" fn handler(work: *mut k_work) {
-        // We want to avoid needing a `repr(C)` on our struct, so the `k_work` pointer is not
-        // necessarily at the beginning of the struct.
-        // SAFETY: Converts raw pointer to work back into the box.
+        // Reconstruct self out of the work.
+        // SAFETY: The submit functions will leak the arc any time C has ownership of the Work, and
+        // the C will relinquish that ownership when calling this handler.
         let this = unsafe { Self::from_raw(work) };
 
-        // Access the action within, still pinned.
-        // SAFETY: It is safe to keep the pin on the interior.
-        let action = unsafe { this.as_ref().map_unchecked(|p| &p.action) };
+        let action = &this.0.action;
 
         action.act();
+
+        // This will be dropped.
+    }
+}
+
+impl Work {
+    /// Construct a new Work from the given action.
+    ///
+    /// Note that the data will be moved into the pinned Work. The data is internal, and only
+    /// accessible to the work thread (the `act` method). If shared data is needed, normal
+    /// inter-thread sharing mechanisms are needed.
+    ///
+    /// TODO: Can we come up with a way to allow sharing on the same worker using Rc instead of Arc?
+    pub fn new_arc(action: T) -> ArcWork {
+        let work = <ZephyrObject<k_work>>::new_raw();
+
+        // SAFETY: Initializes the above zero-initialized struct. Initialization once is handled by
+        // ZephyrObject.
+        unsafe {
+            let addr = work.get_uninit();
+            (*addr).handler = Some(ArcWork::<T>::handler);
+        }
+
+        let this = Arc::new(Work { work, action });
+
+        ArcWork(this)
+    }
+
+    /// Access the inner action.
+    pub fn action(&self) -> &T {
+        &self.action
+    }
+}
+
+/// Capture the kinds of pointers that are safe to submit to work queues.
+pub trait SubmittablePointer {
+    /// Submit this work to the system work queue.
+    fn submit(self) -> crate::Result;
+
+    /// Submit this work to the given work queue.
+    fn submit_to_queue(self, queue: &'static WorkQueue) -> crate::Result;
+}
+
+impl SubmittablePointer for Arc<Work<T>> {
+    fn submit(self) -> crate::Result {
+        ArcWork(self).submit()
+    }
+
+    fn submit_to_queue(self, queue: &'static WorkQueue) -> crate::Result {
+        ArcWork(self).submit_to_queue(queue)
     }
 }
 

From e60848309897a30b04114c80297fd11bab481395 Mon Sep 17 00:00:00 2001
From: David Brown
Date: Tue, 6 May 2025 16:30:06 -0600
Subject: [PATCH 14/15] zephyr: work: Add &'static Work

Support enqueueing &'static Work as well as the Arc-based work.
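
As a rough usage sketch (illustrative only: the `Tick` action, the `act(&self)`
signature, and the exact import paths are assumptions and may not match the
crate exactly):

    use zephyr::work::{SimpleAction, SubmittablePointer, Work};

    // Hypothetical action type; the real `SimpleAction` trait is defined
    // elsewhere in this crate.
    struct Tick;

    impl SimpleAction for Tick {
        fn act(&self) {
            zephyr::printkln!("tick");
        }
    }

    // `new_static` is const, so the work item can live in a static, assuming
    // `Work<Tick>` meets the usual `Sync` requirement for statics.
    static TICK_WORK: Work<Tick> = Work::new_static(Tick);

    fn kick() {
        // `&'static Work<T>` implements `SubmittablePointer`, so it can be
        // submitted directly to the system work queue.
        let _ = (&TICK_WORK).submit();
    }
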
Signed-off-by: David Brown
---
 zephyr/src/work.rs | 55 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/zephyr/src/work.rs b/zephyr/src/work.rs
index 5b14cefe..110f4b89 100644
--- a/zephyr/src/work.rs
+++ b/zephyr/src/work.rs
@@ -469,6 +469,37 @@ impl ArcWork {
     }
 }
 
+/// Static Work.
+///
+/// Work items can also be declared statically. Note that the work should only be submitted after
+/// it has been moved to its final static location.
+pub struct StaticWork(pub &'static Work);
+
+impl StaticWork {
+    /// Submit this work to the system work queue.
+    pub fn submit(self) -> crate::Result {
+        SubmitResult::to_result(unsafe { k_work_submit(self.0.work.get()) })
+    }
+
+    /// Submit this work to a specific work queue.
+    pub fn submit_to_queue(self, queue: &'static WorkQueue) -> crate::Result {
+        SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), self.0.work.get()) })
+    }
+
+    /// The handler for static work.
+    extern "C" fn handler(work: *mut k_work) {
+        let ptr = unsafe {
+            work
+                .cast::<u8>()
+                .sub(mem::offset_of!(Work<T>, work))
+                .cast::<Work<T>>()
+        };
+        let this = unsafe { &*ptr };
+        let action = &this.action;
+        action.act();
+    }
+}
+
 impl Work {
     /// Construct a new Work from the given action.
     ///
@@ -491,6 +522,20 @@ impl Work {
         ArcWork(this)
     }
+}
+
+impl Work {
+    /// Construct a static worker.
+    pub const fn new_static(action: T) -> Work {
+        let work = <ZephyrObject<k_work>>::new_raw();
+
+        unsafe {
+            let addr = work.get_uninit();
+            (*addr).handler = Some(StaticWork::<T>::handler);
+        }
+
+        Work { work, action }
+    }
 
     /// Access the inner action.
     pub fn action(&self) -> &T {
@@ -517,6 +562,16 @@ impl SubmittablePointer for Arc<Work<T>> {
     }
 }
 
+impl SubmittablePointer for &'static Work {
+    fn submit(self) -> crate::Result {
+        StaticWork(self).submit()
+    }
+
+    fn submit_to_queue(self, queue: &'static WorkQueue) -> crate::Result {
+        StaticWork(self).submit_to_queue(queue)
+    }
+}
+
 impl ObjectInit for ZephyrObject {
     fn init(item: *mut k_work) {
         // SAFETY: The handler was stashed in this field when constructing. At this point, the item

From 83f0e11c3f54f5c14dbb69fbc8eec25b0775d46d Mon Sep 17 00:00:00 2001
From: David Brown
Date: Fri, 23 May 2025 13:23:05 -0600
Subject: [PATCH 15/15] zephyr: run cargo fmt

Clean up formatting

Signed-off-by: David Brown
---
 zephyr/src/work.rs | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/zephyr/src/work.rs b/zephyr/src/work.rs
index 110f4b89..b96342f1 100644
--- a/zephyr/src/work.rs
+++ b/zephyr/src/work.rs
@@ -48,9 +48,8 @@ use portable_atomic::AtomicBool;
 use portable_atomic_util::Arc;
 use zephyr_sys::{
     k_poll_signal, k_poll_signal_check, k_poll_signal_init, k_poll_signal_raise,
-    k_poll_signal_reset, k_work, k_work_init, k_work_q, k_work_queue_config,
-    k_work_queue_init, k_work_queue_start, k_work_submit, k_work_submit_to_queue,
-    z_thread_stack_element,
+    k_poll_signal_reset, k_work, k_work_init, k_work_q, k_work_queue_config, k_work_queue_init,
+    k_work_queue_start, k_work_submit, k_work_submit_to_queue, z_thread_stack_element,
 };
 
 use crate::{
@@ -421,7 +420,8 @@ impl ArcWork {
         let work = unsafe { self.0.work.get() };
         let _ = Arc::into_raw(self.0);
 
-        let result = SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), work) });
+        let result =
+            SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), work) });
 
         Self::check_drop(work, &result);
 
@@ -483,14 +483,15 @@ impl StaticWork {
     /// Submit this work to a specific work queue.
     pub fn submit_to_queue(self, queue: &'static WorkQueue) -> crate::Result {
-        SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), self.0.work.get()) })
+        SubmitResult::to_result(unsafe {
+            k_work_submit_to_queue(queue.item.get(), self.0.work.get())
+        })
     }
 
     /// The handler for static work.
     extern "C" fn handler(work: *mut k_work) {
         let ptr = unsafe {
-            work
-                .cast::<u8>()
+            work.cast::<u8>()
                 .sub(mem::offset_of!(Work<T>, work))
                 .cast::<Work<T>>()
         };
         let this = unsafe { &*ptr };
@@ -529,7 +530,7 @@ impl Work {
     pub const fn new_static(action: T) -> Work {
         let work = <ZephyrObject<k_work>>::new_raw();
 
-        unsafe {
+        unsafe {
             let addr = work.get_uninit();
             (*addr).handler = Some(StaticWork::<T>::handler);
         }
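
For the Arc-based path added earlier in this series, usage might look like the
following sketch (illustrative only: the `Blink` action, the `act(&self)`
signature, and the import paths are assumptions, not part of the patches):

    use zephyr::work::{SimpleAction, Work};

    // Hypothetical action type for the example.
    struct Blink;

    impl SimpleAction for Blink {
        fn act(&self) {
            zephyr::printkln!("blink");
        }
    }

    fn spawn_blink() {
        // `new_arc` wraps the action in an Arc-backed work item; submitting it
        // leaks one Arc reference to the kernel, which is reclaimed when the
        // handler runs (or immediately if the submit does not enqueue).
        let work = Work::new_arc(Blink);
        let _ = work.submit();
    }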