diff --git a/Cargo.toml b/Cargo.toml index c492654..c828769 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,7 @@ exclude = [ "travis_test.sh", "tests-ios/**", ] + +[dependencies] +bitflags = "0.7" +libc = "0.2" diff --git a/src/ffi.rs b/src/ffi.rs index 91dad6f..3b4db5f 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -1,11 +1,15 @@ #![allow(missing_docs)] #![allow(non_camel_case_types)] +use libc::uintptr_t; use std::os::raw::{c_char, c_long, c_ulong, c_void}; #[repr(C)] pub struct dispatch_object_s { _private: [u8; 0] } +#[repr(C)] +pub struct dispatch_source_type_s { _private: [u8; 0] } + // dispatch_block_t pub type dispatch_function_t = extern fn(*mut c_void); pub type dispatch_semaphore_t = *mut dispatch_object_s; @@ -14,7 +18,8 @@ pub type dispatch_object_t = *mut dispatch_object_s; pub type dispatch_once_t = c_long; pub type dispatch_queue_t = *mut dispatch_object_s; pub type dispatch_time_t = u64; -// dispatch_source_type_t +pub type dispatch_source_t = *mut dispatch_object_s; +pub type dispatch_source_type_t = *const dispatch_source_type_s; // dispatch_fd_t // dispatch_data_t // dispatch_data_applier_t @@ -77,20 +82,32 @@ extern { // void dispatch_barrier_sync ( dispatch_queue_t queue, dispatch_block_t block ); pub fn dispatch_barrier_sync_f(queue: dispatch_queue_t, context: *mut c_void, work: dispatch_function_t); - // void dispatch_source_cancel ( dispatch_source_t source ); - // dispatch_source_t dispatch_source_create ( dispatch_source_type_t type, uintptr_t handle, unsigned long mask, dispatch_queue_t queue ); - // unsigned long dispatch_source_get_data ( dispatch_source_t source ); - // uintptr_t dispatch_source_get_handle ( dispatch_source_t source ); - // unsigned long dispatch_source_get_mask ( dispatch_source_t source ); - // void dispatch_source_merge_data ( dispatch_source_t source, unsigned long value ); + static _dispatch_source_type_data_add: dispatch_source_type_s; + static _dispatch_source_type_data_or: dispatch_source_type_s; + static _dispatch_source_type_mach_send: dispatch_source_type_s; + static _dispatch_source_type_mach_recv: dispatch_source_type_s; + static _dispatch_source_type_memorypressure: dispatch_source_type_s; + static _dispatch_source_type_proc: dispatch_source_type_s; + static _dispatch_source_type_read: dispatch_source_type_s; + static _dispatch_source_type_signal: dispatch_source_type_s; + static _dispatch_source_type_timer: dispatch_source_type_s; + static _dispatch_source_type_vnode: dispatch_source_type_s; + static _dispatch_source_type_write: dispatch_source_type_s; + + pub fn dispatch_source_cancel(source: dispatch_source_t); + pub fn dispatch_source_create(type_: dispatch_source_type_t, handle: uintptr_t, mask: c_ulong, queue: dispatch_queue_t) -> dispatch_source_t; + pub fn dispatch_source_get_data(source: dispatch_source_t) -> c_ulong; + pub fn dispatch_source_get_handle(source: dispatch_source_t) -> uintptr_t; + pub fn dispatch_source_get_mask(source: dispatch_source_t) -> c_ulong; + pub fn dispatch_source_merge_data(source: dispatch_source_t, value: c_ulong); // void dispatch_source_set_registration_handler ( dispatch_source_t source, dispatch_block_t handler ); // void dispatch_source_set_registration_handler_f ( dispatch_source_t source, dispatch_function_t handler ); // void dispatch_source_set_cancel_handler ( dispatch_source_t source, dispatch_block_t handler ); - // void dispatch_source_set_cancel_handler_f ( dispatch_source_t source, dispatch_function_t handler ); + pub fn dispatch_source_set_cancel_handler_f(source: dispatch_source_t, handler: dispatch_function_t); // void dispatch_source_set_event_handler ( dispatch_source_t source, dispatch_block_t handler ); - // void dispatch_source_set_event_handler_f ( dispatch_source_t source, dispatch_function_t handler ); - // void dispatch_source_set_timer ( dispatch_source_t source, dispatch_time_t start, uint64_t interval, uint64_t leeway ); - // long dispatch_source_testcancel ( dispatch_source_t source ); + pub fn dispatch_source_set_event_handler_f(source: dispatch_source_t, handler: dispatch_function_t); + pub fn dispatch_source_set_timer(source: dispatch_source_t, start: dispatch_time_t, interval: u64, leeway: u64); + pub fn dispatch_source_testcancel(source: dispatch_source_t) -> c_long; // void dispatch_read ( dispatch_fd_t fd, size_t length, dispatch_queue_t queue, void (^handler)(dispatch_data_t data, int error) ); // void dispatch_write ( dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue, void (^handler)(dispatch_data_t data, int error) ); @@ -146,6 +163,49 @@ pub const DISPATCH_QUEUE_PRIORITY_BACKGROUND: c_long = -1 << 15; pub const DISPATCH_TIME_NOW: dispatch_time_t = 0; pub const DISPATCH_TIME_FOREVER: dispatch_time_t = !0; +pub type dispatch_source_mach_send_flags_t = c_ulong; + +pub const DISPATCH_MACH_SEND_DEAD: dispatch_source_mach_send_flags_t = 1; + +pub type dispatch_source_memorypressure_flags_t = c_ulong; + +pub const DISPATCH_MEMORYPRESSURE_NORMAL: dispatch_source_memorypressure_flags_t = 0x1; +pub const DISPATCH_MEMORYPRESSURE_WARN: dispatch_source_memorypressure_flags_t = 0x2; +pub const DISPATCH_MEMORYPRESSURE_CRITICAL: dispatch_source_memorypressure_flags_t = 0x4; + +pub type dispatch_source_proc_flags_t = c_ulong; + +pub const DISPATCH_PROC_EXIT: dispatch_source_proc_flags_t = 0x80000000; +pub const DISPATCH_PROC_FORK: dispatch_source_proc_flags_t = 0x40000000; +pub const DISPATCH_PROC_EXEC: dispatch_source_proc_flags_t = 0x20000000; +pub const DISPATCH_PROC_SIGNAL: dispatch_source_proc_flags_t = 0x08000000; + +pub type dispatch_source_timer_flags_t = c_ulong; + +pub const DISPATCH_TIMER_STRICT: dispatch_source_timer_flags_t = 0x1; + +pub type dispatch_source_vnode_flags_t = c_ulong; + +pub const DISPATCH_VNODE_DELETE: dispatch_source_vnode_flags_t = 0x1; +pub const DISPATCH_VNODE_WRITE: dispatch_source_vnode_flags_t = 0x2; +pub const DISPATCH_VNODE_EXTEND: dispatch_source_vnode_flags_t = 0x4; +pub const DISPATCH_VNODE_ATTRIB: dispatch_source_vnode_flags_t = 0x8; +pub const DISPATCH_VNODE_LINK: dispatch_source_vnode_flags_t = 0x10; +pub const DISPATCH_VNODE_RENAME: dispatch_source_vnode_flags_t = 0x20; +pub const DISPATCH_VNODE_REVOKE: dispatch_source_vnode_flags_t = 0x40; + +pub static DISPATCH_SOURCE_TYPE_DATA_ADD: &'static dispatch_source_type_s = &_dispatch_source_type_data_add; +pub static DISPATCH_SOURCE_TYPE_DATA_OR: &'static dispatch_source_type_s = &_dispatch_source_type_data_or; +pub static DISPATCH_SOURCE_TYPE_MACH_SEND: &'static dispatch_source_type_s = &_dispatch_source_type_mach_send; +pub static DISPATCH_SOURCE_TYPE_MACH_RECV: &'static dispatch_source_type_s = &_dispatch_source_type_mach_recv; +pub static DISPATCH_SOURCE_TYPE_MEMORYPRESSURE: &'static dispatch_source_type_s = &_dispatch_source_type_memorypressure; +pub static DISPATCH_SOURCE_TYPE_PROC: &'static dispatch_source_type_s = &_dispatch_source_type_proc; +pub static DISPATCH_SOURCE_TYPE_READ: &'static dispatch_source_type_s = &_dispatch_source_type_read; +pub static DISPATCH_SOURCE_TYPE_SIGNAL: &'static dispatch_source_type_s = &_dispatch_source_type_signal; +pub static DISPATCH_SOURCE_TYPE_TIMER: &'static dispatch_source_type_s = &_dispatch_source_type_timer; +pub static DISPATCH_SOURCE_TYPE_VNODE: &'static dispatch_source_type_s = &_dispatch_source_type_vnode; +pub static DISPATCH_SOURCE_TYPE_WRITE: &'static dispatch_source_type_s = &_dispatch_source_type_write; + #[cfg(test)] mod tests { use super::*; diff --git a/src/lib.rs b/src/lib.rs index e3be8dc..047895a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,14 +40,36 @@ assert!(nums == [2, 3]); let nums = queue.map(nums, |x| x.to_string()); assert!(nums[0] == "2"); ``` + +# Sources + +Dispatch sources monitor system objects and deliver events to a target queue. + +``` +use dispatch::{Queue, QueueAttribute, SourceBuilder}; +use dispatch::source::{Proc, PROC_FORK}; + +let pid = 123; +let queue = Queue::create("com.example.rust", QueueAttribute::Serial); +let mut source_builder = SourceBuilder::new( + Proc { pid: pid, flags: PROC_FORK }, &queue).unwrap(); +source_builder.event_handler(|source| { + // process 123 forked +}); +let source = source_builder.resume(); +``` */ #![warn(missing_docs)] +#[macro_use] +extern crate bitflags; +extern crate libc; + use std::cell::UnsafeCell; use std::ffi::{CStr, CString}; use std::mem; -use std::os::raw::{c_long, c_void}; +use std::os::raw::{c_long, c_ulong, c_void}; use std::ptr; use std::str; use std::time::Duration; @@ -57,6 +79,9 @@ use ffi::*; /// Raw foreign function interface for libdispatch. pub mod ffi; +/// Source types. +pub mod source; + /// The type of a dispatch queue. #[derive(Clone, Debug, Hash, PartialEq)] pub enum QueueAttribute { @@ -122,10 +147,14 @@ pub struct Queue { ptr: dispatch_queue_t, } +fn duration_nanos(duration: Duration) -> Option { + duration.as_secs().checked_mul(1_000_000_000).and_then(|i| { + i.checked_add(duration.subsec_nanos() as u64) + }) +} + fn time_after_delay(delay: Duration) -> dispatch_time_t { - delay.as_secs().checked_mul(1_000_000_000).and_then(|i| { - i.checked_add(delay.subsec_nanos() as u64) - }).and_then(|i| { + duration_nanos(delay).and_then(|i| { if i < (i64::max_value() as u64) { Some(i as i64) } else { None } }).map_or(DISPATCH_TIME_FOREVER, |i| unsafe { dispatch_time(DISPATCH_TIME_NOW, i) @@ -390,6 +419,12 @@ impl Queue { } } +impl Suspend for Queue { + fn as_raw_suspend_object(&self) -> dispatch_object_t { + self.ptr + } +} + unsafe impl Sync for Queue { } unsafe impl Send for Queue { } @@ -410,33 +445,46 @@ impl Drop for Queue { } } -/// An RAII guard which will resume a suspended `Queue` when dropped. +trait Suspend: Clone { + fn as_raw_suspend_object(&self) -> dispatch_object_t; +} + +/// An RAII guard which will resume a suspended `Queue` or `Source` when dropped. pub struct SuspendGuard { - queue: Queue, + ptr: dispatch_object_t, } impl SuspendGuard { - fn new(queue: &Queue) -> SuspendGuard { + fn new(object: &T) -> SuspendGuard { + let ptr = object.as_raw_suspend_object(); unsafe { - dispatch_suspend(queue.ptr); + dispatch_retain(ptr); + dispatch_suspend(ptr); } - SuspendGuard { queue: queue.clone() } + SuspendGuard { ptr: ptr } } - /// Drops self, allowing the suspended `Queue` to resume. + /// Drops self, allowing the suspended object to resume. pub fn resume(self) { } } +impl Suspend for SuspendGuard { + fn as_raw_suspend_object(&self) -> dispatch_object_t { + self.ptr + } +} + impl Clone for SuspendGuard { fn clone(&self) -> Self { - SuspendGuard::new(&self.queue) + SuspendGuard::new(self) } } impl Drop for SuspendGuard { fn drop(&mut self) { unsafe { - dispatch_resume(self.queue.ptr); + dispatch_resume(self.ptr); + dispatch_release(self.ptr); } } } @@ -608,9 +656,241 @@ impl Once { unsafe impl Sync for Once { } +use source::SourceType; + +/// A builder used to create `Source`s. +pub struct SourceBuilder { + source_: Source, +} + +impl SourceBuilder { + /// Start creating a new source. Returns None if the source could not be created. + pub fn new(source_type: T, target_queue: &Queue) -> Option { + Source::create(source_type, target_queue).map(|s| SourceBuilder { source_: s }) + } +} + +impl SourceBuilder { + /// Sets a cancelation handler on the source. + pub fn cancel_handler(&mut self, handler: F) where F: 'static + Send + FnOnce(Source) { + self.source_.set_cancel_handler(handler); + } + + /// Sets an event handler on the source. + pub fn event_handler(&mut self, handler: F) where F: 'static + Send + Fn(Source) { + self.source_.set_event_handler(handler); + } + + /// Starts the source. + pub fn resume(self) -> Source { + unsafe { dispatch_resume(self.source_.ptr); } + self.source_ + } +} + +struct BoxedOnceHandler { + ptr: *mut c_void, + caller: fn(&mut BoxedOnceHandler, Option), +} + +impl BoxedOnceHandler { + fn new(f: F) -> BoxedOnceHandler { + let ptr: *mut Option = Box::into_raw(Box::new(Some(f))); + BoxedOnceHandler { + ptr: ptr as *mut c_void, + caller: Self::caller::, + } + } + + fn caller(&mut self, value: Option) { + if self.ptr.is_null() { + return; + } + unsafe { + let mut f: Box> = Box::from_raw(self.ptr as *mut Option); + self.ptr = ptr::null_mut(); + if let Some(x) = value { + f.take().unwrap()(x); + } + } + } + + fn call(mut self, value: T) { + (self.caller)(&mut self, Some(value)); + } +} + +impl Drop for BoxedOnceHandler { + fn drop(&mut self) { + if !self.ptr.is_null() { + (self.caller)(self, None); + } + } +} + +struct SourceContext { + cancel: Option>>, + event: Option) + Send>>, + source_ptr: dispatch_source_t, // unretained +} + +impl SourceContext { + fn new(source: dispatch_source_t) -> Self { + SourceContext { + cancel: None, + event: None, + source_ptr: source, + } + } +} + +/// Source type specific data. +pub type SourceData = c_ulong; + +/// A Grand Central Dispatch source. Create with a `SourceBuilder`. +pub struct Source { + ptr: dispatch_source_t, + source_type: ::std::marker::PhantomData, +} + +impl Source { + /// Creates a new source. + fn create(source_type: T, target_queue: &Queue) -> Option { + unsafe { + let (type_, handle, mask) = source_type.as_raw(); + let ptr = dispatch_source_create(type_, handle, mask, target_queue.ptr); + if ptr.is_null() { + return None; + } + + let source_handlers = Box::into_raw(Box::new(SourceContext::::new(ptr))); + extern fn source_handlers_finalizer(ptr: *mut c_void) { + let _ = unsafe { Box::from_raw(ptr as *mut SourceContext) }; + } + dispatch_set_finalizer_f(ptr, source_handlers_finalizer::); + dispatch_set_context(ptr, source_handlers as *mut c_void); + Some(Source { ptr: ptr, source_type: ::std::marker::PhantomData }) + } + } +} + +impl Source { + unsafe fn from_raw(ptr: dispatch_source_t) -> Self { + dispatch_retain(ptr); + Source { ptr: ptr, source_type: ::std::marker::PhantomData } + } + + unsafe fn context(&self) -> &mut SourceContext { + &mut *(dispatch_get_context(self.ptr) as *mut SourceContext) + } + + fn set_cancel_handler(&self, handler: F) + where F: 'static + Send + FnOnce(Source) + { + // is only run once per source + extern fn source_handler), T>(ptr: *mut c_void) { + unsafe { + let ctx = ptr as *mut SourceContext; + if let Some(f) = (*ctx).cancel.take() { + f.call(Source::from_raw((*ctx).source_ptr)); + } + } + } + unsafe { + self.context().cancel = Some(BoxedOnceHandler::new(handler)); + dispatch_source_set_cancel_handler_f(self.ptr, source_handler::); + } + } + + fn set_event_handler(&self, handler: F) + where F: 'static + Send + Fn(Source) + { + extern fn source_handler(ptr: *mut c_void) { + let ctx = ptr as *mut SourceContext; + unsafe { + (*ctx).event.as_ref().expect("event handler exists") + (Source::from_raw((*ctx).source_ptr)); + } + } + unsafe { + self.context().event = Some(Box::new(handler)); + dispatch_source_set_event_handler_f(self.ptr, source_handler::); + } + } + + /// Returns source type specific data. + pub fn data(&self) -> SourceData { + unsafe { dispatch_source_get_data(self.ptr) } + } + + /// Suspends the invocation of blocks on self and returns a `SuspendGuard` + pub fn suspend(&self) -> SuspendGuard { + SuspendGuard::new(self) + } + + /// Cancels the source. + pub fn cancel(&self) { + unsafe { + dispatch_source_cancel(self.ptr); + } + } + + /// Returns true if the source is canceled. + pub fn is_canceled(&self) -> bool { + unsafe { dispatch_source_testcancel(self.ptr) != 0 } + } +} + +impl Source { + /// Sets or resets the source's timer. + pub fn set_timer_after(&self, start: Duration, interval: Duration, leeway: Duration) { + let start = time_after_delay(start); + let interval = duration_nanos(interval).unwrap(); + let leeway = duration_nanos(leeway).unwrap(); + unsafe { + dispatch_source_set_timer(self.ptr, start, interval, leeway); + } + } +} + +impl Source { + /// Merges an integer into a `DataAnd` or `DataOr` source. + pub fn merge_data(&self, value: SourceData) { + unsafe { + dispatch_source_merge_data(self.ptr, value); + } + } +} + +impl Suspend for Source { + fn as_raw_suspend_object(&self) -> dispatch_object_t { + self.ptr + } +} + +unsafe impl Sync for Source { } +unsafe impl Send for Source { } + +impl Clone for Source { + fn clone(&self) -> Self { + unsafe { + dispatch_retain(self.ptr); + } + Source { ptr: self.ptr, source_type: ::std::marker::PhantomData } + } +} + +impl Drop for Source { + fn drop(&mut self) { + unsafe { + dispatch_release(self.ptr); + } + } +} + #[cfg(test)] mod tests { - use std::sync::{Arc, Mutex}; + use std::sync::{Arc, Barrier, Mutex}; use std::time::{Duration, Instant}; use super::*; @@ -813,4 +1093,71 @@ mod tests { // The notify must have run after the two blocks of the group assert_eq!(*num.lock().unwrap(), 10); } + + #[test] + fn test_source() { + let event_barrier = Arc::new(Barrier::new(2)); + let cancel_barrier = Arc::new(Barrier::new(2)); + let num = Arc::new(Mutex::new(0)); + let sum = Arc::new(Mutex::new(0)); + + let ev_num = num.clone(); + let ev_sum = sum.clone(); + let event_handler_barrier = event_barrier.clone(); + let cancel_num = num.clone(); + let cancel_handler_barrier = cancel_barrier.clone(); + + let q = Queue::create("", QueueAttribute::Serial); + let mut sb = SourceBuilder::new(source::DataAdd, &q).unwrap(); + sb.event_handler(move |source| { + let mut num = ev_num.lock().unwrap(); + let mut sum = ev_sum.lock().unwrap(); + *sum += source.data(); + *num |= 1; + event_handler_barrier.wait(); + }); + sb.cancel_handler(move |_| { + let mut num = cancel_num.lock().unwrap(); + *num |= 2; + cancel_handler_barrier.wait(); + }); + let source = sb.resume(); + + source.merge_data(3); + event_barrier.wait(); + assert_eq!(*sum.lock().unwrap(), 3); + source.merge_data(5); + event_barrier.wait(); + assert_eq!(*sum.lock().unwrap(), 8); + source.cancel(); + cancel_barrier.wait(); + assert_eq!(*num.lock().unwrap(), 3); + } + + #[test] + fn test_source_timer() { + let event_barrier = Arc::new(Barrier::new(2)); + let event_handler_barrier = event_barrier.clone(); + let num = Arc::new(Mutex::new(0)); + let ev_num = num.clone(); + + let q = Queue::create("", QueueAttribute::Serial); + let mut sb = SourceBuilder::new(source::Timer { flags: source::TimerFlags::empty() }, &q).unwrap(); + sb.event_handler(move |source| { + let mut num = ev_num.lock().unwrap(); + *num |= 1; + source.cancel(); + event_handler_barrier.wait(); + }); + let source = sb.resume(); + + source.set_timer_after( + Duration::from_millis(0), + Duration::from_millis(10000), + Duration::from_millis(1000)); + + event_barrier.wait(); + assert_eq!(*num.lock().unwrap(), 1); + assert!(source.is_canceled()); + } } diff --git a/src/source.rs b/src/source.rs new file mode 100644 index 0000000..6162749 --- /dev/null +++ b/src/source.rs @@ -0,0 +1,187 @@ +use ffi::*; +use libc::{uintptr_t}; +use std::os::raw::{c_uint, c_int, c_ulong}; + +/// A `mach_port_t`. +pub type MachPort = c_uint; + +/// A C file descriptor. +pub type FileDescriptor = c_int; + +bitflags! { + /// Flags for the MachSend source type. + pub flags MachSendFlags: dispatch_source_mach_send_flags_t { + /// If set, generates an event if the port destination's + /// receive right is destroyed. + const MACH_SEND_DEAD = DISPATCH_MACH_SEND_DEAD, + } +} + +bitflags! { + /// Event flags for the MemoryPressure source type. + pub flags MemoryPressureFlags: dispatch_source_memorypressure_flags_t { + /// System memory pressure is normal. + const MEMORYPRESSURE_NORMAL = DISPATCH_MEMORYPRESSURE_NORMAL, + /// System memory pressure warning. + const MEMORYPRESSURE_WARN = DISPATCH_MEMORYPRESSURE_WARN, + /// System memory pressure is critical. + const MEMORYPRESSURE_CRITICAL = DISPATCH_MEMORYPRESSURE_CRITICAL, + } +} + +bitflags! { + /// Event flags for the Proc source type. + pub flags ProcFlags: dispatch_source_proc_flags_t { + /// The process exited. + const PROC_EXIT = DISPATCH_PROC_EXIT, + /// The process forked. + const PROC_FORK = DISPATCH_PROC_FORK, + /// The process exec'd. + const PROC_EXEC = DISPATCH_PROC_EXEC, + /// The process received a signal. + const PROC_SIGNAL = DISPATCH_PROC_SIGNAL, + } +} + +bitflags! { + /// Flags for the Timer source type. + pub flags TimerFlags: dispatch_source_timer_flags_t { + /// Force stricter adherence to the specified leeway. + const TIMER_STRICT = DISPATCH_TIMER_STRICT, + } +} + +bitflags! { + /// Event flags for the Vnode source type. + pub flags VnodeFlags: dispatch_source_vnode_flags_t { + /// A filesystem object was deleted. + const VNODE_DELETE = DISPATCH_VNODE_DELETE, + /// The data of a filesystem object changed. + const VNODE_WRITE = DISPATCH_VNODE_WRITE, + /// The size of a filesystem object changed. + const VNODE_EXTEND = DISPATCH_VNODE_EXTEND, + /// The attributes of a filesystem object changed. + const VNODE_ATTRIB = DISPATCH_VNODE_ATTRIB, + /// The link count of a filesystem object changed. + const VNODE_LINK = DISPATCH_VNODE_LINK, + /// A filesystem object was renamed. + const VNODE_RENAME = DISPATCH_VNODE_RENAME, + /// A filesystem object was revoked. + const VNODE_REVOKE = DISPATCH_VNODE_REVOKE, + } +} + +/// Types of dispatch sources. +pub trait SourceType { + /// The underlying dispatch source configuration. + fn as_raw(&self) -> (dispatch_source_type_t, uintptr_t, c_ulong); +} + +macro_rules! impl_source_type { + (@handle, $s:expr, @) => (0); + (@handle, $s:expr, $handle:ident) => ($s.$handle as _); + (@mask, $s:expr, @) => (0); + (@mask, $s:expr, $mask:ident) => ($s.$mask.bits()); + ($t:ty, $dt:expr, $handle:tt, $mask:tt) => { + impl SourceType for $t { + fn as_raw(&self) -> (dispatch_source_type_t, uintptr_t, c_ulong) { + ($dt, + impl_source_type!(@handle, self, $handle), + impl_source_type!(@mask, self, $mask)) + } + } + }; +} + +/// Arithmetic add accumulator source type. +#[derive(Copy, Clone)] +pub struct DataAdd; +impl_source_type!(DataAdd, DISPATCH_SOURCE_TYPE_DATA_ADD, @, @); + +/// Bitwise or accumulator source type. +#[derive(Copy, Clone)] +pub struct DataOr; +impl_source_type!(DataOr, DISPATCH_SOURCE_TYPE_DATA_OR, @, @); + +/// Mach message send readyness event source type. +#[derive(Copy, Clone)] +pub struct MachSend { + /// The Mach port to send to. + pub port: MachPort, + /// Mach send event options. + pub flags: MachSendFlags, +} +impl_source_type!(MachSend, DISPATCH_SOURCE_TYPE_MACH_SEND, port, flags); + +/// Mach message receive readyness event source type. +#[derive(Copy, Clone)] +pub struct MachRecv { + /// The Mach port to receive from. + pub port: MachPort, +} +impl_source_type!(MachRecv, DISPATCH_SOURCE_TYPE_MACH_RECV, port, @); + +/// System memory pressure event source type. +#[derive(Copy, Clone)] +pub struct MemoryPressure { + /// Event mask. + pub flags: MemoryPressureFlags, +} +impl_source_type!(MemoryPressure, DISPATCH_SOURCE_TYPE_MEMORYPRESSURE, @, flags); + +/// Process lifecycle event source type. +#[derive(Copy, Clone)] +pub struct Proc { + /// Process ID to receive events from. + pub pid: ::libc::pid_t, + /// Event mask. + pub flags: ProcFlags, +} +impl_source_type!(Proc, DISPATCH_SOURCE_TYPE_PROC, pid, flags); + +/// File read readness event source type. +#[derive(Copy, Clone)] +pub struct Read { + /// The file descriptor to read from. + pub fd: FileDescriptor, +} +impl_source_type!(Read, DISPATCH_SOURCE_TYPE_READ, fd, @); + +/// Current process signal event source type. +#[derive(Copy, Clone)] +pub struct Signal { + /// The signal number to receive events for. + pub signal: c_int, +} +impl_source_type!(Signal, DISPATCH_SOURCE_TYPE_SIGNAL, signal, @); + +/// Timer event source type. +#[derive(Copy, Clone)] +pub struct Timer { + /// Timer event options. + pub flags: TimerFlags, +} +impl_source_type!(Timer, DISPATCH_SOURCE_TYPE_TIMER, @, flags); + +/// Filesystem vnode event source type. +#[derive(Copy, Clone)] +pub struct Vnode { + /// The vnode to receive events from. + pub fd: FileDescriptor, + /// Event mask. + pub flags: VnodeFlags, +} +impl_source_type!(Vnode, DISPATCH_SOURCE_TYPE_VNODE, fd, flags); + +/// File write readyness event source type. +#[derive(Copy, Clone)] +pub struct Write { + /// The file descriptor to write to. + pub fd: FileDescriptor, +} +impl_source_type!(Write, DISPATCH_SOURCE_TYPE_WRITE, fd, @); + +/// Types of sources that accumulate an integer value between events. +pub trait DataSourceType: SourceType {} +impl DataSourceType for DataAdd {} +impl DataSourceType for DataOr {} diff --git a/tests-ios/prelude.rs b/tests-ios/prelude.rs index 0b2f738..60125d5 100644 --- a/tests-ios/prelude.rs +++ b/tests-ios/prelude.rs @@ -1,6 +1,6 @@ extern crate dispatch; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Barrier, Mutex}; use std::time::{Duration, Instant}; use dispatch::*;