diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 0e65bd73..26316fe1 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -45,7 +45,8 @@ jobs: working-directory: zephyr-lang-rust run: | # Note that the above build doesn't set Zephyrbase, so we'll need to do that here. - west build -t rustdoc -b qemu_cortex_m3 docgen + west build -b qemu_cortex_m3 docgen + west build -t rustdoc mkdir rustdocs mv build/rust/target/thumbv7m-none-eabi/doc rustdocs/nostd diff --git a/samples/bench/src/lib.rs b/samples/bench/src/lib.rs index 432384fe..de02d7ba 100644 --- a/samples/bench/src/lib.rs +++ b/samples/bench/src/lib.rs @@ -10,17 +10,16 @@ extern crate alloc; use core::mem; -use core::pin::Pin; use alloc::collections::vec_deque::VecDeque; use alloc::vec; use executor::AsyncTests; use static_cell::StaticCell; -use zephyr::kobj_define; +use zephyr::define_work_queue; use zephyr::raw::k_yield; -use zephyr::sync::{PinWeak, SpinMutex}; +use zephyr::sync::{SpinMutex, Weak}; use zephyr::time::NoWait; -use zephyr::work::{SimpleAction, Work, WorkQueueBuilder}; +use zephyr::work::{ArcWork, SimpleAction, Work}; use zephyr::{ kconfig::CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC, printkln, @@ -80,7 +79,7 @@ extern "C" fn rust_main() { spin_bench(); sem_bench(); - let simple = Simple::new(tester.workq.clone()); + let simple = Simple::new(tester.workq); let mut num = 6; while num < 250 { simple.run(num, TOTAL_ITERS / num); @@ -147,7 +146,7 @@ struct ThreadTests { high_command: Sender, /// A work queue for the main runners. - workq: Arc, + workq: &'static WorkQueue, /// The test also all return their result to the main. The threads Send, the main running /// receives. @@ -163,15 +162,7 @@ impl ThreadTests { let (low_send, low_recv) = bounded(1); let (high_send, high_recv) = bounded(1); - let workq = Arc::new( - WorkQueueBuilder::new() - .set_priority(5) - .set_no_yield(true) - .start(WORK_STACK.init_once(()).unwrap()), - ); - - // Leak the workqueue so it doesn't get dropped. - let _ = Arc::into_raw(workq.clone()); + let workq = WORKQ.start(); let mut result = Self { sems: &SEMS, @@ -581,32 +572,32 @@ enum TestResult { /// The Simple test just does a ping pong test using manually submitted work. struct Simple { - workq: Arc, + workq: &'static WorkQueue, } impl Simple { - fn new(workq: Arc) -> Self { + fn new(workq: &'static WorkQueue) -> Self { Self { workq } } fn run(&self, workers: usize, iterations: usize) { // printkln!("Running Simple"); - let main = Work::new(SimpleMain::new(workers * iterations, self.workq.clone())); + let main = Work::new_arc(SimpleMain::new(workers * iterations, self.workq)); let children: VecDeque<_> = (0..workers) - .map(|n| Work::new(SimpleWorker::new(main.clone(), self.workq.clone(), n))) + .map(|n| Work::new_arc(SimpleWorker::new(main.0.clone(), self.workq, n)).0) .collect(); - let mut locked = main.action().locked.lock().unwrap(); + let mut locked = main.0.action().locked.lock().unwrap(); let _ = mem::replace(&mut locked.works, children); drop(locked); let start = now(); // Fire off main, which will run everything. - Work::submit_to_queue(main.clone(), &self.workq).unwrap(); + main.clone().submit_to_queue(self.workq).unwrap(); // And wait for the completion semaphore. - main.action().done.take(Forever).unwrap(); + main.0.action().done.take(Forever).unwrap(); let stop = now(); let time = (stop - start) as f64 / (CONFIG_SYS_CLOCK_HW_CYCLES_PER_SEC as f64) * 1000.0; @@ -627,29 +618,28 @@ impl Simple { ); // Before we go away, make sure that there aren't any leaked workers. - /* - let mut locked = main.action().locked.lock().unwrap(); + let mut locked = main.0.action().locked.lock().unwrap(); while let Some(other) = locked.works.pop_front() { - // Portable atomic's Arc seems to be a problem here. - let other = unsafe { Pin::into_inner_unchecked(other) }; assert_eq!(Arc::strong_count(&other), 1); - // printkln!("Child: {} refs", Arc::strong_count(&other)); } - */ + drop(locked); + + // And nothing has leaked main, either. + assert_eq!(Arc::strong_count(&main.0), 1); } } /// A simple worker. When run, it submits the main worker to do the next work. struct SimpleWorker { - main: PinWeak>, - workq: Arc, + main: Weak>, + workq: &'static WorkQueue, _id: usize, } impl SimpleWorker { - fn new(main: Pin>>, workq: Arc, id: usize) -> Self { + fn new(main: Arc>, workq: &'static WorkQueue, id: usize) -> Self { Self { - main: PinWeak::downgrade(main), + main: Arc::downgrade(&main), workq, _id: id, } @@ -657,10 +647,10 @@ impl SimpleWorker { } impl SimpleAction for SimpleWorker { - fn act(self: Pin<&Self>) { + fn act(self: &Self) { // Each time we are run, fire the main worker back up. - let main = self.main.upgrade().unwrap(); - Work::submit_to_queue(main.clone(), &self.workq).unwrap(); + let main = ArcWork(self.main.upgrade().unwrap()); + main.clone().submit_to_queue(self.workq).unwrap(); } } @@ -670,12 +660,12 @@ impl SimpleAction for SimpleWorker { struct SimpleMain { /// All of the work items. locked: SpinMutex, - workq: Arc, + workq: &'static WorkQueue, done: Semaphore, } impl SimpleAction for SimpleMain { - fn act(self: Pin<&Self>) { + fn act(self: &Self) { // Each time, take a worker from the queue, and submit it. let mut lock = self.locked.lock().unwrap(); @@ -690,12 +680,12 @@ impl SimpleAction for SimpleMain { lock.count -= 1; drop(lock); - Work::submit_to_queue(worker.clone(), &self.workq).unwrap(); + ArcWork(worker.clone()).submit_to_queue(self.workq).unwrap(); } } impl SimpleMain { - fn new(count: usize, workq: Arc) -> Self { + fn new(count: usize, workq: &'static WorkQueue) -> Self { Self { locked: SpinMutex::new(Locked::new(count)), done: Semaphore::new(0, 1), @@ -705,7 +695,7 @@ impl SimpleMain { } struct Locked { - works: VecDeque>>>, + works: VecDeque>>, count: usize, } @@ -812,9 +802,7 @@ impl<'a> BenchTimer<'a> { } } -kobj_define! { - static WORK_STACK: ThreadStack; -} +define_work_queue!(WORKQ, WORK_STACK_SIZE, priority = 5, no_yield = true); static SEMS: [Semaphore; NUM_THREADS] = [const { Semaphore::new(0, u32::MAX) }; NUM_THREADS]; static BACK_SEMS: [Semaphore; NUM_THREADS] = [const { Semaphore::new(0, u32::MAX) }; NUM_THREADS]; diff --git a/samples/embassy/Cargo.toml b/samples/embassy/Cargo.toml index 6301463e..beeee526 100644 --- a/samples/embassy/Cargo.toml +++ b/samples/embassy/Cargo.toml @@ -28,17 +28,16 @@ features = [ [dependencies.embassy-futures] version = "0.1.1" -# path = "../../embassy/embassy-futures" [dependencies.embassy-sync] version = "0.6.2" -# path = "../../embassy/embassy-sync" [dependencies.embassy-time] version = "0.4.0" -# path = "../../embassy/embassy-time" -# This is board specific. -features = ["tick-hz-10_000"] +# For real builds, you should figure out your target's tick rate and set the appropriate feature, +# like in these examples. Without this, embassy-time will assume a 1Mhz tick rate, and every time +# operation will involve a conversion. +#features = ["tick-hz-10_000"] [dependencies.critical-section] version = "1.2" diff --git a/tests/drivers/gpio-async/README.md b/tests/drivers/gpio-async/README.md new file mode 100644 index 00000000..de7a3836 --- /dev/null +++ b/tests/drivers/gpio-async/README.md @@ -0,0 +1,6 @@ +# Async gpio + +This demo makes use of the GPIO `wait_for_high()` and `wait_for_low()` async operations. + +Unfortunately, not all GPIO controllers support level triggered interrupts. Notably, most of the +stm32 line does not support level triggered interrupts. diff --git a/zephyr-build/src/devicetree/augment.rs b/zephyr-build/src/devicetree/augment.rs index 2881da08..7b333a4c 100644 --- a/zephyr-build/src/devicetree/augment.rs +++ b/zephyr-build/src/devicetree/augment.rs @@ -28,6 +28,12 @@ pub trait Augment { /// The default implementation checks if this node matches and calls a generator if it does, or /// does nothing if not. fn augment(&self, node: &Node, tree: &DeviceTree) -> TokenStream { + // If there is a status field present, and it is not set to "okay", don't augment this node. + if let Some(status) = node.get_single_string("status") { + if status != "okay" { + return TokenStream::new(); + } + } if self.is_compatible(node) { self.generate(node, tree) } else { diff --git a/zephyr-macros/Cargo.toml b/zephyr-macros/Cargo.toml index a9c386b0..72c2a1b6 100644 --- a/zephyr-macros/Cargo.toml +++ b/zephyr-macros/Cargo.toml @@ -9,7 +9,7 @@ descriptions = "Macros for managing tasks and work queues in Zephyr" proc-macro = true [dependencies] -syn = { version = "2.0.85", features = ["full", "visit"] } +syn = { version = "2.0.79", features = ["full", "visit"] } quote = "1.0.37" proc-macro2 = "1.0.86" darling = "0.20.1" diff --git a/zephyr/src/embassy.rs b/zephyr/src/embassy.rs index 8bbc2546..d924ae95 100644 --- a/zephyr/src/embassy.rs +++ b/zephyr/src/embassy.rs @@ -71,7 +71,7 @@ //! //! ## Caveats //! -//! [`Semaphore::take_async`]: crate::sys::sync::Semaphore::take_async +//! The executor currently doesn't support async waits on Zephyr primitives, such as Semaphore. #[cfg(feature = "time-driver")] mod time_driver; diff --git a/zephyr/src/embassy/time_driver.rs b/zephyr/src/embassy/time_driver.rs index c9e73247..f5616137 100644 --- a/zephyr/src/embassy/time_driver.rs +++ b/zephyr/src/embassy/time_driver.rs @@ -14,6 +14,18 @@ use embassy_time_queue_utils::Queue; use crate::raw::{k_timeout_t, k_timer, k_timer_init, k_timer_start}; use crate::sys::K_FOREVER; +/// The time base configured into Zephyr. +pub const ZEPHYR_TICK_HZ: u64 = crate::time::SYS_FREQUENCY as u64; + +/// The configured Embassy time tick rate. +pub const EMBASSY_TICK_HZ: u64 = embassy_time_driver::TICK_HZ; + +/// When the zephyr and embassy rates differ, use this intermediate type. This can be selected by +/// feature. At the worst case, with Embassy's tick at 1Mhz, and Zephyr's at 50k, it is a little +/// over 11 years. Higher of either will reduce that further. But, 128-bit arithmetic is fairly +/// inefficient. +type InterTime = u128; + embassy_time_driver::time_driver_impl!(static DRIVER: ZephyrTimeDriver = ZephyrTimeDriver { queue: Mutex::new(RefCell::new(Queue::new())), timer: Mutex::new(RefCell::new(unsafe { mem::zeroed() })), @@ -63,9 +75,40 @@ impl ZTimer { } } +/// Convert from a zephyr tick count, to an embassy tick count. +/// +/// This is done using an intermediate type defined above. +/// This conversion truncates. +fn zephyr_to_embassy(ticks: u64) -> u64 { + if ZEPHYR_TICK_HZ == EMBASSY_TICK_HZ { + // This should happen at compile time. + return ticks; + } + + // Otherwise do the intermediate conversion. + let prod = (ticks as InterTime) * (EMBASSY_TICK_HZ as InterTime); + (prod / (ZEPHYR_TICK_HZ as InterTime)) as u64 +} + +/// Convert from an embassy tick count to a zephyr. +/// +/// This conversion use ceil so that values are always large enough. +fn embassy_to_zephyr(ticks: u64) -> u64 { + if ZEPHYR_TICK_HZ == EMBASSY_TICK_HZ { + return ticks; + } + + let prod = (ticks as InterTime) * (ZEPHYR_TICK_HZ as InterTime); + prod.div_ceil(EMBASSY_TICK_HZ as InterTime) as u64 +} + +fn zephyr_now() -> u64 { + crate::time::now().ticks() +} + impl Driver for ZephyrTimeDriver { fn now(&self) -> u64 { - crate::time::now().ticks() + zephyr_to_embassy(zephyr_now()) } fn schedule_wake(&self, at: u64, waker: &core::task::Waker) { @@ -73,10 +116,13 @@ impl Driver for ZephyrTimeDriver { let mut queue = self.queue.borrow(cs).borrow_mut(); let mut timer = self.timer.borrow(cs).borrow_mut(); + // All times below are in Zephyr units. + let at = embassy_to_zephyr(at); + if queue.schedule_wake(at, waker) { - let mut next = queue.next_expiration(self.now()); - while !timer.set_alarm(next, self.now()) { - next = queue.next_expiration(self.now()); + let mut next = queue.next_expiration(zephyr_now()); + while !timer.set_alarm(next, zephyr_now()) { + next = queue.next_expiration(zephyr_now()); } } }) @@ -89,9 +135,9 @@ impl ZephyrTimeDriver { let mut queue = self.queue.borrow(cs).borrow_mut(); let mut timer = self.timer.borrow(cs).borrow_mut(); - let mut next = queue.next_expiration(self.now()); - while !timer.set_alarm(next, self.now()) { - next = queue.next_expiration(self.now()); + let mut next = queue.next_expiration(zephyr_now()); + while !timer.set_alarm(next, zephyr_now()) { + next = queue.next_expiration(zephyr_now()); } }) } diff --git a/zephyr/src/lib.rs b/zephyr/src/lib.rs index cccdfaf6..2764e1e7 100644 --- a/zephyr/src/lib.rs +++ b/zephyr/src/lib.rs @@ -39,7 +39,7 @@ //! level operation that is still quite useful in regular code. //! - [`timer`]: Rust interfaces to Zephyr timers. These timers can be used either by registering a //! callback, or polled or waited for for an elapsed time. -//! - [`work`]: Zephyr work queues for Rust. The [`work::WorkQueueBuilder`] and resulting +//! - [`work`]: Zephyr work queues for Rust. The [`define_work_queue`] macro and resulting //! [`work::WorkQueue`] allow creation of Zephyr work queues to be used from Rust. The //! [`work::Work`] item had an action that will be invoked by the work queue, and can be manually //! submitted when needed. diff --git a/zephyr/src/sync.rs b/zephyr/src/sync.rs index 001ea8bc..6924f83e 100644 --- a/zephyr/src/sync.rs +++ b/zephyr/src/sync.rs @@ -57,6 +57,21 @@ mod pinweak { .upgrade() .map(|arc| unsafe { Pin::new_unchecked(arc) }) } + + /// Equivalent to [`Weak::strong_count`] + pub fn strong_count(&self) -> usize { + self.0.strong_count() + } + + /// Equivalent to [`Weak::weak_count`] + pub fn weak_count(&self) -> usize { + self.0.weak_count() + } + + /// Equivalent to [`Weak::ptr_eq`] + pub fn ptr_eq(&self, other: &Self) -> bool { + self.0.ptr_eq(&other.0) + } } } diff --git a/zephyr/src/work.rs b/zephyr/src/work.rs index c3ed8d20..b96342f1 100644 --- a/zephyr/src/work.rs +++ b/zephyr/src/work.rs @@ -17,227 +17,152 @@ //! having the `k_work` embedded in their structure, and Zephyr schedules the work when the given //! reason happens. //! -//! At this time, only the basic work queue type is supported. +//! At this point, this code supports the simple work queues, with [`Work`] items. //! -//! Zephyr's work queues can be used in different ways: +//! Work Queues should be declared with the `define_work_queue!` macro, this macro requires the name +//! of the symbol for the work queue, the stack size, and then zero or more optional arguments, +//! defined by the fields in the [`WorkQueueDeclArgs`] struct. For example: //! -//! - Work can be scheduled as needed. For example, an IRQ handler can queue a work item to process -//! data it has received from a device. -//! - Work can be scheduled periodically. -//! -//! As most C use of Zephyr statically allocates things like work, these are typically rescheduled -//! when the work is complete. The work queue scheduling functions are designed, and intended, for -//! a given work item to be able to reschedule itself, and such usage is common. -//! -//! ## Ownership -//! -//! The remaining challenge with implementing `k_work` for Rust is that of ownership. The model -//! taken here is that the work items are held in a `Box` that is effectively owned by the work -//! itself. When the work item is scheduled to Zephyr, ownership of that box is effectively handed -//! off to C, and then when the work item is called, the Box re-constructed. This repeats until the -//! work is no longer needed, at which point the work will be dropped. -//! -//! There are two common ways the lifecycle of work can be managed in an embedded system: -//! -//! - A set of `Future`'s are allocated once at the start, and these never return a value. Work -//! Futures inside of this (which correspond to `.await` in async code) can have lives and return -//! values, but the main loops will not return values, or be dropped. Embedded Futures will -//! typically not be boxed. -//! -//! One consequence of the ownership being passed through to C code is that if the work cancellation -//! mechanism is used on a work queue, the work items themselves will be leaked. -//! -//! These work items are also `Pin`, to ensure that the work actions are not moved. -//! -//! ## The work queues themselves -//! -//! Workqueues themselves are built using [`WorkQueueBuilder`]. This needs a statically defined -//! stack. Typical usage will be along the lines of: //! ```rust -//! kobj_define! { -//! WORKER_STACK: ThreadStack<2048>; -//! } -//! // ... -//! let main_worker = Box::new( -//! WorkQueueBuilder::new() -//! .set_priority(2). -//! .set_name(c"mainloop") -//! .set_no_yield(true) -//! .start(MAIN_LOOP_STACK.init_once(()).unwrap()) -//! ); -//! -//! let _ = zephyr::kio::spawn( -//! mainloop(), // Async or function returning Future. -//! &main_worker, -//! c"w:mainloop", -//! ); -//! -//! ... -//! -//! // Leak the Box so that the worker is never freed. -//! let _ = Box::leak(main_worker); +//! define_work_queue!(MY_WORKQ, 2048, no_yield = true, priority = 2); //! ``` //! -//! It is important that WorkQueues never be dropped. It has a Drop implementation that invokes -//! panic. Zephyr provides no mechanism to stop work queue threads, so dropping would result in -//! undefined behavior. -//! -//! # Current Status -//! -//! Although Zephyr has 3 types of work queues, the `k_work_poll` is sufficient to implement all of -//! the behavior, and this implementation only implements this type. Non Future work could be built -//! around the other work types. -//! -//! As such, this means that manually constructed work is still built using `Future`. The `_async` -//! primitives throughout this crate can be used just as readily by hand-written Futures as by async -//! code. Notable, the use of [`Signal`] will likely be common, along with possible timeouts. -//! -//! [`sys::sync::Semaphore`]: crate::sys::sync::Semaphore -//! [`sync::channel`]: crate::sync::channel -//! [`sync::Mutex`]: crate::sync::Mutex -//! [`join`]: futures::JoinHandle::join -//! [`join_async`]: futures::JoinHandle::join_async +//! Then, in code, the work queue can be started, and used to issue work. +//! ```rust +//! let my_workq = MY_WORKQ.start(); +//! let action = Work::new(action_item); +//! action.submit(my_workq); +//! ``` extern crate alloc; use core::{ - cell::UnsafeCell, - ffi::{c_int, c_uint, CStr}, + cell::{RefCell, UnsafeCell}, + ffi::{c_char, c_int, c_uint}, mem, - pin::Pin, - ptr, + sync::atomic::Ordering, }; +use critical_section::Mutex; +use portable_atomic::AtomicBool; +use portable_atomic_util::Arc; use zephyr_sys::{ k_poll_signal, k_poll_signal_check, k_poll_signal_init, k_poll_signal_raise, k_poll_signal_reset, k_work, k_work_init, k_work_q, k_work_queue_config, k_work_queue_init, - k_work_queue_start, k_work_submit, k_work_submit_to_queue, + k_work_queue_start, k_work_submit, k_work_submit_to_queue, z_thread_stack_element, }; use crate::{ error::to_result_void, - object::Fixed, + object::{Fixed, ObjectInit, ZephyrObject}, simpletls::SimpleTls, - sync::{Arc, Mutex}, - sys::thread::ThreadStack, }; -/// A builder for work queues themselves. -/// -/// A work queue is a Zephyr thread that instead of directly running a piece of code, manages a work -/// queue. Various types of `Work` can be submitted to these queues, along with various types of -/// triggering conditions. -pub struct WorkQueueBuilder { - /// The "config" value passed in. - config: k_work_queue_config, - /// Priority for the work queue thread. - priority: c_int, +/// The WorkQueue decl args as a struct, so we can have a default, and the macro can fill in those +/// specified by the user. +pub struct WorkQueueDeclArgs { + /// Should this work queue call yield after each queued item. + pub no_yield: bool, + /// Is this work queue thread "essential". + /// + /// Threads marked essential will panic if they stop running. + pub essential: bool, + /// Zephyr thread priority for the work queue thread. + pub priority: c_int, } -impl WorkQueueBuilder { - /// Construct a new WorkQueueBuilder with default values. - pub fn new() -> Self { +impl WorkQueueDeclArgs { + /// Like `Default::default`, but const. + pub const fn default_values() -> Self { Self { - config: k_work_queue_config { - name: ptr::null(), - no_yield: false, - essential: false, - }, + no_yield: false, + essential: false, priority: 0, } } +} - /// Set the name for the WorkQueue thread. - /// - /// This name shows up in debuggers and some analysis tools. - pub fn set_name(&mut self, name: &'static CStr) -> &mut Self { - self.config.name = name.as_ptr(); - self - } - - /// Set the "no yield" flag for the created worker. - /// - /// If this is not set, the work queue will call `k_yield` between each enqueued work item. For - /// non-preemptible threads, this will allow other threads to run. For preemptible threads, - /// this will allow other threads at the same priority to run. - /// - /// This method has a negative in the name, which goes against typical conventions. This is - /// done to match the field in the Zephyr config. - pub fn set_no_yield(&mut self, value: bool) -> &mut Self { - self.config.no_yield = value; - self - } - - /// Set the "essential" flag for the created worker. - /// - /// This sets the essential flag on the running thread. The system considers the termination of - /// an essential thread to be a fatal error. - pub fn set_essential(&mut self, value: bool) -> &mut Self { - self.config.essential = value; - self - } - - /// Set the priority for the worker thread. - /// - /// See the Zephyr docs for the meaning of priority. - pub fn set_priority(&mut self, value: c_int) -> &mut Self { - self.priority = value; - self - } +/// A static declaration of a work-queue. This associates a work queue, with a stack, and an atomic +/// to determine if it has been initialized. +// TODO: Remove the pub on the fields, and make a constructor. +pub struct WorkQueueDecl { + queue: WorkQueue, + stack: &'static crate::thread::ThreadStack, + config: k_work_queue_config, + priority: c_int, + started: AtomicBool, +} - /// Start the given work queue thread. - /// - /// TODO: Implement a 'start' that works from a static work queue. - pub fn start(&self, stack: ThreadStack) -> WorkQueue { - let item: Fixed = Fixed::new(unsafe { mem::zeroed() }); - unsafe { - // SAFETY: Initialize zeroed memory. - k_work_queue_init(item.get()); - - // SAFETY: This associates the workqueue with the thread ID that runs it. The thread is - // a pointer into this work item, which will not move, because of the Fixed. - let this = &mut *item.get(); - WORK_QUEUES - .lock() - .unwrap() - .insert(&this.thread, WorkQueueRef(item.get())); - - // SAFETY: Start work queue thread. The main issue here is that the work queue cannot - // be deallocated once the thread has started. We enforce this by making Drop panic. - k_work_queue_start( - item.get(), - stack.base, - stack.size, - self.priority, - &self.config, - ); +// SAFETY: Sync is needed here to make a static declaration, despite the `*const i8` that is burried +// in the config. +unsafe impl Sync for WorkQueueDecl {} + +impl WorkQueueDecl { + /// Static constructor. Mostly for use by the macro. + pub const fn new( + stack: &'static crate::thread::ThreadStack, + name: *const c_char, + args: WorkQueueDeclArgs, + ) -> Self { + Self { + queue: unsafe { mem::zeroed() }, + stack, + config: k_work_queue_config { + name, + no_yield: args.no_yield, + essential: args.essential, + }, + priority: args.priority, + started: AtomicBool::new(false), } + } - WorkQueue { item } + /// Start the work queue thread, if needed, and return a reference to it. + pub fn start(&'static self) -> &'static WorkQueue { + critical_section::with(|cs| { + if self.started.load(Ordering::Relaxed) { + // Already started, just return it. + return &self.queue; + } + + // SAFETY: Starting is coordinated by the atomic, as well as being protected in a + // critical section. + unsafe { + let this = &mut *self.queue.item.get(); + + k_work_queue_init(self.queue.item.get()); + + // Add to the WORK_QUEUES data. That needs to be changed to a critical + // section Mutex from a Zephyr Mutex, as that would deadlock if called while in a + // critrical section. + let mut tls = WORK_QUEUES.borrow_ref_mut(cs); + tls.insert(&this.thread, WorkQueueRef(self.queue.item.get())); + + // Start the work queue thread. + k_work_queue_start( + self.queue.item.get(), + self.stack.data.get() as *mut z_thread_stack_element, + self.stack.size(), + self.priority, + &self.config, + ); + } + + &self.queue + }) } } /// A running work queue thread. /// -/// # Panic +/// This must be declared statically, and initialized once. Please see the macro +/// [`define_work_queue`] which declares this with a [`WorkQueue`] to help with the +/// association with a stack, and making sure the queue is only started once. /// -/// Allowing a work queue to drop will result in a panic. There are two ways to handle this, -/// depending on whether the WorkQueue is in a Box, or an Arc: -/// ``` -/// // Leak a work queue in an Arc. -/// let wq = Arc::new(WorkQueueBuilder::new().start(...)); -/// // If the Arc is used after this: -/// let _ = Arc::into_raw(wq.clone()); -/// // If the Arc is no longer needed: -/// let _ = Arc::into_raw(wq); -/// -/// // Leak a work queue in a Box. -/// let wq = Box::new(WorkQueueBuilder::new().start(...)); -/// let _ = Box::leak(wq); -/// ``` +/// [`define_work_queue`]: crate::define_work_queue pub struct WorkQueue { #[allow(dead_code)] - item: Fixed, + item: UnsafeCell, } /// Work queues can be referenced from multiple threads, and thus are Send and Sync. @@ -265,7 +190,8 @@ impl Drop for WorkQueue { /// /// This is a little bit messy as we don't have a lazy mechanism, so we have to handle this a bit /// manually right now. -static WORK_QUEUES: Mutex> = Mutex::new(SimpleTls::new()); +static WORK_QUEUES: Mutex>> = + Mutex::new(RefCell::new(SimpleTls::new())); /// For the queue mapping, we need a simple wrapper around the underlying pointer, one that doesn't /// implement stop. @@ -278,7 +204,7 @@ unsafe impl Sync for WorkQueueRef {} /// Retrieve the current work queue, if we are running within one. pub fn get_current_workq() -> Option<*mut k_work_q> { - WORK_QUEUES.lock().unwrap().get().map(|wq| wq.0) + critical_section::with(|cs| WORK_QUEUES.borrow_ref(cs).get().map(|wq| wq.0)) } /// A Rust wrapper for `k_poll_signal`. @@ -417,7 +343,7 @@ impl SubmitResult { /// below uses an Arc, so this data can be shared. pub trait SimpleAction { /// Perform the action. - fn act(self: Pin<&Self>); + fn act(self: &Self); } /// A basic Zephyr work item. @@ -425,7 +351,7 @@ pub trait SimpleAction { /// Holds a `k_work`, along with the data associated with that work. When the work is queued, the /// `act` method will be called on the provided `SimpleAction`. pub struct Work { - work: UnsafeCell, + work: ZephyrObject, action: T, } @@ -445,100 +371,171 @@ where { } -impl Work { - /// Construct a new Work from the given action. - /// - /// Note that the data will be moved into the pinned Work. The data is internal, and only - /// accessible to the work thread (the `act` method). If shared data is needed, normal - /// inter-thread sharing mechanisms are needed. - /// - /// TODO: Can we come up with a way to allow sharing on the same worker using Rc instead of Arc? - pub fn new(action: T) -> Pin> { - let this = Arc::pin(Self { - // SAFETY: will be initialized below, after this is pinned. - work: unsafe { mem::zeroed() }, - action, - }); - - // SAFETY: Initializes above zero-initialized struct. - unsafe { - k_work_init(this.work.get(), Some(Self::handler)); - } - - this +/// Arc held work. +/// +/// Because C code takes ownership of the work, we only support submitting work with very specific +/// pointer types. This wraps an Arc holding work to allow Work held in an Arc to be queued. +/// Earlier versions of this required the work to be `Pin>`, however, we use +/// [`ZephyrObject`] to hold work items, anyway, and this already does a runtime check to prevents +/// moves, so we can safely avoid needing to use `Pin`. However, note that if the work is moved +/// between it's first use, and subsequent use, it will panic. +pub struct ArcWork(pub Arc>); + +/// Clone just passes the clone to the arc. +impl Clone for ArcWork { + fn clone(&self) -> Self { + ArcWork(self.0.clone()) } +} +impl ArcWork { /// Submit this work to the system work queue. /// /// This can return several possible `Ok` results. See the docs on [`SubmitResult`] for an /// explanation of them. - pub fn submit(this: Pin>) -> crate::Result { - // We "leak" the arc, so that when the handler runs, it can be safely turned back into an - // Arc, and the drop on the arc will then run. - let work = this.work.get(); - - // SAFETY: C the code does not perform moves on the data, and the `from_raw` below puts it - // back into a Pin when it reconstructs the Arc. - let this = unsafe { Pin::into_inner_unchecked(this) }; - let _ = Arc::into_raw(this); - - // SAFETY: The Pin ensures this will not move. Our implementation of drop ensures that the - // work item is no longer queued when the data is dropped. - SubmitResult::to_result(unsafe { k_work_submit(work) }) + pub fn submit(self) -> crate::Result { + // Leak the arc, so that when the handler runs, it can be safely turned back into an Arc, + // and then the drop on the Arc will run. + // SAFETY: As we are leaking the pointer until the C code is done with it, it is safe to get + // the pointer to the raw work. + let work = unsafe { self.0.work.get() }; + let _ = Arc::into_raw(self.0); + + let result = SubmitResult::to_result(unsafe { k_work_submit(work) }); + + Self::check_drop(work, &result); + + result } - /// Submit this work to a specified work queue. + /// Submit this work to the given work queue. /// - /// TODO: Change when we have better wrappers for work queues. - pub fn submit_to_queue(this: Pin>, queue: &WorkQueue) -> crate::Result { - let work = this.work.get(); - - // "leak" the arc to give to C. We'll reconstruct it in the handler. - // SAFETY: The C code does not perform moves on the data, and the `from_raw` below puts it - // back into a Pin when it reconstructs the Arc. - let this = unsafe { Pin::into_inner_unchecked(this) }; - let _ = Arc::into_raw(this); - - // SAFETY: The Pin ensures this will not move. Our implementation of drop ensures that the - // work item is no longer queued when the data is dropped. - SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), work) }) + /// This can return several possible `Ok` results. See the docs on [`SubmitResult`] for an + /// explanation of them. + pub fn submit_to_queue(self, queue: &'static WorkQueue) -> crate::Result { + // Leak the arc, so that when the handler runs, it can be safely turned back into an Arc, + // and then the drop on the Arc will run. + // SAFETY: As we are leaking the pointer until the C code is done with it, it is safe to get + // the pointer to the raw work. + let work = unsafe { self.0.work.get() }; + let _ = Arc::into_raw(self.0); + + let result = + SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), work) }); + + Self::check_drop(work, &result); + + result + } + + /// Given the raw "C" work pointer, get a pointer back to our work item. + unsafe fn from_raw(ptr: *const k_work) -> Self { + let ptr = ptr + .cast::() + .sub(mem::offset_of!(Work, work)) + .cast::>(); + let this = Arc::from_raw(ptr); + Self(this) + } + + /// Check if the C code has "dropped" it's reference, and drop our Arc reference as well. This + /// should detect the case where the work was not queued, and no callback, of this ownership, + /// will be called. + fn check_drop(work: *const k_work, result: &crate::Result) { + if matches!(result, Ok(SubmitResult::AlreadySubmitted) | Err(_)) { + // SAFETY: If the above matches, it indicates this work was already running, and someone + // other than the work itself is trying to submit it. In this case, there will be no + // callback that belongs to this particular context. Err also indicates that the work + // was not enqueued. + unsafe { + let this = Self::from_raw(work); + drop(this); + } + } } - /// Callback, through C, but bound by a specific type. + /// The handler for Arc based work. extern "C" fn handler(work: *mut k_work) { - // We want to avoid needing a `repr(C)` on our struct, so the `k_work` pointer is not - // necessarily at the beginning of the struct. - // SAFETY: Converts raw pointer to work back into the box. + // Reconstruct self out of the work. + // SAFETY: The submit functions will leak the arc any time C has ownership of the Work, and + // the C will relinquish that ownership when calling this handler. let this = unsafe { Self::from_raw(work) }; - // Access the action within, still pinned. - // SAFETY: It is safe to keep the pin on the interior. - let action = unsafe { this.as_ref().map_unchecked(|p| &p.action) }; + let action = &this.0.action; action.act(); + + // This will be dropped. } +} + +/// Static Work. +/// +/// Work items can also be declared statically. Note that the work should only be submitted after +/// it has been moved to it's final static location. +pub struct StaticWork(pub &'static Work); - /* - /// Consume this Arc, returning the internal pointer. Needs to have a complementary `from_raw` - /// called to avoid leaking the item. - fn into_raw(this: Pin>) -> *const Self { - // SAFETY: This removes the Pin guarantee, but is given as a raw pointer to C, which doesn't - // generally use move. - let this = unsafe { Pin::into_inner_unchecked(this) }; - Arc::into_raw(this) +impl StaticWork { + /// Submit this work to the system work queue. + pub fn submit(self) -> crate::Result { + SubmitResult::to_result(unsafe { k_work_submit(self.0.work.get()) }) } - */ - /// Given a pointer to the work_q burried within, recover the Pinned Box containing our data. - unsafe fn from_raw(ptr: *const k_work) -> Pin> { - // SAFETY: This fixes the pointer back to the beginning of Self. This also assumes the - // pointer is valid. - let ptr = ptr - .cast::() - .sub(mem::offset_of!(Self, work)) - .cast::(); - let this = Arc::from_raw(ptr); - Pin::new_unchecked(this) + /// Submit this work to the a specific work queue. + pub fn submit_to_queue(self, queue: &'static WorkQueue) -> crate::Result { + SubmitResult::to_result(unsafe { + k_work_submit_to_queue(queue.item.get(), self.0.work.get()) + }) + } + + /// The handler for static work. + extern "C" fn handler(work: *mut k_work) { + let ptr = unsafe { + work.cast::() + .sub(mem::offset_of!(Work, work)) + .cast::>() + }; + let this = unsafe { &*ptr }; + let action = &this.action; + action.act(); + } +} + +impl Work { + /// Construct a new Work from the given action. + /// + /// Note that the data will be moved into the pinned Work. The data is internal, and only + /// accessible to the work thread (the `act` method). If shared data is needed, normal + /// inter-thread sharing mechanisms are needed. + /// + /// TODO: Can we come up with a way to allow sharing on the same worker using Rc instead of Arc? + pub fn new_arc(action: T) -> ArcWork { + let work = >::new_raw(); + + // SAFETY: Initializes the above zero-initialized struct. Initialization once is handled by + // ZephyrObject. + unsafe { + let addr = work.get_uninit(); + (*addr).handler = Some(ArcWork::::handler); + } + + let this = Arc::new(Work { work, action }); + + ArcWork(this) + } +} + +impl Work { + /// Construct a static worker. + pub const fn new_static(action: T) -> Work { + let work = >::new_raw(); + + unsafe { + let addr = work.get_uninit(); + (*addr).handler = Some(StaticWork::::handler); + } + + Work { work, action } } /// Access the inner action. @@ -546,3 +543,72 @@ impl Work { &self.action } } + +/// Capture the kinds of pointers that are safe to submit to work queues. +pub trait SubmittablePointer { + /// Submit this work to the system work queue. + fn submit(self) -> crate::Result; + + /// Submit this work to the given work queue. + fn submit_to_queue(self, queue: &'static WorkQueue) -> crate::Result; +} + +impl SubmittablePointer for Arc> { + fn submit(self) -> crate::Result { + ArcWork(self).submit() + } + + fn submit_to_queue(self, queue: &'static WorkQueue) -> crate::Result { + ArcWork(self).submit_to_queue(queue) + } +} + +impl SubmittablePointer for &'static Work { + fn submit(self) -> crate::Result { + StaticWork(self).submit() + } + + fn submit_to_queue(self, queue: &'static WorkQueue) -> crate::Result { + StaticWork(self).submit_to_queue(queue) + } +} + +impl ObjectInit for ZephyrObject { + fn init(item: *mut k_work) { + // SAFETY: The handler was stashed in this field when constructing. At this point, the item + // will be pinned. + unsafe { + let handler = (*item).handler; + k_work_init(item, handler); + } + } +} + +/// Declare a static work queue. +/// +/// This declares a static work queue (of type [`WorkQueueDecl`]). This will have a single method +/// `.start()` which can be used to start the work queue, as well as return the persistent handle +/// that can be used to enqueue to it. +#[macro_export] +macro_rules! define_work_queue { + ($name:ident, $stack_size:expr) => { + $crate::define_work_queue!($name, $stack_size,); + }; + ($name:ident, $stack_size:expr, $($key:ident = $value:expr),* $(,)?) => { + static $name: $crate::work::WorkQueueDecl<$stack_size> = { + #[link_section = concat!(".noinit.workq.", stringify!($name))] + static _ZEPHYR_STACK: $crate::thread::ThreadStack<$stack_size> = + $crate::thread::ThreadStack::new(); + const _ZEPHYR_C_NAME: &[u8] = concat!(stringify!($name), "\0").as_bytes(); + const _ZEPHYR_ARGS: $crate::work::WorkQueueDeclArgs = $crate::work::WorkQueueDeclArgs { + $($key: $value,)* + ..$crate::work::WorkQueueDeclArgs::default_values() + }; + $crate::work::WorkQueueDecl::new( + &_ZEPHYR_STACK, + _ZEPHYR_C_NAME.as_ptr() as *const ::core::ffi::c_char, + _ZEPHYR_ARGS, + ) + }; + }; +}