diff --git a/examples/main.rs b/examples/main.rs index c92dcf1..6841a7e 100644 --- a/examples/main.rs +++ b/examples/main.rs @@ -1,8 +1,8 @@ extern crate dispatch; +use dispatch::{Queue, QueuePriority}; use std::io; use std::process::exit; -use dispatch::{Queue, QueuePriority}; /// Prompts for a number and adds it to the given sum. /// diff --git a/src/ffi.rs b/src/ffi.rs index 91dad6f..7fc2265 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -4,10 +4,12 @@ use std::os::raw::{c_char, c_long, c_ulong, c_void}; #[repr(C)] -pub struct dispatch_object_s { _private: [u8; 0] } +pub struct dispatch_object_s { + _private: [u8; 0], +} // dispatch_block_t -pub type dispatch_function_t = extern fn(*mut c_void); +pub type dispatch_function_t = extern "C" fn(*mut c_void); pub type dispatch_semaphore_t = *mut dispatch_object_s; pub type dispatch_group_t = *mut dispatch_object_s; pub type dispatch_object_t = *mut dispatch_object_s; @@ -25,39 +27,78 @@ pub type dispatch_time_t = u64; // dispatch_io_interval_flags_t pub type dispatch_queue_attr_t = *const dispatch_object_s; -#[cfg_attr(any(target_os = "macos", target_os = "ios"), - link(name = "System", kind = "dylib"))] -#[cfg_attr(not(any(target_os = "macos", target_os = "ios")), - link(name = "dispatch", kind = "dylib"))] -extern { +#[cfg_attr( + any(target_os = "macos", target_os = "ios"), + link(name = "System", kind = "dylib") +)] +#[cfg_attr( + not(any(target_os = "macos", target_os = "ios")), + link(name = "dispatch", kind = "dylib") +)] +extern "C" { static _dispatch_main_q: dispatch_object_s; static _dispatch_queue_attr_concurrent: dispatch_object_s; pub fn dispatch_get_global_queue(identifier: c_long, flags: c_ulong) -> dispatch_queue_t; - pub fn dispatch_queue_create(label: *const c_char, attr: dispatch_queue_attr_t) -> dispatch_queue_t; + pub fn dispatch_queue_create( + label: *const c_char, + attr: dispatch_queue_attr_t, + ) -> dispatch_queue_t; // dispatch_queue_attr_t dispatch_queue_attr_make_with_qos_class ( dispatch_queue_attr_t attr, dispatch_qos_class_t qos_class, int relative_priority ); pub fn dispatch_queue_get_label(queue: dispatch_queue_t) -> *const c_char; pub fn dispatch_set_target_queue(object: dispatch_object_t, queue: dispatch_queue_t); pub fn dispatch_main(); // void dispatch_async ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_async_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_async_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); // void dispatch_sync ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_sync_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_sync_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); // void dispatch_after ( dispatch_time_t when, dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_after_f(when: dispatch_time_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_after_f( + when: dispatch_time_t, + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); // void dispatch_apply ( size_t iterations, dispatch_queue_t queue, void (^block)(size_t) ); - pub fn dispatch_apply_f(iterations: usize, queue: dispatch_queue_t, context: *mut c_void, work: extern fn(*mut c_void, usize)); + pub fn dispatch_apply_f( + iterations: usize, + queue: dispatch_queue_t, + context: *mut c_void, + work: extern "C" fn(*mut c_void, usize), + ); // void dispatch_once ( dispatch_once_t *predicate, dispatch_block_t block ); - pub fn dispatch_once_f(predicate: *mut dispatch_once_t, context: *mut c_void, function: dispatch_function_t); + pub fn dispatch_once_f( + predicate: *mut dispatch_once_t, + context: *mut c_void, + function: dispatch_function_t, + ); // void dispatch_group_async ( dispatch_group_t group, dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_group_async_f(group: dispatch_group_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_group_async_f( + group: dispatch_group_t, + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); pub fn dispatch_group_create() -> dispatch_group_t; pub fn dispatch_group_enter(group: dispatch_group_t); pub fn dispatch_group_leave(group: dispatch_group_t); // void dispatch_group_notify ( dispatch_group_t group, dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_group_notify_f(group: dispatch_group_t, queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_group_notify_f( + group: dispatch_group_t, + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); pub fn dispatch_group_wait(group: dispatch_group_t, timeout: dispatch_time_t) -> c_long; pub fn dispatch_get_context(object: dispatch_object_t) -> *mut c_void; @@ -70,12 +111,21 @@ extern { pub fn dispatch_semaphore_create(value: c_long) -> dispatch_semaphore_t; pub fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> c_long; - pub fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> c_long; + pub fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) + -> c_long; // void dispatch_barrier_async ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_barrier_async_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_barrier_async_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); // void dispatch_barrier_sync ( dispatch_queue_t queue, dispatch_block_t block ); - pub fn dispatch_barrier_sync_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); + pub fn dispatch_barrier_sync_f( + queue: dispatch_queue_t, + context: *mut c_void, + work: dispatch_function_t, + ); // void dispatch_source_cancel ( dispatch_source_t source ); // dispatch_source_t dispatch_source_create ( dispatch_source_type_t type, uintptr_t handle, unsigned long mask, dispatch_queue_t queue ); @@ -136,14 +186,15 @@ pub fn dispatch_get_main_queue() -> dispatch_queue_t { } pub const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t = 0 as dispatch_queue_attr_t; -pub static DISPATCH_QUEUE_CONCURRENT: &'static dispatch_object_s = unsafe { &_dispatch_queue_attr_concurrent }; +pub static DISPATCH_QUEUE_CONCURRENT: &'static dispatch_object_s = + unsafe { &_dispatch_queue_attr_concurrent }; -pub const DISPATCH_QUEUE_PRIORITY_HIGH: c_long = 2; -pub const DISPATCH_QUEUE_PRIORITY_DEFAULT: c_long = 0; -pub const DISPATCH_QUEUE_PRIORITY_LOW: c_long = -2; +pub const DISPATCH_QUEUE_PRIORITY_HIGH: c_long = 2; +pub const DISPATCH_QUEUE_PRIORITY_DEFAULT: c_long = 0; +pub const DISPATCH_QUEUE_PRIORITY_LOW: c_long = -2; pub const DISPATCH_QUEUE_PRIORITY_BACKGROUND: c_long = -1 << 15; -pub const DISPATCH_TIME_NOW: dispatch_time_t = 0; +pub const DISPATCH_TIME_NOW: dispatch_time_t = 0; pub const DISPATCH_TIME_FOREVER: dispatch_time_t = !0; #[cfg(test)] @@ -155,7 +206,7 @@ mod tests { use std::os::raw::c_void; use std::ptr; - extern fn serial_queue_test_add(num: *mut c_void) { + extern "C" fn serial_queue_test_add(num: *mut c_void) { unsafe { *(num as *mut u32) = 1; } diff --git a/src/group.rs b/src/group.rs index 6ba6c5b..84c0741 100644 --- a/src/group.rs +++ b/src/group.rs @@ -1,8 +1,8 @@ use std::time::Duration; use crate::ffi::*; -use crate::{context_and_function, time_after_delay, WaitTimeout}; use crate::queue::Queue; +use crate::{context_and_function, time_after_delay, WaitTimeout}; /// A Grand Central Dispatch group. /// @@ -18,7 +18,9 @@ impl Group { /// Creates a new dispatch `Group`. pub fn create() -> Group { unsafe { - Group { ptr: dispatch_group_create() } + Group { + ptr: dispatch_group_create(), + } } } @@ -32,7 +34,9 @@ impl Group { /// Submits a closure asynchronously to the given `Queue` and associates it /// with self. pub fn exec_async(&self, queue: &Queue, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_group_async_f(self.ptr, queue.ptr, context, work); @@ -43,7 +47,9 @@ impl Group { /// associated with self have completed. /// If self is empty, the closure is submitted immediately. pub fn notify(&self, queue: &Queue, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_group_notify_f(self.ptr, queue.ptr, context, work); @@ -52,9 +58,7 @@ impl Group { /// Waits synchronously for all tasks associated with self to complete. pub fn wait(&self) { - let result = unsafe { - dispatch_group_wait(self.ptr, DISPATCH_TIME_FOREVER) - }; + let result = unsafe { dispatch_group_wait(self.ptr, DISPATCH_TIME_FOREVER) }; assert!(result == 0, "Dispatch group wait errored"); } @@ -63,9 +67,7 @@ impl Group { /// Returns true if the tasks completed or false if the timeout elapsed. pub fn wait_timeout(&self, timeout: Duration) -> Result<(), WaitTimeout> { let when = time_after_delay(timeout); - let result = unsafe { - dispatch_group_wait(self.ptr, when) - }; + let result = unsafe { dispatch_group_wait(self.ptr, when) }; if result == 0 { Ok(()) } else { @@ -75,15 +77,13 @@ impl Group { /// Returns whether self is currently empty. pub fn is_empty(&self) -> bool { - let result = unsafe { - dispatch_group_wait(self.ptr, DISPATCH_TIME_NOW) - }; + let result = unsafe { dispatch_group_wait(self.ptr, DISPATCH_TIME_NOW) }; result == 0 } } -unsafe impl Sync for Group { } -unsafe impl Send for Group { } +unsafe impl Sync for Group {} +unsafe impl Send for Group {} impl Clone for Group { fn clone(&self) -> Self { @@ -113,11 +113,13 @@ impl GroupGuard { unsafe { dispatch_group_enter(group.ptr); } - GroupGuard { group: group.clone() } + GroupGuard { + group: group.clone(), + } } /// Drops self, leaving the `Group`. - pub fn leave(self) { } + pub fn leave(self) {} } impl Clone for GroupGuard { @@ -136,9 +138,9 @@ impl Drop for GroupGuard { #[cfg(test)] mod tests { - use std::sync::{Arc, Mutex}; - use crate::{Queue, QueueAttribute}; use super::Group; + use crate::{Queue, QueueAttribute}; + use std::sync::{Arc, Mutex}; #[test] fn test_group() { diff --git a/src/lib.rs b/src/lib.rs index c5cd981..f851e3f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,10 +44,12 @@ assert!(nums[0] == "2"); #![warn(missing_docs)] +use std::convert::TryFrom; use std::error::Error; use std::fmt; -use std::mem; +use std::mem::ManuallyDrop; use std::os::raw::c_void; +use std::panic::AssertUnwindSafe; use std::time::Duration; use crate::ffi::*; @@ -60,8 +62,8 @@ pub use crate::sem::{Semaphore, SemaphoreGuard}; /// Raw foreign function interface for libdispatch. pub mod ffi; mod group; -mod queue; mod once; +mod queue; mod sem; /// An error indicating a wait timed out. @@ -76,58 +78,92 @@ impl fmt::Display for WaitTimeout { } } -impl Error for WaitTimeout { } +impl Error for WaitTimeout {} fn time_after_delay(delay: Duration) -> dispatch_time_t { - delay.as_secs().checked_mul(1_000_000_000).and_then(|i| { - i.checked_add(delay.subsec_nanos() as u64) - }).and_then(|i| { - if i < (i64::max_value() as u64) { Some(i as i64) } else { None } - }).map_or(DISPATCH_TIME_FOREVER, |i| unsafe { + i64::try_from(delay.as_nanos()).map_or(DISPATCH_TIME_FOREVER, |i| unsafe { dispatch_time(DISPATCH_TIME_NOW, i) }) } fn context_and_function(closure: F) -> (*mut c_void, dispatch_function_t) - where F: FnOnce() { - extern fn work_execute_closure(context: Box) where F: FnOnce() { - (*context)(); +where + F: 'static + FnOnce(), +{ + extern "C" fn work_execute_closure(context: *mut c_void) + where + F: FnOnce(), + { + let closure: Box = unsafe { Box::from_raw(context as *mut _) }; + if std::panic::catch_unwind(AssertUnwindSafe(closure)).is_err() { + // Abort to prevent unwinding across FFI + std::process::abort(); + } } let closure = Box::new(closure); - let func: extern fn(Box) = work_execute_closure::; - unsafe { - (mem::transmute(closure), mem::transmute(func)) - } + let func: dispatch_function_t = work_execute_closure::; + (Box::into_raw(closure) as *mut c_void, func) } -fn context_and_sync_function(closure: &mut Option) -> - (*mut c_void, dispatch_function_t) - where F: FnOnce() { - extern fn work_read_closure(context: &mut Option) where F: FnOnce() { - // This is always passed Some, so it's safe to unwrap - let closure = context.take().unwrap(); - closure(); +fn with_context_and_sync_function( + closure: F, + wrapper: impl FnOnce(*mut c_void, dispatch_function_t), +) -> Option +where + F: FnOnce() -> T, +{ + #[derive(Debug)] + struct SyncContext { + closure: ManuallyDrop, + result: Option>, } - let context: *mut Option = closure; - let func: extern fn(&mut Option) = work_read_closure::; - unsafe { - (context as *mut c_void, mem::transmute(func)) + extern "C" fn work_execute_closure(context: *mut c_void) + where + F: FnOnce() -> T, + { + let sync_context: &mut SyncContext = unsafe { &mut *(context as *mut _) }; + let closure = unsafe { ManuallyDrop::take(&mut sync_context.closure) }; + sync_context.result = Some(std::panic::catch_unwind(AssertUnwindSafe(closure))); + } + + let mut sync_context: SyncContext = SyncContext { + closure: ManuallyDrop::new(closure), + result: None, + }; + let func: dispatch_function_t = work_execute_closure::; + wrapper(&mut sync_context as *mut _ as *mut c_void, func); + + // If the closure panicked, resume unwinding + match sync_context.result.transpose() { + Ok(res) => { + if res.is_none() { + // if the closure didn't run (for example when using `Once`), free the closure + unsafe { ManuallyDrop::drop(&mut sync_context.closure); }; + } + res + } + Err(unwind_payload) => std::panic::resume_unwind(unwind_payload), } } -fn context_and_apply_function(closure: &F) -> - (*mut c_void, extern fn(*mut c_void, usize)) - where F: Fn(usize) { - extern fn work_apply_closure(context: &F, iter: usize) - where F: Fn(usize) { - context(iter); +fn context_and_apply_function(closure: &F) -> (*mut c_void, extern "C" fn(*mut c_void, usize)) +where + F: Fn(usize), +{ + extern "C" fn work_apply_closure(context: *mut c_void, iter: usize) + where + F: Fn(usize), + { + let context: &F = unsafe { &*(context as *const _) }; + if std::panic::catch_unwind(AssertUnwindSafe(|| context(iter))).is_err() { + // Abort to prevent unwinding across FFI + std::process::abort(); + } } let context: *const F = closure; - let func: extern fn(&F, usize) = work_apply_closure::; - unsafe { - (context as *mut c_void, mem::transmute(func)) - } + let func: extern "C" fn(*mut c_void, usize) = work_apply_closure::; + (context as *mut c_void, func) } diff --git a/src/once.rs b/src/once.rs index c1dbfa8..9388c3c 100644 --- a/src/once.rs +++ b/src/once.rs @@ -1,7 +1,7 @@ use std::cell::UnsafeCell; use crate::ffi::*; -use crate::context_and_sync_function; +use crate::with_context_and_sync_function; /// A predicate used to execute a closure only once for the lifetime of an /// application. @@ -13,7 +13,9 @@ pub struct Once { impl Once { /// Creates a new `Once`. pub const fn new() -> Once { - Once { predicate: UnsafeCell::new(0) } + Once { + predicate: UnsafeCell::new(0), + } } /// Executes a closure once, ensuring that no other closure has been or @@ -22,16 +24,19 @@ impl Once { /// If called simultaneously from multiple threads, waits synchronously // until the work has completed. #[inline(always)] - pub fn call_once(&'static self, work: F) where F: FnOnce() { + pub fn call_once(&'static self, work: F) + where + F: FnOnce(), + { #[cold] #[inline(never)] fn once(predicate: *mut dispatch_once_t, work: F) - where F: FnOnce() { - let mut work = Some(work); - let (context, work) = context_and_sync_function(&mut work); - unsafe { + where + F: FnOnce(), + { + with_context_and_sync_function(work, |context, work| unsafe { dispatch_once_f(predicate, context, work); - } + }); } unsafe { @@ -43,7 +48,7 @@ impl Once { } } -unsafe impl Sync for Once { } +unsafe impl Sync for Once {} #[cfg(test)] mod tests { diff --git a/src/queue.rs b/src/queue.rs index 46ed9cd..e728ef2 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -6,8 +6,8 @@ use std::time::Duration; use crate::ffi::*; use crate::{ - context_and_function, context_and_sync_function, context_and_apply_function, - time_after_delay, + context_and_apply_function, context_and_function, time_after_delay, + with_context_and_sync_function, }; /// The type of a dispatch queue. @@ -59,9 +59,9 @@ pub enum QueuePriority { impl QueuePriority { fn as_raw(&self) -> c_long { match *self { - QueuePriority::High => DISPATCH_QUEUE_PRIORITY_HIGH, - QueuePriority::Default => DISPATCH_QUEUE_PRIORITY_DEFAULT, - QueuePriority::Low => DISPATCH_QUEUE_PRIORITY_LOW, + QueuePriority::High => DISPATCH_QUEUE_PRIORITY_HIGH, + QueuePriority::Default => DISPATCH_QUEUE_PRIORITY_DEFAULT, + QueuePriority::Low => DISPATCH_QUEUE_PRIORITY_LOW, QueuePriority::Background => DISPATCH_QUEUE_PRIORITY_BACKGROUND, } } @@ -100,9 +100,7 @@ impl Queue { /// Creates a new dispatch `Queue`. pub fn create(label: &str, attr: QueueAttribute) -> Self { let label = CString::new(label).unwrap(); - let queue = unsafe { - dispatch_queue_create(label.as_ptr(), attr.as_raw()) - }; + let queue = unsafe { dispatch_queue_create(label.as_ptr(), attr.as_raw()) }; Queue { ptr: queue } } @@ -111,8 +109,7 @@ impl Queue { /// A dispatch queue's priority is inherited from its target queue. /// Additionally, if both the queue and its target are serial queues, /// their blocks will not be invoked concurrently. - pub fn with_target_queue(label: &str, attr: QueueAttribute, target: &Queue) - -> Self { + pub fn with_target_queue(label: &str, attr: QueueAttribute, target: &Queue) -> Self { let queue = Queue::create(label, attr); unsafe { dispatch_set_target_queue(queue.ptr, target.ptr); @@ -134,27 +131,22 @@ impl Queue { /// Submits a closure for execution on self and waits until it completes. pub fn exec_sync(&self, work: F) -> T - where F: Send + FnOnce() -> T, T: Send { - let mut result = None; - { - let result_ref = &mut result; - let work = move || { - *result_ref = Some(work()); - }; - - let mut work = Some(work); - let (context, work) = context_and_sync_function(&mut work); - unsafe { - dispatch_sync_f(self.ptr, context, work); - } - } - // This was set so it's safe to unwrap - result.unwrap() + where + F: Send + FnOnce() -> T, + T: Send, + { + with_context_and_sync_function(work, |context, work| unsafe { + dispatch_sync_f(self.ptr, context, work); + }) + .unwrap() } /// Submits a closure for asynchronous execution on self and returns /// immediately. - pub fn exec_async(&self, work: F) where F: 'static + Send + FnOnce() { + pub fn exec_async(&self, work: F) + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_async_f(self.ptr, context, work); @@ -164,7 +156,9 @@ impl Queue { /// After the specified delay, submits a closure for asynchronous execution /// on self. pub fn exec_after(&self, delay: Duration, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { let when = time_after_delay(delay); let (context, work) = context_and_function(work); unsafe { @@ -175,7 +169,9 @@ impl Queue { /// Submits a closure to be executed on self the given number of iterations /// and waits until it completes. pub fn apply(&self, iterations: usize, work: F) - where F: Sync + Fn(usize) { + where + F: Sync + Fn(usize), + { let (context, work) = context_and_apply_function(&work); unsafe { dispatch_apply_f(iterations, self.ptr, context, work); @@ -185,10 +181,13 @@ impl Queue { /// Submits a closure to be executed on self for each element of the /// provided slice and waits until it completes. pub fn for_each(&self, slice: &mut [T], work: F) - where F: Sync + Fn(&mut T), T: Send { + where + F: Sync + Fn(&mut T), + T: Send, + { let slice_ptr = slice.as_mut_ptr(); let work = move |i| unsafe { - work(&mut *slice_ptr.offset(i as isize)); + work(&mut *slice_ptr.add(i)); }; let (context, work) = context_and_apply_function(&work); unsafe { @@ -199,7 +198,11 @@ impl Queue { /// Submits a closure to be executed on self for each element of the /// provided vector and returns a `Vec` of the mapped elements. pub fn map(&self, vec: Vec, work: F) -> Vec - where F: Sync + Fn(T) -> U, T: Send, U: Send { + where + F: Sync + Fn(T) -> U, + T: Send, + U: Send, + { let mut src = vec; let len = src.len(); let src_ptr = src.as_ptr(); @@ -208,8 +211,8 @@ impl Queue { let dest_ptr = dest.as_mut_ptr(); let work = move |i| unsafe { - let result = work(ptr::read(src_ptr.offset(i as isize))); - ptr::write(dest_ptr.offset(i as isize), result); + let result = work(ptr::read(src_ptr.add(i))); + ptr::write(dest_ptr.add(i), result); }; let (context, work) = context_and_apply_function(&work); unsafe { @@ -234,22 +237,14 @@ impl Queue { /// If self is a serial queue or one of the global concurrent queues, /// this method behaves like the normal `sync` method. pub fn barrier_sync(&self, work: F) -> T - where F: Send + FnOnce() -> T, T: Send { - let mut result = None; - { - let result_ref = &mut result; - let work = move || { - *result_ref = Some(work()); - }; - - let mut work = Some(work); - let (context, work) = context_and_sync_function(&mut work); - unsafe { - dispatch_barrier_sync_f(self.ptr, context, work); - } - } - // This was set so it's safe to unwrap - result.unwrap() + where + F: Send + FnOnce() -> T, + T: Send, + { + with_context_and_sync_function(work, |context, work| unsafe { + dispatch_barrier_sync_f(self.ptr, context, work); + }) + .unwrap() } /// Submits a closure to be executed on self as a barrier and returns @@ -265,7 +260,9 @@ impl Queue { /// If self is a serial queue or one of the global concurrent queues, /// this method behaves like the normal `async` method. pub fn barrier_async(&self, work: F) - where F: 'static + Send + FnOnce() { + where + F: 'static + Send + FnOnce(), + { let (context, work) = context_and_function(work); unsafe { dispatch_barrier_async_f(self.ptr, context, work); @@ -283,8 +280,8 @@ impl Queue { } } -unsafe impl Sync for Queue { } -unsafe impl Send for Queue { } +unsafe impl Sync for Queue {} +unsafe impl Send for Queue {} impl Clone for Queue { fn clone(&self) -> Self { @@ -314,11 +311,13 @@ impl SuspendGuard { unsafe { dispatch_suspend(queue.ptr); } - SuspendGuard { queue: queue.clone() } + SuspendGuard { + queue: queue.clone(), + } } /// Drops self, allowing the suspended `Queue` to resume. - pub fn resume(self) { } + pub fn resume(self) {} } impl Clone for SuspendGuard { @@ -337,10 +336,10 @@ impl Drop for SuspendGuard { #[cfg(test)] mod tests { + use super::*; + use crate::Group; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; - use crate::Group; - use super::*; fn async_increment(queue: &Queue, num: &Arc>) { let num = num.clone(); @@ -503,4 +502,34 @@ mod tests { q.exec_sync(|| ()); assert_eq!(*num.lock().unwrap(), 1); } + + #[test] + #[should_panic(expected = "panic from exec_sync")] + fn test_panic_serial() { + let q = Queue::create("", QueueAttribute::Serial); + + q.exec_sync(|| panic!("panic from exec_sync")); + + panic!("exec_sync did NOT panic"); + } + + #[test] + #[should_panic(expected = "panic from exec_sync")] + fn test_panic_concurrent() { + let q = Queue::create("", QueueAttribute::Concurrent); + + q.exec_sync(|| panic!("panic from exec_sync")); + + panic!("exec_sync did NOT panic"); + } + + #[test] + #[should_panic(expected = "panic from barrier_sync")] + fn test_panic_barrier() { + let q = Queue::create("", QueueAttribute::Serial); + + q.barrier_sync(|| panic!("panic from barrier_sync")); + + panic!("barrier_sync did NOT panic"); + } } diff --git a/src/sem.rs b/src/sem.rs index b6faa24..bb70826 100644 --- a/src/sem.rs +++ b/src/sem.rs @@ -18,26 +18,20 @@ impl Semaphore { /// successful calls to `wait` than `signal`, the system assumes the /// `Semaphore` is still in use and will abort if it is disposed. pub fn new(value: u32) -> Self { - let ptr = unsafe { - dispatch_semaphore_create(value as c_long) - }; + let ptr = unsafe { dispatch_semaphore_create(value as c_long) }; Semaphore { ptr } } /// Wait for (decrement) self. pub fn wait(&self) { - let result = unsafe { - dispatch_semaphore_wait(self.ptr, DISPATCH_TIME_FOREVER) - }; + let result = unsafe { dispatch_semaphore_wait(self.ptr, DISPATCH_TIME_FOREVER) }; assert!(result == 0, "Dispatch semaphore wait errored"); } /// Wait for (decrement) self until the specified timeout has elapsed. pub fn wait_timeout(&self, timeout: Duration) -> Result<(), WaitTimeout> { let when = time_after_delay(timeout); - let result = unsafe { - dispatch_semaphore_wait(self.ptr, when) - }; + let result = unsafe { dispatch_semaphore_wait(self.ptr, when) }; if result == 0 { Ok(()) } else { @@ -50,9 +44,7 @@ impl Semaphore { /// If the previous value was less than zero, this method wakes a waiting thread. /// Returns `true` if a thread is woken or `false` otherwise. pub fn signal(&self) -> bool { - unsafe { - dispatch_semaphore_signal(self.ptr) != 0 - } + unsafe { dispatch_semaphore_signal(self.ptr) != 0 } } /// Wait to access a resource protected by self. @@ -64,8 +56,7 @@ impl Semaphore { /// Wait until the specified timeout to access a resource protected by self. /// This decrements self and returns a guard that increments when dropped. - pub fn access_timeout(&self, timeout: Duration) - -> Result { + pub fn access_timeout(&self, timeout: Duration) -> Result { self.wait_timeout(timeout)?; Ok(SemaphoreGuard::new(self.clone())) } @@ -103,7 +94,7 @@ impl SemaphoreGuard { } /// Drops self, signaling the `Semaphore`. - pub fn signal(self) { } + pub fn signal(self) {} } impl Drop for SemaphoreGuard {