Skip to content

feat: add means of awaiting event emission, fix flaky build #1463

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/main/java/dev/openfeature/sdk/Awaitable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package dev.openfeature.sdk;

/**
* A class to help with synchronization by allowing the optional awaiting of the associated action.
*/
public class Awaitable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this.


/**
* An already-completed Awaitable. Awaiting this will return immediately.
*/
public static final Awaitable FINISHED = new Awaitable(true);

private boolean isDone = false;

public Awaitable() {}

private Awaitable(boolean isDone) {
this.isDone = isDone;
}

/**
* Lets the calling thread wait until some other thread calls {@link Awaitable#wakeup()}. If
* {@link Awaitable#wakeup()} has been called before the current thread invokes this method, it will return
* immediately.
*/
@SuppressWarnings("java:S2142")
public synchronized void await() {
while (!isDone) {
try {
this.wait();
} catch (InterruptedException ignored) {
// ignored, do not propagate the interrupted state
}
}
}

/**
* Wakes up all threads that have called {@link Awaitable#await()} and lets them proceed.
*/
public synchronized void wakeup() {
isDone = true;
this.notifyAll();
}
}
47 changes: 32 additions & 15 deletions src/main/java/dev/openfeature/sdk/EventProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,32 @@ public void shutdown() {
* @param event The event type
* @param details The details of the event
*/
public void emit(ProviderEvent event, ProviderEventDetails details) {
if (eventProviderListener != null) {
eventProviderListener.onEmit(event, details);
}
public Awaitable emit(final ProviderEvent event, final ProviderEventDetails details) {
final var localEventProviderListener = this.eventProviderListener;
final var localOnEmit = this.onEmit;

final TriConsumer<EventProvider, ProviderEvent, ProviderEventDetails> localOnEmit = this.onEmit;
if (localOnEmit != null) {
emitterExecutor.submit(() -> localOnEmit.accept(this, event, details));
if (localEventProviderListener == null && localOnEmit == null) {
return Awaitable.FINISHED;
}

final var awaitable = new Awaitable();

// These calls need to be executed on a different thread to prevent deadlocks when the provider initialization
// relies on a ready event to be emitted
emitterExecutor.submit(() -> {
try (var ignored = OpenFeatureAPI.lock.readLockAutoCloseable()) {
if (localEventProviderListener != null) {
localEventProviderListener.onEmit(event, details);
}
if (localOnEmit != null) {
localOnEmit.accept(this, event, details);
}
} finally {
awaitable.wakeup();
}
});

return awaitable;
}

/**
Expand All @@ -93,8 +110,8 @@ public void emit(ProviderEvent event, ProviderEventDetails details) {
*
* @param details The details of the event
*/
public void emitProviderReady(ProviderEventDetails details) {
emit(ProviderEvent.PROVIDER_READY, details);
public Awaitable emitProviderReady(ProviderEventDetails details) {
Copy link
Member

@toddbaert toddbaert Jun 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a public class people can override to implement a provider that supports events. I like this API and I think it's handy to have.

I was trying to see if there was any way this could be considered breaking. I tried adding this at the top of the class:

private BiConsumer<ProviderEvent, ProviderEventDetails> doesThisCompile = this::emit;

And (surprisingly to me) before and after your change, this still compiles even though BIcConsumer returns void. Apparently, return values of method references are ignored when assigned to void functional interfaces!

EDIT:

Oh - I just realized, this could break implementations that override these methods... I think that would be relatively rare but it's possible. What do you think @aepfli @chrfwow ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this would be much of a problem. Providers can just return Awaitable.FINISHED if they do this synchronously, otherwise it's probably best to return something to get informed about the state update anyway.

return emit(ProviderEvent.PROVIDER_READY, details);
}

/**
Expand All @@ -104,8 +121,8 @@ public void emitProviderReady(ProviderEventDetails details) {
*
* @param details The details of the event
*/
public void emitProviderConfigurationChanged(ProviderEventDetails details) {
emit(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, details);
public Awaitable emitProviderConfigurationChanged(ProviderEventDetails details) {
return emit(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, details);
}

/**
Expand All @@ -114,8 +131,8 @@ public void emitProviderConfigurationChanged(ProviderEventDetails details) {
*
* @param details The details of the event
*/
public void emitProviderStale(ProviderEventDetails details) {
emit(ProviderEvent.PROVIDER_STALE, details);
public Awaitable emitProviderStale(ProviderEventDetails details) {
return emit(ProviderEvent.PROVIDER_STALE, details);
}

/**
Expand All @@ -124,7 +141,7 @@ public void emitProviderStale(ProviderEventDetails details) {
*
* @param details The details of the event
*/
public void emitProviderError(ProviderEventDetails details) {
emit(ProviderEvent.PROVIDER_ERROR, details);
public Awaitable emitProviderError(ProviderEventDetails details) {
return emit(ProviderEvent.PROVIDER_ERROR, details);
}
}
28 changes: 12 additions & 16 deletions src/main/java/dev/openfeature/sdk/EventSupport.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package dev.openfeature.sdk;

import java.util.ArrayList;
import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -23,13 +23,10 @@ class EventSupport {

// we use a v4 uuid as a "placeholder" for anonymous clients, since
// ConcurrentHashMap doesn't support nulls
private static final String defaultClientUuid = UUID.randomUUID().toString();
private static final String DEFAULT_CLIENT_UUID = UUID.randomUUID().toString();
private final Map<String, HandlerStore> handlerStores = new ConcurrentHashMap<>();
private final HandlerStore globalHandlerStore = new HandlerStore();
private final ExecutorService taskExecutor = Executors.newCachedThreadPool(runnable -> {
final Thread thread = new Thread(runnable);
return thread;
});
private final ExecutorService taskExecutor = Executors.newCachedThreadPool();

/**
* Run all the event handlers associated with this domain.
Expand All @@ -40,11 +37,10 @@ class EventSupport {
* @param eventDetails the event details
*/
public void runClientHandlers(String domain, ProviderEvent event, EventDetails eventDetails) {
domain = Optional.ofNullable(domain).orElse(defaultClientUuid);
domain = Optional.ofNullable(domain).orElse(DEFAULT_CLIENT_UUID);

// run handlers if they exist
Optional.ofNullable(handlerStores.get(domain))
.filter(store -> Optional.of(store).isPresent())
.map(store -> store.handlerMap.get(event))
.ifPresent(handlers -> handlers.forEach(handler -> runHandler(handler, eventDetails)));
}
Expand All @@ -69,7 +65,7 @@ public void runGlobalHandlers(ProviderEvent event, EventDetails eventDetails) {
* @param handler the handler function to run
*/
public void addClientHandler(String domain, ProviderEvent event, Consumer<EventDetails> handler) {
final String name = Optional.ofNullable(domain).orElse(defaultClientUuid);
final String name = Optional.ofNullable(domain).orElse(DEFAULT_CLIENT_UUID);

// lazily create and cache a HandlerStore if it doesn't exist
HandlerStore store = Optional.ofNullable(this.handlerStores.get(name)).orElseGet(() -> {
Expand All @@ -89,7 +85,7 @@ public void addClientHandler(String domain, ProviderEvent event, Consumer<EventD
* @param handler the handler ref to be removed
*/
public void removeClientHandler(String domain, ProviderEvent event, Consumer<EventDetails> handler) {
domain = Optional.ofNullable(domain).orElse(defaultClientUuid);
domain = Optional.ofNullable(domain).orElse(DEFAULT_CLIENT_UUID);
this.handlerStores.get(domain).removeHandler(event, handler);
}

Expand Down Expand Up @@ -160,14 +156,14 @@ public void shutdown() {
// instantiated when a handler is added to that client.
static class HandlerStore {

private final Map<ProviderEvent, List<Consumer<EventDetails>>> handlerMap;
private final Map<ProviderEvent, Collection<Consumer<EventDetails>>> handlerMap;

HandlerStore() {
handlerMap = new ConcurrentHashMap<>();
handlerMap.put(ProviderEvent.PROVIDER_READY, new ArrayList<>());
handlerMap.put(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, new ArrayList<>());
handlerMap.put(ProviderEvent.PROVIDER_ERROR, new ArrayList<>());
handlerMap.put(ProviderEvent.PROVIDER_STALE, new ArrayList<>());
handlerMap.put(ProviderEvent.PROVIDER_READY, new ConcurrentLinkedQueue<>());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.

handlerMap.put(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, new ConcurrentLinkedQueue<>());
handlerMap.put(ProviderEvent.PROVIDER_ERROR, new ConcurrentLinkedQueue<>());
handlerMap.put(ProviderEvent.PROVIDER_STALE, new ConcurrentLinkedQueue<>());
}

void addHandler(ProviderEvent event, Consumer<EventDetails> handler) {
Expand Down
75 changes: 75 additions & 0 deletions src/test/java/dev/openfeature/sdk/AwaitableTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package dev.openfeature.sdk;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(value = 5, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
class AwaitableTest {
@Test
void waitingForFinishedIsANoOp() {
var startTime = System.currentTimeMillis();
Awaitable.FINISHED.await();
var endTime = System.currentTimeMillis();
assertTrue(endTime - startTime < 10);
}

@Test
void waitingForNotFinishedWaitsEvenWhenInterrupted() throws InterruptedException {
var awaitable = new Awaitable();
var mayProceed = new AtomicBoolean(false);

var thread = new Thread(() -> {
awaitable.await();
if (!mayProceed.get()) {
fail();
}
});
thread.start();

var startTime = System.currentTimeMillis();
do {
thread.interrupt();
} while (startTime + 1000 > System.currentTimeMillis());
mayProceed.set(true);
awaitable.wakeup();
thread.join();
}

@Test
void callingWakeUpWakesUpAllWaitingThreads() throws InterruptedException {
var awaitable = new Awaitable();
var isRunning = new AtomicInteger();

Runnable runnable = () -> {
isRunning.incrementAndGet();
var start = System.currentTimeMillis();
awaitable.await();
var end = System.currentTimeMillis();
if (end - start > 10) {
fail();
}
};

var numThreads = 2;
var threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
threads[i] = new Thread(runnable);
threads[i].start();
}

await().atMost(1, TimeUnit.SECONDS).until(() -> isRunning.get() == numThreads);

awaitable.wakeup();

for (int i = 0; i < numThreads; i++) {
threads[i].join();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ void shouldPutTheProviderInStateErrorAfterEmittingErrorEvent() {
api.setProviderAndWait(domain, provider);
Client client = api.getClient(domain);
assertThat(client.getProviderState()).isEqualTo(ProviderState.READY);
provider.emitProviderError(ProviderEventDetails.builder().build());
provider.emitProviderError(ProviderEventDetails.builder().build()).await();
assertThat(client.getProviderState()).isEqualTo(ProviderState.ERROR);
}

Expand All @@ -165,7 +165,7 @@ void shouldPutTheProviderInStateStaleAfterEmittingStaleEvent() {
api.setProviderAndWait(domain, provider);
Client client = api.getClient(domain);
assertThat(client.getProviderState()).isEqualTo(ProviderState.READY);
provider.emitProviderStale(ProviderEventDetails.builder().build());
provider.emitProviderStale(ProviderEventDetails.builder().build()).await();
assertThat(client.getProviderState()).isEqualTo(ProviderState.STALE);
}

Expand All @@ -180,9 +180,9 @@ void shouldPutTheProviderInStateReadyAfterEmittingReadyEvent() {
api.setProviderAndWait(domain, provider);
Client client = api.getClient(domain);
assertThat(client.getProviderState()).isEqualTo(ProviderState.READY);
provider.emitProviderStale(ProviderEventDetails.builder().build());
provider.emitProviderStale(ProviderEventDetails.builder().build()).await();
assertThat(client.getProviderState()).isEqualTo(ProviderState.STALE);
provider.emitProviderReady(ProviderEventDetails.builder().build());
provider.emitProviderReady(ProviderEventDetails.builder().build()).await();
assertThat(client.getProviderState()).isEqualTo(ProviderState.READY);
}
}
1 change: 1 addition & 0 deletions src/test/java/dev/openfeature/sdk/EventProviderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public static void resetDefaultProvider() {
}

@Test
@Timeout(value = 2, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
@DisplayName("should run attached onEmit with emitters")
void emitsEventsWhenAttached() {
TriConsumer<EventProvider, ProviderEvent, ProviderEventDetails> onEmit = mockOnEmit();
Expand Down
12 changes: 6 additions & 6 deletions src/test/java/dev/openfeature/sdk/EventsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class EventsTest {
private OpenFeatureAPI api;

@BeforeEach
public void setUp() throws Exception {
void setUp() {
api = new OpenFeatureAPI();
}

Expand Down Expand Up @@ -578,7 +578,7 @@ void shouldHaveAllProperties() {
number = "5.3.3",
text = "Handlers attached after the provider is already in the associated state, MUST run immediately.")
void matchingReadyEventsMustRunImmediately() {
final String name = "matchingEventsMustRunImmediately";
final String name = "matchingReadyEventsMustRunImmediately";
final Consumer<EventDetails> handler = mockHandler();

// provider which is already ready
Expand All @@ -597,14 +597,14 @@ void matchingReadyEventsMustRunImmediately() {
number = "5.3.3",
text = "Handlers attached after the provider is already in the associated state, MUST run immediately.")
void matchingStaleEventsMustRunImmediately() {
final String name = "matchingEventsMustRunImmediately";
final String name = "matchingStaleEventsMustRunImmediately";
final Consumer<EventDetails> handler = mockHandler();

// provider which is already stale
TestEventsProvider provider = new TestEventsProvider(INIT_DELAY);
Client client = api.getClient(name);
api.setProviderAndWait(name, provider);
provider.emitProviderStale(ProviderEventDetails.builder().build());
provider.emitProviderStale(ProviderEventDetails.builder().build()).await();
assertThat(client.getProviderState()).isEqualTo(ProviderState.STALE);

// should run even though handler was added after stale
Expand All @@ -618,14 +618,14 @@ void matchingStaleEventsMustRunImmediately() {
number = "5.3.3",
text = "Handlers attached after the provider is already in the associated state, MUST run immediately.")
void matchingErrorEventsMustRunImmediately() {
final String name = "matchingEventsMustRunImmediately";
final String name = "matchingErrorEventsMustRunImmediately";
final Consumer<EventDetails> handler = mockHandler();

// provider which is already in error
TestEventsProvider provider = new TestEventsProvider(INIT_DELAY);
Client client = api.getClient(name);
api.setProviderAndWait(name, provider);
provider.emitProviderError(ProviderEventDetails.builder().build());
provider.emitProviderError(ProviderEventDetails.builder().build()).await();
assertThat(client.getProviderState()).isEqualTo(ProviderState.ERROR);

verify(handler, never()).accept(any());
Expand Down
Loading
Loading