44
55from __future__ import annotations
66
7+ cimport cpython
78from libc.stdint cimport uintptr_t
9+ from libc.string cimport memcpy
810
911from cuda.bindings cimport cydriver
1012
@@ -14,6 +16,7 @@ from cuda.core.experimental._utils.cuda_utils cimport (
1416)
1517
1618from dataclasses import dataclass
19+ import multiprocessing
1720from typing import TYPE_CHECKING, Optional
1821
1922from cuda.core.experimental._context import Context
@@ -40,15 +43,15 @@ cdef class EventOptions:
4043 has actually been completed.
4144 Otherwise, the CPU thread will busy-wait until the event has
4245 been completed. (Default to False)
43- support_ipc : bool, optional
46+ ipc_enabled : bool, optional
4447 Event will be suitable for interprocess use.
4548 Note that enable_timing must be False. (Default to False)
4649
4750 """
4851
4952 enable_timing: Optional[bool ] = False
5053 busy_waited_sync: Optional[bool ] = False
51- support_ipc : Optional[bool ] = False
54+ ipc_enabled : Optional[bool ] = False
5255
5356
5457cdef class Event:
@@ -86,24 +89,35 @@ cdef class Event:
8689 raise RuntimeError (" Event objects cannot be instantiated directly. Please use Stream APIs (record)." )
8790
@classmethod
def _init(cls, device_id: int, ctx_handle: Context, options=None, is_free=False):
    """Create a new event from ``options`` and attach it to a device/context.

    Parameters
    ----------
    device_id : int
        Stored on the event as its device ordinal.
    ctx_handle : Context
        Stored on the event as its context handle.
    options : optional
        Passed through ``check_or_create_options`` to obtain an
        :obj:`EventOptions` instance.
    is_free : bool, optional
        When true, IPC-enabled options are rejected (see ``Raises``).

    Returns
    -------
    Event
        The newly created event. If IPC is enabled its descriptor has
        already been exported and cached.

    Raises
    ------
    TypeError
        If IPC is requested together with ``is_free`` or with timing enabled.
    """
    cdef Event self = Event.__new__(cls)
    cdef EventOptions opts = check_or_create_options(EventOptions, options, " Event options")
    cdef unsigned int flags = 0x0

    # Normalise the two independent switches to plain booleans up front.
    timing_off = True if not opts.enable_timing else False
    blocking = True if opts.busy_waited_sync else False

    self._timing_disabled = timing_off
    self._busy_waited = blocking
    self._ipc_enabled = False
    self._ipc_descriptor = None

    if timing_off:
        flags |= cydriver.CUevent_flags.CU_EVENT_DISABLE_TIMING
    if blocking:
        flags |= cydriver.CUevent_flags.CU_EVENT_BLOCKING_SYNC

    if opts.ipc_enabled:
        # The "free event" rejection takes precedence over the timing one.
        if is_free:
            raise TypeError(
                " IPC-enabled events must be bound; use Stream.record for creation."
            )
        if not timing_off:
            raise TypeError(" IPC-enabled events cannot use timing.")
        flags |= cydriver.CUevent_flags.CU_EVENT_INTERPROCESS
        self._ipc_enabled = True

    with nogil:
        HANDLE_RETURN(cydriver.cuEventCreate(&self._handle, flags))

    self._device_id = device_id
    self._ctx_handle = ctx_handle

    # Export the IPC handle right away so the descriptor is cached on the event.
    if self._ipc_enabled:
        self.get_ipc_descriptor()
    return self
108122
109123 cpdef close(self ):
@@ -151,6 +165,40 @@ cdef class Event:
151165 raise CUDAError(err)
152166 raise RuntimeError (explanation)
153167
def get_ipc_descriptor(self) -> IPCEventDescriptor:
    """Return the descriptor used to share this event with another process.

    The descriptor is exported from the driver on first use and cached on
    the event; later calls return the cached object.

    Raises
    ------
    RuntimeError
        If no descriptor is cached and the event is not IPC-enabled.
    """
    cdef cydriver.CUipcEventHandle ipc_handle
    cdef bytes payload

    if self._ipc_descriptor is None:
        if not self.is_ipc_enabled:
            raise RuntimeError("Event is not IPC-enabled")
        with nogil:
            HANDLE_RETURN(cydriver.cuIpcGetEventHandle(&ipc_handle, <cydriver.CUevent>(self._handle)))
        # Copy the opaque handle bytes into a Python object so they can be pickled.
        payload = cpython.PyBytes_FromStringAndSize(<char*>(ipc_handle.reserved), sizeof(ipc_handle.reserved))
        self._ipc_descriptor = IPCEventDescriptor._init(payload, self._busy_waited)
    return self._ipc_descriptor
180+
@classmethod
def from_ipc_descriptor(cls, ipc_descriptor: IPCEventDescriptor) -> Event:
    """Import an event that was exported from another process.

    Parameters
    ----------
    ipc_descriptor : IPCEventDescriptor
        Descriptor obtained from :meth:`get_ipc_descriptor` in the exporting
        process.

    Returns
    -------
    Event
        An IPC-enabled event with timing reported as disabled. Its device id
        is set to ``-1`` and its context handle to ``None``.

    Raises
    ------
    ValueError
        If the descriptor payload does not have the size of a
        ``CUipcEventHandle``.
    """
    cdef cydriver.CUipcEventHandle data
    cdef Py_ssize_t expected = sizeof(data.reserved)
    cdef bytes raw = ipc_descriptor._reserved

    # Descriptors can be rebuilt from arbitrary bytes (e.g. by unpickling), so
    # verify the payload size before memcpy reads ``expected`` bytes from it.
    if len(raw) != expected:
        raise ValueError(
            f"IPC event descriptor payload must be {expected} bytes, got {len(raw)}"
        )
    memcpy(data.reserved, <const void*><const char*>raw, sizeof(data.reserved))

    cdef Event self = Event.__new__(cls)
    with nogil:
        HANDLE_RETURN(cydriver.cuIpcOpenEventHandle(&self._handle, data))

    # IPC-enabled events are always created with timing disabled.
    self._timing_disabled = True
    self._busy_waited = ipc_descriptor._busy_waited
    self._ipc_enabled = True
    self._ipc_descriptor = ipc_descriptor
    # The descriptor carries no device/context information. -1 / None mark
    # them as unknown; the `device` and `context` properties check for these
    # values and return None.
    self._device_id = -1
    self._ctx_handle = None
    return self
196+
@property
def is_ipc_enabled(self) -> bool:
    """Whether this event can be shared across process boundaries."""
    return self._ipc_enabled
201+
154202 @property
155203 def is_timing_disabled(self ) -> bool:
156204 """Return True if the event does not record timing data , otherwise False."""
@@ -161,11 +209,6 @@ cdef class Event:
161209 """Return True if the event synchronization would keep the CPU busy-waiting , otherwise False."""
162210 return self._busy_waited
163211
164- @property
165- def is_ipc_supported(self ) -> bool:
166- """Return True if this event can be used as an interprocess event , otherwise False."""
167- raise NotImplementedError("WIP: https://github.com/NVIDIA/cuda-python/issues/103")
168-
169212 def sync(self ):
170213 """ Synchronize until the event completes.
171214
@@ -212,12 +255,43 @@ cdef class Event:
212255 context is set current after a event is created.
213256
214257 """
215-
216- from cuda.core.experimental._device import Device # avoid circular import
217-
218- return Device(self._device_id )
258+ if self._device_id >= 0:
259+ from ._device import Device # avoid circular import
260+ return Device(self._device_id )
219261
@property
def context(self) -> Context:
    """Return the :obj:`~_context.Context` associated with this event.

    Returns ``None`` when the event has no context handle or its device id
    is negative.
    """
    if self._ctx_handle is None or self._device_id < 0:
        return None
    return Context._from_ctx(self._ctx_handle, self._device_id)
267+
268+
cdef class IPCEventDescriptor:
    """Serializable object describing an event that can be shared between processes.

    Instances hold the opaque IPC handle bytes plus the busy-wait setting of
    the exporting event. They are created through :meth:`_init` (used by the
    Event APIs and by pickling), never by calling the class directly.
    """

    cdef:
        bytes _reserved     # opaque IPC event handle payload
        bint _busy_waited   # busy-wait setting carried over from the exporting event

    def __init__(self, *arg, **kwargs):
        raise RuntimeError(" IPCEventDescriptor objects cannot be instantiated directly. Please use Event APIs.")

    @classmethod
    def _init(cls, reserved: bytes, busy_waited: bint):
        """Build a descriptor from raw handle bytes, bypassing ``__init__``."""
        cdef IPCEventDescriptor self = IPCEventDescriptor.__new__(cls)
        self._reserved = reserved
        self._busy_waited = busy_waited
        return self

    def __eq__(self, other):
        # Keep the argument untyped: a typed extension argument would raise
        # TypeError for foreign objects and would accept None, whose
        # attributes must not be read.
        if not isinstance(other, IPCEventDescriptor):
            return NotImplemented
        # No need to check self._busy_waited.
        return self._reserved == (<IPCEventDescriptor>other)._reserved

    def __hash__(self):
        # Consistent with __eq__: equality depends on the handle bytes only.
        return hash(self._reserved)

    def __reduce__(self):
        # Pickle as a call to _init with the stored fields.
        return self._init, (self._reserved, self._busy_waited)
292+
293+
def _reduce_event(event):
    """Reduce ``event`` to a ``(callable, args)`` pair based on its IPC descriptor.

    The callable is the event's ``from_ipc_descriptor`` constructor and the
    single argument is the descriptor returned by ``get_ipc_descriptor()``.
    """
    rebuild = event.from_ipc_descriptor
    descriptor = event.get_ipc_descriptor()
    return rebuild, (descriptor,)


# Register the reducer with multiprocessing's pickler for Event objects.
multiprocessing.reduction.register(Event, _reduce_event)
0 commit comments