diff --git a/src/main/java/dev/openfeature/sdk/Awaitable.java b/src/main/java/dev/openfeature/sdk/Awaitable.java new file mode 100644 index 000000000..7d5f477dc --- /dev/null +++ b/src/main/java/dev/openfeature/sdk/Awaitable.java @@ -0,0 +1,44 @@ +package dev.openfeature.sdk; + +/** + * A class to help with synchronization by allowing the optional awaiting of the associated action. + */ +public class Awaitable { + + /** + * An already-completed Awaitable. Awaiting this will return immediately. + */ + public static final Awaitable FINISHED = new Awaitable(true); + + private boolean isDone = false; + + public Awaitable() {} + + private Awaitable(boolean isDone) { + this.isDone = isDone; + } + + /** + * Lets the calling thread wait until some other thread calls {@link Awaitable#wakeup()}. If + * {@link Awaitable#wakeup()} has been called before the current thread invokes this method, it will return + * immediately. + */ + @SuppressWarnings("java:S2142") + public synchronized void await() { + while (!isDone) { + try { + this.wait(); + } catch (InterruptedException ignored) { + // ignored, do not propagate the interrupted state + } + } + } + + /** + * Wakes up all threads that have called {@link Awaitable#await()} and lets them proceed. + */ + public synchronized void wakeup() { + isDone = true; + this.notifyAll(); + } +} diff --git a/src/main/java/dev/openfeature/sdk/EventProvider.java b/src/main/java/dev/openfeature/sdk/EventProvider.java index 659c6ad46..0d7e897c2 100644 --- a/src/main/java/dev/openfeature/sdk/EventProvider.java +++ b/src/main/java/dev/openfeature/sdk/EventProvider.java @@ -76,15 +76,32 @@ public void shutdown() { * @param event The event type * @param details The details of the event */ - public void emit(ProviderEvent event, ProviderEventDetails details) { - if (eventProviderListener != null) { - eventProviderListener.onEmit(event, details); - } + public Awaitable emit(final ProviderEvent event, final ProviderEventDetails details) { + final var localEventProviderListener = this.eventProviderListener; + final var localOnEmit = this.onEmit; - final TriConsumer localOnEmit = this.onEmit; - if (localOnEmit != null) { - emitterExecutor.submit(() -> localOnEmit.accept(this, event, details)); + if (localEventProviderListener == null && localOnEmit == null) { + return Awaitable.FINISHED; } + + final var awaitable = new Awaitable(); + + // These calls need to be executed on a different thread to prevent deadlocks when the provider initialization + // relies on a ready event to be emitted + emitterExecutor.submit(() -> { + try (var ignored = OpenFeatureAPI.lock.readLockAutoCloseable()) { + if (localEventProviderListener != null) { + localEventProviderListener.onEmit(event, details); + } + if (localOnEmit != null) { + localOnEmit.accept(this, event, details); + } + } finally { + awaitable.wakeup(); + } + }); + + return awaitable; } /** @@ -93,8 +110,8 @@ public void emit(ProviderEvent event, ProviderEventDetails details) { * * @param details The details of the event */ - public void emitProviderReady(ProviderEventDetails details) { - emit(ProviderEvent.PROVIDER_READY, details); + public Awaitable emitProviderReady(ProviderEventDetails details) { + return emit(ProviderEvent.PROVIDER_READY, details); } /** @@ -104,8 +121,8 @@ public void emitProviderReady(ProviderEventDetails details) { * * @param details The details of the event */ - public void emitProviderConfigurationChanged(ProviderEventDetails details) { - emit(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, details); + public Awaitable emitProviderConfigurationChanged(ProviderEventDetails details) { + return emit(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, details); } /** @@ -114,8 +131,8 @@ public void emitProviderConfigurationChanged(ProviderEventDetails details) { * * @param details The details of the event */ - public void emitProviderStale(ProviderEventDetails details) { - emit(ProviderEvent.PROVIDER_STALE, details); + public Awaitable emitProviderStale(ProviderEventDetails details) { + return emit(ProviderEvent.PROVIDER_STALE, details); } /** @@ -124,7 +141,7 @@ public void emitProviderStale(ProviderEventDetails details) { * * @param details The details of the event */ - public void emitProviderError(ProviderEventDetails details) { - emit(ProviderEvent.PROVIDER_ERROR, details); + public Awaitable emitProviderError(ProviderEventDetails details) { + return emit(ProviderEvent.PROVIDER_ERROR, details); } } diff --git a/src/main/java/dev/openfeature/sdk/EventSupport.java b/src/main/java/dev/openfeature/sdk/EventSupport.java index 5ebe90a4c..8396795bd 100644 --- a/src/main/java/dev/openfeature/sdk/EventSupport.java +++ b/src/main/java/dev/openfeature/sdk/EventSupport.java @@ -1,12 +1,12 @@ package dev.openfeature.sdk; -import java.util.ArrayList; -import java.util.List; +import java.util.Collection; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -23,13 +23,10 @@ class EventSupport { // we use a v4 uuid as a "placeholder" for anonymous clients, since // ConcurrentHashMap doesn't support nulls - private static final String defaultClientUuid = UUID.randomUUID().toString(); + private static final String DEFAULT_CLIENT_UUID = UUID.randomUUID().toString(); private final Map handlerStores = new ConcurrentHashMap<>(); private final HandlerStore globalHandlerStore = new HandlerStore(); - private final ExecutorService taskExecutor = Executors.newCachedThreadPool(runnable -> { - final Thread thread = new Thread(runnable); - return thread; - }); + private final ExecutorService taskExecutor = Executors.newCachedThreadPool(); /** * Run all the event handlers associated with this domain. @@ -40,11 +37,10 @@ class EventSupport { * @param eventDetails the event details */ public void runClientHandlers(String domain, ProviderEvent event, EventDetails eventDetails) { - domain = Optional.ofNullable(domain).orElse(defaultClientUuid); + domain = Optional.ofNullable(domain).orElse(DEFAULT_CLIENT_UUID); // run handlers if they exist Optional.ofNullable(handlerStores.get(domain)) - .filter(store -> Optional.of(store).isPresent()) .map(store -> store.handlerMap.get(event)) .ifPresent(handlers -> handlers.forEach(handler -> runHandler(handler, eventDetails))); } @@ -69,7 +65,7 @@ public void runGlobalHandlers(ProviderEvent event, EventDetails eventDetails) { * @param handler the handler function to run */ public void addClientHandler(String domain, ProviderEvent event, Consumer handler) { - final String name = Optional.ofNullable(domain).orElse(defaultClientUuid); + final String name = Optional.ofNullable(domain).orElse(DEFAULT_CLIENT_UUID); // lazily create and cache a HandlerStore if it doesn't exist HandlerStore store = Optional.ofNullable(this.handlerStores.get(name)).orElseGet(() -> { @@ -89,7 +85,7 @@ public void addClientHandler(String domain, ProviderEvent event, Consumer handler) { - domain = Optional.ofNullable(domain).orElse(defaultClientUuid); + domain = Optional.ofNullable(domain).orElse(DEFAULT_CLIENT_UUID); this.handlerStores.get(domain).removeHandler(event, handler); } @@ -160,14 +156,14 @@ public void shutdown() { // instantiated when a handler is added to that client. static class HandlerStore { - private final Map>> handlerMap; + private final Map>> handlerMap; HandlerStore() { handlerMap = new ConcurrentHashMap<>(); - handlerMap.put(ProviderEvent.PROVIDER_READY, new ArrayList<>()); - handlerMap.put(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, new ArrayList<>()); - handlerMap.put(ProviderEvent.PROVIDER_ERROR, new ArrayList<>()); - handlerMap.put(ProviderEvent.PROVIDER_STALE, new ArrayList<>()); + handlerMap.put(ProviderEvent.PROVIDER_READY, new ConcurrentLinkedQueue<>()); + handlerMap.put(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, new ConcurrentLinkedQueue<>()); + handlerMap.put(ProviderEvent.PROVIDER_ERROR, new ConcurrentLinkedQueue<>()); + handlerMap.put(ProviderEvent.PROVIDER_STALE, new ConcurrentLinkedQueue<>()); } void addHandler(ProviderEvent event, Consumer handler) { diff --git a/src/test/java/dev/openfeature/sdk/AwaitableTest.java b/src/test/java/dev/openfeature/sdk/AwaitableTest.java new file mode 100644 index 000000000..70ef7902c --- /dev/null +++ b/src/test/java/dev/openfeature/sdk/AwaitableTest.java @@ -0,0 +1,75 @@ +package dev.openfeature.sdk; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +@Timeout(value = 5, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) +class AwaitableTest { + @Test + void waitingForFinishedIsANoOp() { + var startTime = System.currentTimeMillis(); + Awaitable.FINISHED.await(); + var endTime = System.currentTimeMillis(); + assertTrue(endTime - startTime < 10); + } + + @Test + void waitingForNotFinishedWaitsEvenWhenInterrupted() throws InterruptedException { + var awaitable = new Awaitable(); + var mayProceed = new AtomicBoolean(false); + + var thread = new Thread(() -> { + awaitable.await(); + if (!mayProceed.get()) { + fail(); + } + }); + thread.start(); + + var startTime = System.currentTimeMillis(); + do { + thread.interrupt(); + } while (startTime + 1000 > System.currentTimeMillis()); + mayProceed.set(true); + awaitable.wakeup(); + thread.join(); + } + + @Test + void callingWakeUpWakesUpAllWaitingThreads() throws InterruptedException { + var awaitable = new Awaitable(); + var isRunning = new AtomicInteger(); + + Runnable runnable = () -> { + isRunning.incrementAndGet(); + var start = System.currentTimeMillis(); + awaitable.await(); + var end = System.currentTimeMillis(); + if (end - start > 10) { + fail(); + } + }; + + var numThreads = 2; + var threads = new Thread[numThreads]; + for (int i = 0; i < numThreads; i++) { + threads[i] = new Thread(runnable); + threads[i].start(); + } + + await().atMost(1, TimeUnit.SECONDS).until(() -> isRunning.get() == numThreads); + + awaitable.wakeup(); + + for (int i = 0; i < numThreads; i++) { + threads[i].join(); + } + } +} diff --git a/src/test/java/dev/openfeature/sdk/DeveloperExperienceTest.java b/src/test/java/dev/openfeature/sdk/DeveloperExperienceTest.java index 32fa605c2..c954c8b19 100644 --- a/src/test/java/dev/openfeature/sdk/DeveloperExperienceTest.java +++ b/src/test/java/dev/openfeature/sdk/DeveloperExperienceTest.java @@ -150,7 +150,7 @@ void shouldPutTheProviderInStateErrorAfterEmittingErrorEvent() { api.setProviderAndWait(domain, provider); Client client = api.getClient(domain); assertThat(client.getProviderState()).isEqualTo(ProviderState.READY); - provider.emitProviderError(ProviderEventDetails.builder().build()); + provider.emitProviderError(ProviderEventDetails.builder().build()).await(); assertThat(client.getProviderState()).isEqualTo(ProviderState.ERROR); } @@ -165,7 +165,7 @@ void shouldPutTheProviderInStateStaleAfterEmittingStaleEvent() { api.setProviderAndWait(domain, provider); Client client = api.getClient(domain); assertThat(client.getProviderState()).isEqualTo(ProviderState.READY); - provider.emitProviderStale(ProviderEventDetails.builder().build()); + provider.emitProviderStale(ProviderEventDetails.builder().build()).await(); assertThat(client.getProviderState()).isEqualTo(ProviderState.STALE); } @@ -180,9 +180,9 @@ void shouldPutTheProviderInStateReadyAfterEmittingReadyEvent() { api.setProviderAndWait(domain, provider); Client client = api.getClient(domain); assertThat(client.getProviderState()).isEqualTo(ProviderState.READY); - provider.emitProviderStale(ProviderEventDetails.builder().build()); + provider.emitProviderStale(ProviderEventDetails.builder().build()).await(); assertThat(client.getProviderState()).isEqualTo(ProviderState.STALE); - provider.emitProviderReady(ProviderEventDetails.builder().build()); + provider.emitProviderReady(ProviderEventDetails.builder().build()).await(); assertThat(client.getProviderState()).isEqualTo(ProviderState.READY); } } diff --git a/src/test/java/dev/openfeature/sdk/EventProviderTest.java b/src/test/java/dev/openfeature/sdk/EventProviderTest.java index ebf8901cb..d04fa88d1 100644 --- a/src/test/java/dev/openfeature/sdk/EventProviderTest.java +++ b/src/test/java/dev/openfeature/sdk/EventProviderTest.java @@ -32,6 +32,7 @@ public static void resetDefaultProvider() { } @Test + @Timeout(value = 2, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) @DisplayName("should run attached onEmit with emitters") void emitsEventsWhenAttached() { TriConsumer onEmit = mockOnEmit(); diff --git a/src/test/java/dev/openfeature/sdk/EventsTest.java b/src/test/java/dev/openfeature/sdk/EventsTest.java index 157c0bafe..b232f1177 100644 --- a/src/test/java/dev/openfeature/sdk/EventsTest.java +++ b/src/test/java/dev/openfeature/sdk/EventsTest.java @@ -24,7 +24,7 @@ class EventsTest { private OpenFeatureAPI api; @BeforeEach - public void setUp() throws Exception { + void setUp() { api = new OpenFeatureAPI(); } @@ -578,7 +578,7 @@ void shouldHaveAllProperties() { number = "5.3.3", text = "Handlers attached after the provider is already in the associated state, MUST run immediately.") void matchingReadyEventsMustRunImmediately() { - final String name = "matchingEventsMustRunImmediately"; + final String name = "matchingReadyEventsMustRunImmediately"; final Consumer handler = mockHandler(); // provider which is already ready @@ -597,14 +597,14 @@ void matchingReadyEventsMustRunImmediately() { number = "5.3.3", text = "Handlers attached after the provider is already in the associated state, MUST run immediately.") void matchingStaleEventsMustRunImmediately() { - final String name = "matchingEventsMustRunImmediately"; + final String name = "matchingStaleEventsMustRunImmediately"; final Consumer handler = mockHandler(); // provider which is already stale TestEventsProvider provider = new TestEventsProvider(INIT_DELAY); Client client = api.getClient(name); api.setProviderAndWait(name, provider); - provider.emitProviderStale(ProviderEventDetails.builder().build()); + provider.emitProviderStale(ProviderEventDetails.builder().build()).await(); assertThat(client.getProviderState()).isEqualTo(ProviderState.STALE); // should run even though handler was added after stale @@ -618,14 +618,14 @@ void matchingStaleEventsMustRunImmediately() { number = "5.3.3", text = "Handlers attached after the provider is already in the associated state, MUST run immediately.") void matchingErrorEventsMustRunImmediately() { - final String name = "matchingEventsMustRunImmediately"; + final String name = "matchingErrorEventsMustRunImmediately"; final Consumer handler = mockHandler(); // provider which is already in error TestEventsProvider provider = new TestEventsProvider(INIT_DELAY); Client client = api.getClient(name); api.setProviderAndWait(name, provider); - provider.emitProviderError(ProviderEventDetails.builder().build()); + provider.emitProviderError(ProviderEventDetails.builder().build()).await(); assertThat(client.getProviderState()).isEqualTo(ProviderState.ERROR); verify(handler, never()).accept(any()); diff --git a/src/test/java/dev/openfeature/sdk/providers/memory/InMemoryProviderTest.java b/src/test/java/dev/openfeature/sdk/providers/memory/InMemoryProviderTest.java index 4d2a8b287..970495940 100644 --- a/src/test/java/dev/openfeature/sdk/providers/memory/InMemoryProviderTest.java +++ b/src/test/java/dev/openfeature/sdk/providers/memory/InMemoryProviderTest.java @@ -3,9 +3,14 @@ import static dev.openfeature.sdk.Structure.mapToStructure; import static dev.openfeature.sdk.testutils.TestFlagsUtils.buildFlags; import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import com.google.common.collect.ImmutableMap; import dev.openfeature.sdk.Client; @@ -19,6 +24,7 @@ import dev.openfeature.sdk.exceptions.TypeMismatchError; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import lombok.SneakyThrows; import org.junit.jupiter.api.BeforeEach; @@ -34,10 +40,11 @@ class InMemoryProviderTest { @SneakyThrows @BeforeEach void beforeEach() { + final var configChangedEventCounter = new AtomicInteger(); Map> flags = buildFlags(); provider = spy(new InMemoryProvider(flags)); api = OpenFeatureAPITestUtil.createAPI(); - api.onProviderConfigurationChanged(eventDetails -> {}); + api.onProviderConfigurationChanged(eventDetails -> configChangedEventCounter.incrementAndGet()); api.setProviderAndWait(provider); client = api.getClient(); provider.updateFlags(flags); @@ -48,6 +55,11 @@ void beforeEach() { .variant("off", false) .defaultVariant("on") .build()); + + // wait for the two config changed events to be fired, otherwise they could mess with our tests + while (configChangedEventCounter.get() < 2) { + Thread.sleep(1); + } } @Test