diff --git a/druntime/src/core/internal/gc/impl/conservative/gc.d b/druntime/src/core/internal/gc/impl/conservative/gc.d index 4ba458783e1..ce0c7897146 100644 --- a/druntime/src/core/internal/gc/impl/conservative/gc.d +++ b/druntime/src/core/internal/gc/impl/conservative/gc.d @@ -3269,8 +3269,7 @@ Lmark: Gcx.instance.scanThreadData = null; Gcx.instance.busyThreads = 0; - memset(&Gcx.instance.evStart, 0, Gcx.instance.evStart.sizeof); - memset(&Gcx.instance.evDone, 0, Gcx.instance.evDone.sizeof); + memset(&Gcx.instance.ev, 0, Gcx.instance.ev.sizeof); } } } @@ -3282,7 +3281,7 @@ Lmark: import core.atomic; import core.cpuid; - import core.sync.event; + import core.sync.event : EventAwaiter; private: // disable invariants for background threads @@ -3293,8 +3292,12 @@ Lmark: uint numScanThreads; ScanThreadData* scanThreadData; - Event evStart; - Event evDone; + static struct EvStartDone + { + EventAwaiter start; + EventAwaiter done; + } + EvStartDone ev; shared uint busyThreads; shared uint stoppedThreads; @@ -3336,7 +3339,7 @@ Lmark: busyThreads.atomicOp!"+="(1); // main thread is busy - evStart.setIfInitialized(); + ev.start.set(); debug(PARALLEL_PRINTF) printf("mark %lld roots\n", cast(ulong)(ptop - pbot)); @@ -3396,8 +3399,8 @@ Lmark: if (!scanThreadData) onOutOfMemoryError(); - evStart.initialize(false, false); - evDone.initialize(false, false); + ev.start = EventAwaiter(false, false); + ev.done = EventAwaiter(false, false); version (Posix) { @@ -3438,8 +3441,8 @@ Lmark: stopGC = true; while (atomicLoad(stoppedThreads) < startedThreads && !allThreadsDead) { - evStart.setIfInitialized(); - evDone.wait(dur!"msecs"(1)); + ev.start.set(); + ev.done.wait(dur!"msecs"(1)); } for (int idx = 0; idx < numScanThreads; idx++) @@ -3451,8 +3454,8 @@ Lmark: } } - evDone.terminate(); - evStart.terminate(); + destroy(ev.start); + destroy(ev.done); cstdlib.free(scanThreadData); // scanThreadData = null; // keep non-null to not start again after shutdown @@ -3465,9 +3468,9 @@ Lmark: { while (!stopGC) { - evStart.wait(); + ev.start.wait(); pullFromScanStack(); - evDone.setIfInitialized(); + ev.done.set(); } stoppedThreads.atomicOp!"+="(1); } @@ -3496,7 +3499,7 @@ Lmark: { if (toscan.empty) { - evDone.wait(dur!"msecs"(1)); + ev.done.wait(dur!"msecs"(1)); continue; } diff --git a/druntime/src/core/sync/event.d b/druntime/src/core/sync/event.d index fa3937a157b..1de9ac9e71e 100644 --- a/druntime/src/core/sync/event.d +++ b/druntime/src/core/sync/event.d @@ -13,7 +13,122 @@ module core.sync.event; import core.time; -import rt.sys.config; +struct EventAwaiter +{ +nothrow @nogc: + /** + * Creates an event object. + * + * Params: + * manualReset = the state of the event is not reset automatically after resuming waiting clients + * initialState = initial state of the signal + */ + this(bool manualReset, bool initialState) + { + osEvent = OsEvent(manualReset, initialState); + } + + // copying not allowed, can produce resource leaks + @disable this(this); + + ref EventAwaiter opAssign(return scope EventAwaiter s) @live + { + this.osEvent = s.osEvent; + + return this; + } + + /// Set the event to "signaled", so that waiting clients are resumed + void set() + { + osEvent.set(); + } + + /// Reset the event manually + void reset() + { + osEvent.reset(); + } + + /** + * Wait for the event to be signaled without timeout. + * + * Returns: + * `true` if the event is in signaled state, `false` if the event is uninitialized or another error occured + */ + bool wait() + { + return osEvent.wait(); + } + + /** + * Wait for the event to be signaled with timeout. + * + * Params: + * tmout = the maximum time to wait + * Returns: + * `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or + * the event is uninitialized or another error occured + */ + bool wait(Duration tmout) + { + return osEvent.wait(tmout); + } + +private: + import rt.sys.config; + + mixin("import " ~ osEventImport ~ ";"); + OsEvent osEvent; +} + +// Test single-thread (non-shared) use. +@nogc nothrow unittest +{ + // auto-reset, initial state false + EventAwaiter ev1 = EventAwaiter(false, false); + assert(!ev1.wait(1.dur!"msecs")); + ev1.set(); + assert(ev1.wait()); + assert(!ev1.wait(1.dur!"msecs")); + + // manual-reset, initial state true + EventAwaiter ev2 = EventAwaiter(true, true); + assert(ev2.wait()); + assert(ev2.wait()); + ev2.reset(); + assert(!ev2.wait(1.dur!"msecs")); +} + +unittest +{ + import core.thread, core.atomic; + + scope event = new EventAwaiter(true, false); + int numThreads = 10; + shared int numRunning = 0; + + void testFn() + { + event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner + numRunning.atomicOp!"+="(1); + } + + auto group = new ThreadGroup; + + for (int i = 0; i < numThreads; ++i) + group.create(&testFn); + + auto start = MonoTime.currTime; + assert(numRunning == 0); + + event.set(); + group.joinAll(); + + assert(numRunning == numThreads); + + assert(MonoTime.currTime - start < 5.dur!"seconds"); +} /** * represents an event. Clients of an event are suspended while waiting @@ -51,6 +166,7 @@ struct ProcessFile } --- */ +deprecated("Please use core.sync.event.EventAwaiter instead") struct Event { nothrow @nogc: @@ -73,9 +189,10 @@ nothrow @nogc: * manualReset = the state of the event is not reset automatically after resuming waiting clients * initialState = initial state of the signal */ - void initialize(bool manualReset, bool initialState) + void initialize(bool manualReset, bool initialState) @live { - osEvent.create(manualReset, initialState); + osEvent = EventAwaiter(manualReset, initialState); + m_initalized = true; } // copying not allowed, can produce resource leaks @@ -85,6 +202,7 @@ nothrow @nogc: ~this() { terminate(); + m_initalized = false; } /** @@ -104,13 +222,15 @@ nothrow @nogc: /// Set the event to "signaled", so that waiting clients are resumed void setIfInitialized() { - osEvent.setIfInitialized(); + if(m_initalized) + osEvent.set(); } /// Reset the event manually void reset() { - osEvent.reset(); + if(m_initalized) + osEvent.reset(); } /** @@ -121,6 +241,9 @@ nothrow @nogc: */ bool wait() { + if (!m_initalized) + return false; + return osEvent.wait(); } @@ -135,12 +258,15 @@ nothrow @nogc: */ bool wait(Duration tmout) { + if (!m_initalized) + return false; + return osEvent.wait(tmout); } private: - mixin("import " ~ osEventImport ~ ";"); - OsEvent osEvent; + EventAwaiter osEvent; + bool m_initalized; } // Test single-thread (non-shared) use. diff --git a/druntime/src/rt/sys/posix/osevent.d b/druntime/src/rt/sys/posix/osevent.d index 2d28a4a38c9..d6b8f1f9385 100644 --- a/druntime/src/rt/sys/posix/osevent.d +++ b/druntime/src/rt/sys/posix/osevent.d @@ -11,10 +11,8 @@ import core.internal.abort : abort; struct OsEvent { - void create(bool manualReset, bool initialState) nothrow @trusted @nogc + this(bool manualReset, bool initialState) nothrow @trusted @nogc { - if (m_initalized) - return; pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) == 0 || abort("Error: pthread_mutex_init failed."); static if ( is( typeof( pthread_condattr_setclock ) ) ) @@ -37,40 +35,29 @@ struct OsEvent m_state = initialState; m_manualReset = manualReset; - m_initalized = true; } - void destroy() nothrow @trusted @nogc + ~this() nothrow @trusted @nogc { - if (m_initalized) - { - pthread_mutex_destroy(&m_mutex) == 0 || - abort("Error: pthread_mutex_destroy failed."); - pthread_cond_destroy(&m_cond) == 0 || - abort("Error: pthread_cond_destroy failed."); - m_initalized = false; - } + pthread_mutex_destroy(&m_mutex) == 0 || + abort("Error: pthread_mutex_destroy failed."); + pthread_cond_destroy(&m_cond) == 0 || + abort("Error: pthread_cond_destroy failed."); } - void setIfInitialized() nothrow @trusted @nogc + void set() nothrow @trusted @nogc { - if (m_initalized) - { - pthread_mutex_lock(&m_mutex); - m_state = true; - pthread_cond_broadcast(&m_cond); - pthread_mutex_unlock(&m_mutex); - } + pthread_mutex_lock(&m_mutex); + m_state = true; + pthread_cond_broadcast(&m_cond); + pthread_mutex_unlock(&m_mutex); } void reset() nothrow @trusted @nogc { - if (m_initalized) - { - pthread_mutex_lock(&m_mutex); - m_state = false; - pthread_mutex_unlock(&m_mutex); - } + pthread_mutex_lock(&m_mutex); + m_state = false; + pthread_mutex_unlock(&m_mutex); } bool wait() nothrow @trusted @nogc @@ -80,9 +67,6 @@ struct OsEvent bool wait(Duration tmout) nothrow @trusted @nogc { - if (!m_initalized) - return false; - pthread_mutex_lock(&m_mutex); int result = 0; @@ -114,7 +98,6 @@ private: pthread_mutex_t m_mutex; pthread_cond_t m_cond; - bool m_initalized; bool m_state; bool m_manualReset; } \ No newline at end of file diff --git a/druntime/src/rt/sys/windows/osevent.d b/druntime/src/rt/sys/windows/osevent.d index e5536a81dfc..83ef4da79df 100644 --- a/druntime/src/rt/sys/windows/osevent.d +++ b/druntime/src/rt/sys/windows/osevent.d @@ -12,43 +12,34 @@ import core.internal.abort : abort; struct OsEvent { - void create(bool manualReset, bool initialState) nothrow @trusted @nogc + this(bool manualReset, bool initialState) nothrow @trusted @nogc { - if (m_event) - return; m_event = CreateEvent(null, manualReset, initialState, null); m_event || abort("Error: CreateEvent failed."); } - void destroy() nothrow @trusted @nogc + ~this() nothrow @trusted @nogc { - if (m_event) - CloseHandle(m_event); - m_event = null; + CloseHandle(m_event); } - void setIfInitialized() nothrow @trusted @nogc + void set() nothrow @trusted @nogc { - if (m_event) - SetEvent(m_event); + SetEvent(m_event); } void reset() nothrow @trusted @nogc { - if (m_event) - ResetEvent(m_event); + ResetEvent(m_event); } bool wait() nothrow @trusted @nogc { - return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0; + return WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0; } bool wait(Duration tmout) nothrow @trusted @nogc { - if (!m_event) - return false; - auto maxWaitMillis = dur!("msecs")(uint.max - 1); while (tmout > maxWaitMillis)