From 001bcb7c9bca2de9966c92d769fa7c3f66cae3ab Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 5 Mar 2024 09:51:25 -0700 Subject: [PATCH 1/9] simple PublisherCache --- .../concurrent/api/PublisherCache.java | 212 +++++++++++++++++ .../concurrent/api/PublisherCacheTest.java | 222 ++++++++++++++++++ 2 files changed, 434 insertions(+) create mode 100644 servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java create mode 100644 servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java new file mode 100644 index 0000000000..e64d5252bc --- /dev/null +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java @@ -0,0 +1,212 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import io.servicetalk.concurrent.PublisherSource.Subscriber; +import io.servicetalk.concurrent.PublisherSource.Subscription; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import javax.annotation.Nullable; + +/** + * A cache of publishers, Keys must correctly implement Object::hashCode and Object::equals. + * Publishers can be created as either multicast or with a replay configuration. Subscriptions + * are tracked by the cache and Publishers are removed when there are no more subscriptions. + * + * @param a key type suitable for use as a Map key, that correctly implements hashCode/equals. + * @param the type of the publisher that is cached. + */ +public final class PublisherCache { + private final MulticastStrategy multicastStrategy; + private final Function> publisherSupplier; + private final Map> publisherCache; + + PublisherCache( + final Function> publisherSupplier, + final MulticastStrategy multicastStrategy) { + this.publisherSupplier = publisherSupplier; + this.multicastStrategy = multicastStrategy; + this.publisherCache = new HashMap<>(); + } + + /** + * Create a new PublisherCache where the cached publishers will be configured for multicast for all + * consumers of a specific key. The publisherSupplier will be invoked when the cache does not contain a + * publisher for the requested key. + * + * @param publisherSupplier a function that takes the key and returns a new publisher corresponding to that key. + * @return a new publisher from the publisherSupplier if not contained in the cache, otherwise the cached publisher. + * @param a key type suitable for use as a Map key. + * @param the type of the Publisher contained in the cache. + */ + public static PublisherCache multicast(Function> publisherSupplier) { + return new PublisherCache<>(publisherSupplier, MulticastStrategy.wrapMulticast()); + } + + /** + * Create a new PublisherCache where the cacehed publishers will be configured for replay given the privided + * ReplayStrategy. The publisherSupplier will be invoked when the cache does not contain a publisher for + * the requested key. + * + * @param publisherSupplier a function that takes the key and returns a new publisher corresponding to that key. + * @param replayStrategy a replay strategy to be used by a newly cached publisher. + * @return a new publisher from the publisherSupplier if not contained in the cache, otherwise the cached publisher. + * @param a key type suitable for use as a Map key. + * @param the type of the Publisher contained in the cache. + */ + public static PublisherCache replay( + Function> publisherSupplier, + ReplayStrategy replayStrategy) { + return new PublisherCache<>(publisherSupplier, MulticastStrategy.wrapReplay(replayStrategy)); + } + + public Publisher get(K key) { + return Publisher.defer(() -> { + synchronized (publisherCache) { + if (publisherCache.containsKey(key)) { + return publisherCache.get(key).publisher; + } + + final Holder item2 = new Holder<>(); + publisherCache.put(key, item2); + + final Publisher newPublisher = publisherSupplier.apply(key) + .liftSync(subscriber -> new Subscriber() { + @Override + public void onSubscribe(Subscription subscription) { + subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + subscription.request(n); + } + + @Override + public void cancel() { + try { + assert Thread.holdsLock(publisherCache); + publisherCache.remove(key, item2); + } finally { + subscription.cancel(); + } + } + }); + } + + @Override + public void onNext(@Nullable T next) { + subscriber.onNext(next); + } + + @Override + public void onError(Throwable t) { + lockRemoveFromMap(); + subscriber.onError(t); + } + + @Override + public void onComplete() { + lockRemoveFromMap(); + subscriber.onComplete(); + } + + private void lockRemoveFromMap() { + synchronized (publisherCache) { + publisherCache.remove(key, item2); + } + } + }); + + item2.publisher = multicastStrategy.apply(newPublisher) + .liftSync(subscriber -> new Subscriber() { + @Override + public void onSubscribe(Subscription subscription) { + subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + subscription.request(n); + } + + @Override + public void cancel() { + synchronized (publisherCache) { + subscription.cancel(); + } + } + }); + } + + @Override + public void onNext(@Nullable T next) { + subscriber.onNext(next); + } + + @Override + public void onError(Throwable t) { + try { + subscriber.onError(t); + } finally { + lockRemoveFromMap(); + } + } + + @Override + public void onComplete() { + try { + subscriber.onComplete(); + } finally { + lockRemoveFromMap(); + } + } + + private void lockRemoveFromMap() { + synchronized (publisherCache) { + publisherCache.remove(key, item2); + } + } + }); + return item2.publisher; + } + }); + } + + private static final class Holder { + @Nullable + Publisher publisher; + } + + @FunctionalInterface + private interface MulticastStrategy { + Publisher apply(Publisher cached); + + static MulticastStrategy wrapMulticast() { + return cached -> cached.multicast(1, true); + } + + static MulticastStrategy wrapMulticast(int minSubscribers) { + return cached -> cached.multicast(minSubscribers, true); + } + + static MulticastStrategy wrapMulticast(int minSubscribers, int queueLimit) { + return cached -> cached.multicast(minSubscribers, queueLimit); + } + + static MulticastStrategy wrapReplay(final ReplayStrategy strategy) { + return cached -> cached.replay(strategy); + } + } +} diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java new file mode 100644 index 0000000000..f233629364 --- /dev/null +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java @@ -0,0 +1,222 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import io.servicetalk.concurrent.PublisherSource; +import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + + +class PublisherCacheTest { + private TestPublisher testPublisher; + private PublisherCache publisherCache; + private AtomicInteger upstreamSubscriptionCount; + private AtomicBoolean isUpstreamUnsubsrcibed; + private AtomicInteger upstreamCacheRequestCount; + + @BeforeEach + public void setup() { + this.testPublisher = new TestPublisher<>(); + this.upstreamSubscriptionCount = new AtomicInteger(0); + this.isUpstreamUnsubsrcibed = new AtomicBoolean(false); + this.upstreamCacheRequestCount = new AtomicInteger(0); + this.publisherCache = PublisherCache.multicast((_ignore) -> { + upstreamCacheRequestCount.incrementAndGet(); + return testPublisher.afterOnSubscribe(subscription -> + upstreamSubscriptionCount.incrementAndGet() + ).afterFinally(() -> isUpstreamUnsubsrcibed.set(true)); + }); + } + + @Test + public void testMultipleSubscribersReceiveCachedResults() { + final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); + SourceAdapters.toSource(publisherCache.get("foo")).subscribe(subscriber1); + final PublisherSource.Subscription subscription1 = subscriber1.awaitSubscription(); + + // the first subscriber receives the initial event + subscription1.request(1); + testPublisher.onNext(1); + assertThat(subscriber1.takeOnNext(), is(1)); + + // the second subscriber receives the cached event + final TestPublisherSubscriber subscriber2 = new TestPublisherSubscriber<>(); + SourceAdapters.toSource(publisherCache.get("bar")).subscribe(subscriber2); + final PublisherSource.Subscription subscription2 = subscriber2.awaitSubscription(); + + subscription2.request(1); + assertThat(subscriber2.takeOnNext(), is(1)); + + // subscribe with the first request + assertThat(upstreamSubscriptionCount.get(), is(1)); + + // all subscribers receive all subsequent events + subscription1.request(1); + subscription2.request(1); + testPublisher.onNext(2); + + assertThat(subscriber1.takeOnNext(), is(2)); + assertThat(subscriber2.takeOnNext(), is(2)); + + // make sure we still only have subscribed once + assertThat(upstreamSubscriptionCount.get(), is(1)); + assertThat(upstreamCacheRequestCount.get(), is(1)); + } + + // @Test + // public void testDiscovererAccumulatesEvents() { + // final var subscriber1 = new TestPublisherSubscriber>(); + // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber1); + // final PublisherSource.Subscription subscription1 = subscriber1.awaitSubscription(); + // + // // the first subscriber receives the initial event + // subscription1.request(4); + // + // currentEventPublisher.get().onNext(List.of(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, Map.of()))); + // currentEventPublisher.get().onNext(List.of(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT2, Map.of()))); + // currentEventPublisher.get().onNext(List.of(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT3, Map.of()))); + // currentEventPublisher.get().onNext(List.of(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT4, Map.of()))); + // + // // Subscriber 1 receives individual events + // assertThat(subscriber1.takeOnNext(), contains(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, Map.of()))); + // assertThat(subscriber1.takeOnNext(), contains(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT2, Map.of()))); + // assertThat(subscriber1.takeOnNext(), contains(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT3, Map.of()))); + // assertThat(subscriber1.takeOnNext(), contains(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT4, Map.of()))); + // + // final var subscriber2 = new TestPublisherSubscriber>(); + // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber2); + // final PublisherSource.Subscription subscription2 = subscriber2.awaitSubscription(); + // + // // Subscriber 2 receives a batch of accumulated events + // subscription2.request(1); + // assertThat(subscriber2.takeOnNext(), containsInAnyOrder( + // new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, Map.of()), + // new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT2, Map.of()), + // new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT3, Map.of()), + // new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT4, Map.of()) + // )); + // + // assertThat(upstreamCacheRequestCount.get(), is(1)); + // } + // + // @Test + // public void testDiscovererAccumulatesRemoveEvents() { + // final var subscriber1 = new TestPublisherSubscriber>(); + // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber1); + // final PublisherSource.Subscription subscription1 = subscriber1.awaitSubscription(); + // + // // the first subscriber receives the initial event + // subscription1.request(4); + // + // currentEventPublisher.get().onNext(List.of(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, Map.of()))); + // currentEventPublisher.get().onNext(List.of(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT2, Map.of()))); + // currentEventPublisher.get().onNext(List.of(new RemoveEvent(SERVICE, ENDPOINT1))); + // + // // Subscriber 1 receives individual events + // assertThat(subscriber1.takeOnNext(), contains(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, Map.of()))); + // assertThat(subscriber1.takeOnNext(), contains(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT2, Map.of()))); + // assertThat(subscriber1.takeOnNext(), contains(new RemoveEvent(SERVICE, ENDPOINT1))); + // + // final var subscriber2 = new TestPublisherSubscriber>(); + // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber2); + // final PublisherSource.Subscription subscription2 = subscriber2.awaitSubscription(); + // + // // Subscriber 2 receives a batch of accumulated events + // subscription2.request(1); + // assertThat(subscriber2.takeOnNext(), containsInAnyOrder( + // new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT2, Map.of()) + // )); + // + // assertThat(upstreamCacheRequestCount.get(), is(1)); + // } + // + // @Test + // public void testDiscovererConvertsModifyEventsToAddEvents() { + // final var subscriber1 = new TestPublisherSubscriber>(); + // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber1); + // final PublisherSource.Subscription subscription1 = subscriber1.awaitSubscription(); + // + // // the first subscriber receives the initial event + // subscription1.request(4); + // + // final Map initialAttributes = Map.of(); + // final Map updateAttributes = Map.of("attr1", new LongAttribute(1)); + // + // currentEventPublisher.get().onNext(List.of(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, initialAttributes))); + // currentEventPublisher.get().onNext(List.of(new UpdateEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, updateAttributes))); + // + // // Subscriber 1 receives individual events + // assertThat(subscriber1.takeOnNext(), contains(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, initialAttributes))); + // assertThat(subscriber1.takeOnNext(), contains(new UpdateEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, updateAttributes))); + // + // final var subscriber2 = new TestPublisherSubscriber>(); + // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber2); + // final PublisherSource.Subscription subscription2 = subscriber2.awaitSubscription(); + // + // // Subscriber 2 receives a batch of accumulated events + // subscription2.request(1); + // assertThat(subscriber2.takeOnNext(), containsInAnyOrder( + // new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, updateAttributes) + // )); + // + // assertThat(upstreamCacheRequestCount.get(), is(1)); + // } + // + // @Test + // public void testUnSubscribeUpstreamAndInvalidateCacheWhenEmpty() { + // final var subscriber1 = new TestPublisherSubscriber>(); + // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber1); + // final PublisherSource.Subscription subscription1 = subscriber1.awaitSubscription(); + // + // assertThat(upstreamSubscriptionCount.get(), is(1)); + // + // final var subscriber2 = new TestPublisherSubscriber>(); + // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber2); + // final PublisherSource.Subscription subscription2 = subscriber2.awaitSubscription(); + // + // assertThat(upstreamSubscriptionCount.get(), is(1)); + // + // subscription1.cancel(); + // assertThat(isUpstreamUnsubsrcibed.get(), is(false)); + // + // subscription2.cancel(); + // assertThat(isUpstreamUnsubsrcibed.get(), is(true)); + // + // assertThat(upstreamCacheRequestCount.get(), is(1)); + // } + // + // @Test + // public void testErrorFromUpstreamInvalidatesCacheEntryAndRequestsANewStream() { + // final var subscriber1 = new TestPublisherSubscriber>(); + // // use a stupid "always" retry policy to force the situation + // final var discovery = discoveryService.discover(SERVICE).retry((i, cause) -> true); + // SourceAdapters.toSource(discovery).subscribe(subscriber1); + // final PublisherSource.Subscription subscription1 = subscriber1.awaitSubscription(); + // + // subscription1.request(2); + // currentEventPublisher.get().onError(new Exception("bad stuff happened")); + // + // assertThat(upstreamCacheRequestCount.get(), is(2)); + // } +} \ No newline at end of file From df8a0002e993c921f1c22c8c6d0940acbe383157 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 5 Mar 2024 13:27:58 -0700 Subject: [PATCH 2/9] Update docs and add tests --- .../concurrent/api/PublisherCache.java | 82 ++++- .../concurrent/api/PublisherCacheTest.java | 301 +++++++++--------- 2 files changed, 218 insertions(+), 165 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java index e64d5252bc..f1ca69f5f0 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java @@ -36,6 +36,14 @@ public final class PublisherCache { private final Function> publisherSupplier; private final Map> publisherCache; + /** + * Create a new PublisherCache using a factory function and a multicast strategy to use after {@link Publisher} + * creation. New publishers must be wrapped in a multicast or replay strategy in order to be shared. + * VisibleForTesting + * + * @param publisherSupplier a function that takes the key and returns a new {@link Publisher} for that key. + * @param multicastStrategy a strategy used for wrapping new cache values. + */ PublisherCache( final Function> publisherSupplier, final MulticastStrategy multicastStrategy) { @@ -49,25 +57,61 @@ public final class PublisherCache { * consumers of a specific key. The publisherSupplier will be invoked when the cache does not contain a * publisher for the requested key. * - * @param publisherSupplier a function that takes the key and returns a new publisher corresponding to that key. - * @return a new publisher from the publisherSupplier if not contained in the cache, otherwise the cached publisher. - * @param a key type suitable for use as a Map key. - * @param the type of the Publisher contained in the cache. + * @param publisherSupplier a function that takes the key and returns a new {@link Publisher} for that key. + * @param a key type suitable for use as a {@link Map} key. + * @param the type of the {@link Publisher} contained in the cache. + * @return a new PublisherCache that will use the publisherSupplier to create new entries upon request. */ - public static PublisherCache multicast(Function> publisherSupplier) { + public static PublisherCache multicast(final Function> publisherSupplier) { return new PublisherCache<>(publisherSupplier, MulticastStrategy.wrapMulticast()); } + /** + * Create a new PublisherCache where the cached publishers will be configured for multicast for all + * consumers of a specific key. The publisherSupplier will be invoked when the cache does not contain a + * publisher for the requested key. + * + * @param publisherSupplier a function that takes the key and returns a new {@link Publisher} for that key. + * @param minSubscribers The upstream subscribe operation will not happen until after this many {@link Subscriber} + * subscribe to the return value. + * @param a key type suitable for use as a {@link Map} key. + * @param the type of the {@link Publisher} contained in the cache. + * @return a new PublisherCache that will use the publisherSupplier to create new entries upon request. + */ + public static PublisherCache multicast( + final Function> publisherSupplier, final int minSubscribers) { + return new PublisherCache<>(publisherSupplier, MulticastStrategy.wrapMulticast(minSubscribers)); + } + + /** + * Create a new PublisherCache where the cached publishers will be configured for multicast for all + * consumers of a specific key. The publisherSupplier will be invoked when the cache does not contain a + * publisher for the requested key. + * + * @param publisherSupplier a function that takes the key and returns a new {@link Publisher} for that key. + * @param minSubscribers The upstream subscribe operation will not happen until after this many {@link Subscriber} + * subscribe to the return value. + * @param queueLimit The number of elements which will be queued for each {@link Subscriber} in order to compensate + * for unequal demand. + * @param a key type suitable for use as a {@link Map} key. + * @param the type of the {@link Publisher} contained in the cache. + * @return a new PublisherCache that will use the publisherSupplier to create new entries upon request. + */ + public static PublisherCache multicast( + final Function> publisherSupplier, final int minSubscribers, final int queueLimit) { + return new PublisherCache<>(publisherSupplier, MulticastStrategy.wrapMulticast(minSubscribers, queueLimit)); + } + /** * Create a new PublisherCache where the cacehed publishers will be configured for replay given the privided * ReplayStrategy. The publisherSupplier will be invoked when the cache does not contain a publisher for * the requested key. * - * @param publisherSupplier a function that takes the key and returns a new publisher corresponding to that key. - * @param replayStrategy a replay strategy to be used by a newly cached publisher. - * @return a new publisher from the publisherSupplier if not contained in the cache, otherwise the cached publisher. - * @param a key type suitable for use as a Map key. - * @param the type of the Publisher contained in the cache. + * @param publisherSupplier a function that takes the key and returns a new {@link Publisher} for that key. + * @param replayStrategy a {@link ReplayStrategy} that determines the replay behavior and history retention logic. + * @param a key type suitable for use as a {@link Map} key. + * @param the type of the {@link Publisher} contained in the cache. + * @return a new PublisherCache that will use the publisherSupplier to create new entries upon request. */ public static PublisherCache replay( Function> publisherSupplier, @@ -75,6 +119,13 @@ public static PublisherCache replay( return new PublisherCache<>(publisherSupplier, MulticastStrategy.wrapReplay(replayStrategy)); } + /** + * Retrieve a the value for the given key, if no value exists in the cache it will be synchronously created. + * + * @param key a key corresponding to the requested {@link Publisher}. + * @return a new {@link Publisher} from the publisherSupplier if not contained in the cache, otherwise + * the cached publisher. + */ public Publisher get(K key) { return Publisher.defer(() -> { synchronized (publisherCache) { @@ -189,10 +240,19 @@ private static final class Holder { Publisher publisher; } + /** + * A series of strategies used to make cached {@link Publisher}s "shareable". + * + * @param the type of the {@link Publisher}. + */ @FunctionalInterface - private interface MulticastStrategy { + interface MulticastStrategy { Publisher apply(Publisher cached); + static MulticastStrategy identity() { + return cached -> cached; + } + static MulticastStrategy wrapMulticast() { return cached -> cached.multicast(1, true); } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java index f233629364..62dcda11e0 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java @@ -15,22 +15,25 @@ */ package io.servicetalk.concurrent.api; -import io.servicetalk.concurrent.PublisherSource; +import io.servicetalk.concurrent.PublisherSource.Subscription; +import io.servicetalk.concurrent.api.PublisherCache.MulticastStrategy; import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static io.servicetalk.concurrent.api.PublisherCache.MulticastStrategy.wrapMulticast; +import static io.servicetalk.concurrent.api.PublisherCache.MulticastStrategy.wrapReplay; +import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; - class PublisherCacheTest { private TestPublisher testPublisher; - private PublisherCache publisherCache; private AtomicInteger upstreamSubscriptionCount; private AtomicBoolean isUpstreamUnsubsrcibed; private AtomicInteger upstreamCacheRequestCount; @@ -41,182 +44,172 @@ public void setup() { this.upstreamSubscriptionCount = new AtomicInteger(0); this.isUpstreamUnsubsrcibed = new AtomicBoolean(false); this.upstreamCacheRequestCount = new AtomicInteger(0); - this.publisherCache = PublisherCache.multicast((_ignore) -> { + } + + private PublisherCache publisherCache(final MulticastStrategy strategy) { + return new PublisherCache<>((_ignore) -> { upstreamCacheRequestCount.incrementAndGet(); return testPublisher.afterOnSubscribe(subscription -> upstreamSubscriptionCount.incrementAndGet() ).afterFinally(() -> isUpstreamUnsubsrcibed.set(true)); - }); + }, strategy); } @Test - public void testMultipleSubscribersReceiveCachedResults() { + public void multipleSubscribersToSameKeyReceiveMulticastEvents() { + final PublisherCache publisherCache = publisherCache(wrapMulticast()); + final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); - SourceAdapters.toSource(publisherCache.get("foo")).subscribe(subscriber1); - final PublisherSource.Subscription subscription1 = subscriber1.awaitSubscription(); + toSource(publisherCache.get("foo")).subscribe(subscriber1); + final Subscription subscription1 = subscriber1.awaitSubscription(); // the first subscriber receives the initial event subscription1.request(1); testPublisher.onNext(1); assertThat(subscriber1.takeOnNext(), is(1)); - // the second subscriber receives the cached event + // the second subscriber receives the cached publisher final TestPublisherSubscriber subscriber2 = new TestPublisherSubscriber<>(); - SourceAdapters.toSource(publisherCache.get("bar")).subscribe(subscriber2); - final PublisherSource.Subscription subscription2 = subscriber2.awaitSubscription(); - - subscription2.request(1); - assertThat(subscriber2.takeOnNext(), is(1)); - - // subscribe with the first request - assertThat(upstreamSubscriptionCount.get(), is(1)); + toSource(publisherCache.get("foo")).subscribe(subscriber2); + final Subscription subscription2 = subscriber2.awaitSubscription(); // all subscribers receive all subsequent events subscription1.request(1); subscription2.request(1); testPublisher.onNext(2); - assertThat(subscriber1.takeOnNext(), is(2)); assertThat(subscriber2.takeOnNext(), is(2)); + // subscribe with the first request + assertThat(upstreamSubscriptionCount.get(), is(1)); + // make sure we still only have subscribed once assertThat(upstreamSubscriptionCount.get(), is(1)); assertThat(upstreamCacheRequestCount.get(), is(1)); } - // @Test - // public void testDiscovererAccumulatesEvents() { - // final var subscriber1 = new TestPublisherSubscriber>(); - // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber1); - // final PublisherSource.Subscription subscription1 = subscriber1.awaitSubscription(); - // - // // the first subscriber receives the initial event - // subscription1.request(4); - // - // currentEventPublisher.get().onNext(List.of(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, Map.of()))); - // currentEventPublisher.get().onNext(List.of(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT2, Map.of()))); - // currentEventPublisher.get().onNext(List.of(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT3, Map.of()))); - // currentEventPublisher.get().onNext(List.of(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT4, Map.of()))); - // - // // Subscriber 1 receives individual events - // assertThat(subscriber1.takeOnNext(), contains(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, Map.of()))); - // assertThat(subscriber1.takeOnNext(), contains(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT2, Map.of()))); - // assertThat(subscriber1.takeOnNext(), contains(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT3, Map.of()))); - // assertThat(subscriber1.takeOnNext(), contains(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT4, Map.of()))); - // - // final var subscriber2 = new TestPublisherSubscriber>(); - // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber2); - // final PublisherSource.Subscription subscription2 = subscriber2.awaitSubscription(); - // - // // Subscriber 2 receives a batch of accumulated events - // subscription2.request(1); - // assertThat(subscriber2.takeOnNext(), containsInAnyOrder( - // new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, Map.of()), - // new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT2, Map.of()), - // new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT3, Map.of()), - // new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT4, Map.of()) - // )); - // - // assertThat(upstreamCacheRequestCount.get(), is(1)); - // } - // - // @Test - // public void testDiscovererAccumulatesRemoveEvents() { - // final var subscriber1 = new TestPublisherSubscriber>(); - // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber1); - // final PublisherSource.Subscription subscription1 = subscriber1.awaitSubscription(); - // - // // the first subscriber receives the initial event - // subscription1.request(4); - // - // currentEventPublisher.get().onNext(List.of(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, Map.of()))); - // currentEventPublisher.get().onNext(List.of(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT2, Map.of()))); - // currentEventPublisher.get().onNext(List.of(new RemoveEvent(SERVICE, ENDPOINT1))); - // - // // Subscriber 1 receives individual events - // assertThat(subscriber1.takeOnNext(), contains(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, Map.of()))); - // assertThat(subscriber1.takeOnNext(), contains(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT2, Map.of()))); - // assertThat(subscriber1.takeOnNext(), contains(new RemoveEvent(SERVICE, ENDPOINT1))); - // - // final var subscriber2 = new TestPublisherSubscriber>(); - // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber2); - // final PublisherSource.Subscription subscription2 = subscriber2.awaitSubscription(); - // - // // Subscriber 2 receives a batch of accumulated events - // subscription2.request(1); - // assertThat(subscriber2.takeOnNext(), containsInAnyOrder( - // new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT2, Map.of()) - // )); - // - // assertThat(upstreamCacheRequestCount.get(), is(1)); - // } - // - // @Test - // public void testDiscovererConvertsModifyEventsToAddEvents() { - // final var subscriber1 = new TestPublisherSubscriber>(); - // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber1); - // final PublisherSource.Subscription subscription1 = subscriber1.awaitSubscription(); - // - // // the first subscriber receives the initial event - // subscription1.request(4); - // - // final Map initialAttributes = Map.of(); - // final Map updateAttributes = Map.of("attr1", new LongAttribute(1)); - // - // currentEventPublisher.get().onNext(List.of(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, initialAttributes))); - // currentEventPublisher.get().onNext(List.of(new UpdateEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, updateAttributes))); - // - // // Subscriber 1 receives individual events - // assertThat(subscriber1.takeOnNext(), contains(new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, initialAttributes))); - // assertThat(subscriber1.takeOnNext(), contains(new UpdateEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, updateAttributes))); - // - // final var subscriber2 = new TestPublisherSubscriber>(); - // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber2); - // final PublisherSource.Subscription subscription2 = subscriber2.awaitSubscription(); - // - // // Subscriber 2 receives a batch of accumulated events - // subscription2.request(1); - // assertThat(subscriber2.takeOnNext(), containsInAnyOrder( - // new AddEvent(SERVICE, new KubernetesEndpointEvent.Condition(true), ENDPOINT1, updateAttributes) - // )); - // - // assertThat(upstreamCacheRequestCount.get(), is(1)); - // } - // - // @Test - // public void testUnSubscribeUpstreamAndInvalidateCacheWhenEmpty() { - // final var subscriber1 = new TestPublisherSubscriber>(); - // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber1); - // final PublisherSource.Subscription subscription1 = subscriber1.awaitSubscription(); - // - // assertThat(upstreamSubscriptionCount.get(), is(1)); - // - // final var subscriber2 = new TestPublisherSubscriber>(); - // SourceAdapters.toSource(discoveryService.discover(SERVICE)).subscribe(subscriber2); - // final PublisherSource.Subscription subscription2 = subscriber2.awaitSubscription(); - // - // assertThat(upstreamSubscriptionCount.get(), is(1)); - // - // subscription1.cancel(); - // assertThat(isUpstreamUnsubsrcibed.get(), is(false)); - // - // subscription2.cancel(); - // assertThat(isUpstreamUnsubsrcibed.get(), is(true)); - // - // assertThat(upstreamCacheRequestCount.get(), is(1)); - // } - // - // @Test - // public void testErrorFromUpstreamInvalidatesCacheEntryAndRequestsANewStream() { - // final var subscriber1 = new TestPublisherSubscriber>(); - // // use a stupid "always" retry policy to force the situation - // final var discovery = discoveryService.discover(SERVICE).retry((i, cause) -> true); - // SourceAdapters.toSource(discovery).subscribe(subscriber1); - // final PublisherSource.Subscription subscription1 = subscriber1.awaitSubscription(); - // - // subscription1.request(2); - // currentEventPublisher.get().onError(new Exception("bad stuff happened")); - // - // assertThat(upstreamCacheRequestCount.get(), is(2)); - // } -} \ No newline at end of file + @Test + public void minSubscribersMulticastPolicySubscribesUpstreamOnce() { + final PublisherCache publisherCache = publisherCache(wrapMulticast()); + + final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); + toSource(publisherCache.get("foo")).subscribe(subscriber1); + final Subscription subscription1 = subscriber1.awaitSubscription(); + + assertThat(upstreamSubscriptionCount.get(), is(1)); + + final TestPublisherSubscriber subscriber2 = new TestPublisherSubscriber<>(); + toSource(publisherCache.get("foo")).subscribe(subscriber2); + final Subscription subscription2 = subscriber2.awaitSubscription(); + + assertThat(upstreamSubscriptionCount.get(), is(1)); + + subscription1.cancel(); + assertThat(isUpstreamUnsubsrcibed.get(), is(false)); + + subscription2.cancel(); + assertThat(isUpstreamUnsubsrcibed.get(), is(true)); + + assertThat(upstreamCacheRequestCount.get(), is(1)); + } + + @Test + public void unSubscribeUpstreamAndInvalidateCacheWhenEmpty() { + final PublisherCache publisherCache = publisherCache(wrapMulticast()); + + final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); + toSource(publisherCache.get("foo")).subscribe(subscriber1); + final Subscription subscription1 = subscriber1.awaitSubscription(); + + assertThat(upstreamSubscriptionCount.get(), is(1)); + + final TestPublisherSubscriber subscriber2 = new TestPublisherSubscriber<>(); + toSource(publisherCache.get("foo")).subscribe(subscriber2); + final Subscription subscription2 = subscriber2.awaitSubscription(); + + assertThat(upstreamSubscriptionCount.get(), is(1)); + + subscription1.cancel(); + assertThat(isUpstreamUnsubsrcibed.get(), is(false)); + + subscription2.cancel(); + assertThat(isUpstreamUnsubsrcibed.get(), is(true)); + + assertThat(upstreamCacheRequestCount.get(), is(1)); + } + + @Test + public void cacheSubscriptionAndUnsubscriptionWithMulticastMinSubscribers() { + final PublisherCache publisherCache = publisherCache(wrapMulticast(2)); + + final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); + toSource(publisherCache.get("foo")).subscribe(subscriber1); + final Subscription subscription1 = subscriber1.awaitSubscription(); + + assertThat(upstreamSubscriptionCount.get(), is(0)); + + final TestPublisherSubscriber subscriber2 = new TestPublisherSubscriber<>(); + toSource(publisherCache.get("foo")).subscribe(subscriber2); + final Subscription subscription2 = subscriber2.awaitSubscription(); + + assertThat(upstreamSubscriptionCount.get(), is(1)); + + subscription1.cancel(); + assertThat(isUpstreamUnsubsrcibed.get(), is(false)); + + subscription2.cancel(); + assertThat(isUpstreamUnsubsrcibed.get(), is(true)); + + assertThat(upstreamCacheRequestCount.get(), is(1)); + } + + @Test + public void cacheSubscriptionAndUnsubscriptionWithReplay() { + final PublisherCache publisherCache = publisherCache( + wrapReplay(ReplayStrategies.historyBuilder(1) + .cancelUpstream(true) + .minSubscribers(1) + .build())); + + final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); + toSource(publisherCache.get("foo")).subscribe(subscriber1); + final Subscription subscription1 = subscriber1.awaitSubscription(); + + subscription1.request(3); + testPublisher.onNext(1, 2, 3); + assertThat(subscriber1.takeOnNext(3), is(Arrays.asList(1, 2, 3))); + + final TestPublisherSubscriber subscriber2 = new TestPublisherSubscriber<>(); + toSource(publisherCache.get("foo")).subscribe(subscriber2); + final Subscription subscription2 = subscriber2.awaitSubscription(); + + assertThat(upstreamSubscriptionCount.get(), is(1)); + + subscription2.request(1); + assertThat(subscriber2.takeOnNext(), is(3)); + + subscription1.cancel(); + assertThat(isUpstreamUnsubsrcibed.get(), is(false)); + + subscription2.cancel(); + assertThat(isUpstreamUnsubsrcibed.get(), is(true)); + + assertThat(upstreamCacheRequestCount.get(), is(1)); + } + + @Test + public void testErrorFromUpstreamInvalidatesCacheEntryAndRequestsANewStream() { + final PublisherCache publisherCache = publisherCache(wrapMulticast()); + + final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); + // use an "always" retry policy to force the situation + final Publisher discovery = publisherCache.get("foo").retry((i, cause) -> true); + toSource(discovery).subscribe(subscriber1); + final Subscription subscription1 = subscriber1.awaitSubscription(); + + subscription1.request(2); + testPublisher.onError(new Exception("bad stuff happened")); + + assertThat(upstreamCacheRequestCount.get(), is(2)); + } +} From e6ce56241a86d894336678e0299b2e2bd3f0d68f Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 5 Mar 2024 13:49:16 -0700 Subject: [PATCH 3/9] Move the loading function from the constructor to get --- .../concurrent/api/PublisherCache.java | 60 +++++++------------ .../concurrent/api/PublisherCacheTest.java | 50 +++++++--------- 2 files changed, 46 insertions(+), 64 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java index f1ca69f5f0..9454bca03d 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java @@ -24,8 +24,8 @@ import javax.annotation.Nullable; /** - * A cache of publishers, Keys must correctly implement Object::hashCode and Object::equals. - * Publishers can be created as either multicast or with a replay configuration. Subscriptions + * A cache of {@link Publisher}s. Keys must correctly implement Object::hashCode and Object::equals. + * The cache can be created with either multicast or replay configurations. Subscriptions * are tracked by the cache and Publishers are removed when there are no more subscriptions. * * @param a key type suitable for use as a Map key, that correctly implements hashCode/equals. @@ -33,62 +33,53 @@ */ public final class PublisherCache { private final MulticastStrategy multicastStrategy; - private final Function> publisherSupplier; private final Map> publisherCache; /** - * Create a new PublisherCache using a factory function and a multicast strategy to use after {@link Publisher} - * creation. New publishers must be wrapped in a multicast or replay strategy in order to be shared. - * VisibleForTesting + * Create a new PublisherCache where the cached publishers must be configured with a multicast or replay operator + * by the multicastSupplier function. * - * @param publisherSupplier a function that takes the key and returns a new {@link Publisher} for that key. * @param multicastStrategy a strategy used for wrapping new cache values. */ - PublisherCache( - final Function> publisherSupplier, - final MulticastStrategy multicastStrategy) { - this.publisherSupplier = publisherSupplier; + PublisherCache(final MulticastStrategy multicastStrategy) { this.multicastStrategy = multicastStrategy; this.publisherCache = new HashMap<>(); } + public static PublisherCache create() { + return new PublisherCache<>(MulticastStrategy.identity()); + } + /** * Create a new PublisherCache where the cached publishers will be configured for multicast for all - * consumers of a specific key. The publisherSupplier will be invoked when the cache does not contain a - * publisher for the requested key. + * consumers of a specific key. * - * @param publisherSupplier a function that takes the key and returns a new {@link Publisher} for that key. * @param a key type suitable for use as a {@link Map} key. * @param the type of the {@link Publisher} contained in the cache. * @return a new PublisherCache that will use the publisherSupplier to create new entries upon request. */ - public static PublisherCache multicast(final Function> publisherSupplier) { - return new PublisherCache<>(publisherSupplier, MulticastStrategy.wrapMulticast()); + public static PublisherCache multicast() { + return new PublisherCache<>(MulticastStrategy.wrapMulticast()); } /** * Create a new PublisherCache where the cached publishers will be configured for multicast for all - * consumers of a specific key. The publisherSupplier will be invoked when the cache does not contain a - * publisher for the requested key. + * consumers of a specific key. * - * @param publisherSupplier a function that takes the key and returns a new {@link Publisher} for that key. * @param minSubscribers The upstream subscribe operation will not happen until after this many {@link Subscriber} * subscribe to the return value. * @param a key type suitable for use as a {@link Map} key. * @param the type of the {@link Publisher} contained in the cache. * @return a new PublisherCache that will use the publisherSupplier to create new entries upon request. */ - public static PublisherCache multicast( - final Function> publisherSupplier, final int minSubscribers) { - return new PublisherCache<>(publisherSupplier, MulticastStrategy.wrapMulticast(minSubscribers)); + public static PublisherCache multicast(final int minSubscribers) { + return new PublisherCache<>(MulticastStrategy.wrapMulticast(minSubscribers)); } /** * Create a new PublisherCache where the cached publishers will be configured for multicast for all - * consumers of a specific key. The publisherSupplier will be invoked when the cache does not contain a - * publisher for the requested key. + * consumers of a specific key. * - * @param publisherSupplier a function that takes the key and returns a new {@link Publisher} for that key. * @param minSubscribers The upstream subscribe operation will not happen until after this many {@link Subscriber} * subscribe to the return value. * @param queueLimit The number of elements which will be queued for each {@link Subscriber} in order to compensate @@ -97,36 +88,31 @@ public static PublisherCache multicast( * @param the type of the {@link Publisher} contained in the cache. * @return a new PublisherCache that will use the publisherSupplier to create new entries upon request. */ - public static PublisherCache multicast( - final Function> publisherSupplier, final int minSubscribers, final int queueLimit) { - return new PublisherCache<>(publisherSupplier, MulticastStrategy.wrapMulticast(minSubscribers, queueLimit)); + public static PublisherCache multicast(final int minSubscribers, final int queueLimit) { + return new PublisherCache<>(MulticastStrategy.wrapMulticast(minSubscribers, queueLimit)); } /** * Create a new PublisherCache where the cacehed publishers will be configured for replay given the privided - * ReplayStrategy. The publisherSupplier will be invoked when the cache does not contain a publisher for - * the requested key. + * ReplayStrategy. * - * @param publisherSupplier a function that takes the key and returns a new {@link Publisher} for that key. * @param replayStrategy a {@link ReplayStrategy} that determines the replay behavior and history retention logic. * @param a key type suitable for use as a {@link Map} key. * @param the type of the {@link Publisher} contained in the cache. * @return a new PublisherCache that will use the publisherSupplier to create new entries upon request. */ - public static PublisherCache replay( - Function> publisherSupplier, - ReplayStrategy replayStrategy) { - return new PublisherCache<>(publisherSupplier, MulticastStrategy.wrapReplay(replayStrategy)); + public static PublisherCache replay(ReplayStrategy replayStrategy) { + return new PublisherCache<>(MulticastStrategy.wrapReplay(replayStrategy)); } /** - * Retrieve a the value for the given key, if no value exists in the cache it will be synchronously created. + * Retrieve the value for the given key, if no value exists in the cache it will be synchronously created. * * @param key a key corresponding to the requested {@link Publisher}. * @return a new {@link Publisher} from the publisherSupplier if not contained in the cache, otherwise * the cached publisher. */ - public Publisher get(K key) { + public Publisher get(final K key, final Function> publisherSupplier) { return Publisher.defer(() -> { synchronized (publisherCache) { if (publisherCache.containsKey(key)) { diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java index 62dcda11e0..0d47a86dfd 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java @@ -16,7 +16,6 @@ package io.servicetalk.concurrent.api; import io.servicetalk.concurrent.PublisherSource.Subscription; -import io.servicetalk.concurrent.api.PublisherCache.MulticastStrategy; import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; import org.junit.jupiter.api.BeforeEach; @@ -25,9 +24,8 @@ import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; -import static io.servicetalk.concurrent.api.PublisherCache.MulticastStrategy.wrapMulticast; -import static io.servicetalk.concurrent.api.PublisherCache.MulticastStrategy.wrapReplay; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -37,6 +35,7 @@ class PublisherCacheTest { private AtomicInteger upstreamSubscriptionCount; private AtomicBoolean isUpstreamUnsubsrcibed; private AtomicInteger upstreamCacheRequestCount; + private Function> publisherSupplier; @BeforeEach public void setup() { @@ -44,23 +43,20 @@ public void setup() { this.upstreamSubscriptionCount = new AtomicInteger(0); this.isUpstreamUnsubsrcibed = new AtomicBoolean(false); this.upstreamCacheRequestCount = new AtomicInteger(0); - } - - private PublisherCache publisherCache(final MulticastStrategy strategy) { - return new PublisherCache<>((_ignore) -> { + this.publisherSupplier = (_ignore) -> { upstreamCacheRequestCount.incrementAndGet(); return testPublisher.afterOnSubscribe(subscription -> upstreamSubscriptionCount.incrementAndGet() ).afterFinally(() -> isUpstreamUnsubsrcibed.set(true)); - }, strategy); + }; } @Test public void multipleSubscribersToSameKeyReceiveMulticastEvents() { - final PublisherCache publisherCache = publisherCache(wrapMulticast()); + final PublisherCache publisherCache = PublisherCache.multicast(); final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); - toSource(publisherCache.get("foo")).subscribe(subscriber1); + toSource(publisherCache.get("foo", publisherSupplier)).subscribe(subscriber1); final Subscription subscription1 = subscriber1.awaitSubscription(); // the first subscriber receives the initial event @@ -70,7 +66,7 @@ public void multipleSubscribersToSameKeyReceiveMulticastEvents() { // the second subscriber receives the cached publisher final TestPublisherSubscriber subscriber2 = new TestPublisherSubscriber<>(); - toSource(publisherCache.get("foo")).subscribe(subscriber2); + toSource(publisherCache.get("foo", publisherSupplier)).subscribe(subscriber2); final Subscription subscription2 = subscriber2.awaitSubscription(); // all subscribers receive all subsequent events @@ -90,16 +86,16 @@ public void multipleSubscribersToSameKeyReceiveMulticastEvents() { @Test public void minSubscribersMulticastPolicySubscribesUpstreamOnce() { - final PublisherCache publisherCache = publisherCache(wrapMulticast()); + final PublisherCache publisherCache = PublisherCache.multicast(); final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); - toSource(publisherCache.get("foo")).subscribe(subscriber1); + toSource(publisherCache.get("foo", publisherSupplier)).subscribe(subscriber1); final Subscription subscription1 = subscriber1.awaitSubscription(); assertThat(upstreamSubscriptionCount.get(), is(1)); final TestPublisherSubscriber subscriber2 = new TestPublisherSubscriber<>(); - toSource(publisherCache.get("foo")).subscribe(subscriber2); + toSource(publisherCache.get("foo", publisherSupplier)).subscribe(subscriber2); final Subscription subscription2 = subscriber2.awaitSubscription(); assertThat(upstreamSubscriptionCount.get(), is(1)); @@ -115,16 +111,16 @@ public void minSubscribersMulticastPolicySubscribesUpstreamOnce() { @Test public void unSubscribeUpstreamAndInvalidateCacheWhenEmpty() { - final PublisherCache publisherCache = publisherCache(wrapMulticast()); + final PublisherCache publisherCache = PublisherCache.multicast(); final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); - toSource(publisherCache.get("foo")).subscribe(subscriber1); + toSource(publisherCache.get("foo", publisherSupplier)).subscribe(subscriber1); final Subscription subscription1 = subscriber1.awaitSubscription(); assertThat(upstreamSubscriptionCount.get(), is(1)); final TestPublisherSubscriber subscriber2 = new TestPublisherSubscriber<>(); - toSource(publisherCache.get("foo")).subscribe(subscriber2); + toSource(publisherCache.get("foo", publisherSupplier)).subscribe(subscriber2); final Subscription subscription2 = subscriber2.awaitSubscription(); assertThat(upstreamSubscriptionCount.get(), is(1)); @@ -140,16 +136,16 @@ public void unSubscribeUpstreamAndInvalidateCacheWhenEmpty() { @Test public void cacheSubscriptionAndUnsubscriptionWithMulticastMinSubscribers() { - final PublisherCache publisherCache = publisherCache(wrapMulticast(2)); + final PublisherCache publisherCache = PublisherCache.multicast(2); final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); - toSource(publisherCache.get("foo")).subscribe(subscriber1); + toSource(publisherCache.get("foo", publisherSupplier)).subscribe(subscriber1); final Subscription subscription1 = subscriber1.awaitSubscription(); assertThat(upstreamSubscriptionCount.get(), is(0)); final TestPublisherSubscriber subscriber2 = new TestPublisherSubscriber<>(); - toSource(publisherCache.get("foo")).subscribe(subscriber2); + toSource(publisherCache.get("foo", publisherSupplier)).subscribe(subscriber2); final Subscription subscription2 = subscriber2.awaitSubscription(); assertThat(upstreamSubscriptionCount.get(), is(1)); @@ -165,14 +161,14 @@ public void cacheSubscriptionAndUnsubscriptionWithMulticastMinSubscribers() { @Test public void cacheSubscriptionAndUnsubscriptionWithReplay() { - final PublisherCache publisherCache = publisherCache( - wrapReplay(ReplayStrategies.historyBuilder(1) + final PublisherCache publisherCache = PublisherCache.replay( + ReplayStrategies.historyBuilder(1) .cancelUpstream(true) .minSubscribers(1) - .build())); + .build()); final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); - toSource(publisherCache.get("foo")).subscribe(subscriber1); + toSource(publisherCache.get("foo", publisherSupplier)).subscribe(subscriber1); final Subscription subscription1 = subscriber1.awaitSubscription(); subscription1.request(3); @@ -180,7 +176,7 @@ public void cacheSubscriptionAndUnsubscriptionWithReplay() { assertThat(subscriber1.takeOnNext(3), is(Arrays.asList(1, 2, 3))); final TestPublisherSubscriber subscriber2 = new TestPublisherSubscriber<>(); - toSource(publisherCache.get("foo")).subscribe(subscriber2); + toSource(publisherCache.get("foo", publisherSupplier)).subscribe(subscriber2); final Subscription subscription2 = subscriber2.awaitSubscription(); assertThat(upstreamSubscriptionCount.get(), is(1)); @@ -199,11 +195,11 @@ public void cacheSubscriptionAndUnsubscriptionWithReplay() { @Test public void testErrorFromUpstreamInvalidatesCacheEntryAndRequestsANewStream() { - final PublisherCache publisherCache = publisherCache(wrapMulticast()); + final PublisherCache publisherCache = PublisherCache.multicast(); final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); // use an "always" retry policy to force the situation - final Publisher discovery = publisherCache.get("foo").retry((i, cause) -> true); + final Publisher discovery = publisherCache.get("foo", publisherSupplier).retry((i, cause) -> true); toSource(discovery).subscribe(subscriber1); final Subscription subscription1 = subscriber1.awaitSubscription(); From 1649c6204855c0951dd169e950758e1246e81441 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 5 Mar 2024 13:50:45 -0700 Subject: [PATCH 4/9] doc was accidentally on the wrong method --- .../io/servicetalk/concurrent/api/PublisherCache.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java index 9454bca03d..6a2d70982e 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java @@ -35,17 +35,17 @@ public final class PublisherCache { private final MulticastStrategy multicastStrategy; private final Map> publisherCache; + PublisherCache(final MulticastStrategy multicastStrategy) { + this.multicastStrategy = multicastStrategy; + this.publisherCache = new HashMap<>(); + } + /** * Create a new PublisherCache where the cached publishers must be configured with a multicast or replay operator * by the multicastSupplier function. * * @param multicastStrategy a strategy used for wrapping new cache values. */ - PublisherCache(final MulticastStrategy multicastStrategy) { - this.multicastStrategy = multicastStrategy; - this.publisherCache = new HashMap<>(); - } - public static PublisherCache create() { return new PublisherCache<>(MulticastStrategy.identity()); } From b45a9d12fa8ed896395c09be791b71868a3f16cc Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 5 Mar 2024 13:52:06 -0700 Subject: [PATCH 5/9] make strategy private --- .../main/java/io/servicetalk/concurrent/api/PublisherCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java index 6a2d70982e..ef5e314af0 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java @@ -232,7 +232,7 @@ private static final class Holder { * @param the type of the {@link Publisher}. */ @FunctionalInterface - interface MulticastStrategy { + private interface MulticastStrategy { Publisher apply(Publisher cached); static MulticastStrategy identity() { From d6fc17dd23c25923af93a425c7e8f3aa28907c4f Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 5 Mar 2024 14:04:22 -0700 Subject: [PATCH 6/9] remove unused param tag --- .../main/java/io/servicetalk/concurrent/api/PublisherCache.java | 1 - 1 file changed, 1 deletion(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java index ef5e314af0..1447c97c8f 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java @@ -44,7 +44,6 @@ public final class PublisherCache { * Create a new PublisherCache where the cached publishers must be configured with a multicast or replay operator * by the multicastSupplier function. * - * @param multicastStrategy a strategy used for wrapping new cache values. */ public static PublisherCache create() { return new PublisherCache<>(MulticastStrategy.identity()); From 813bac92835bad3e4c1ae60f2cb366195f1039b5 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 5 Mar 2024 14:10:44 -0700 Subject: [PATCH 7/9] update javadoc --- .../io/servicetalk/concurrent/api/PublisherCache.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java index 1447c97c8f..05ffa89267 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java @@ -44,6 +44,7 @@ public final class PublisherCache { * Create a new PublisherCache where the cached publishers must be configured with a multicast or replay operator * by the multicastSupplier function. * + * @return a new PublisherCache that will not wrap cached values. */ public static PublisherCache create() { return new PublisherCache<>(MulticastStrategy.identity()); @@ -55,7 +56,7 @@ public static PublisherCache create() { * * @param a key type suitable for use as a {@link Map} key. * @param the type of the {@link Publisher} contained in the cache. - * @return a new PublisherCache that will use the publisherSupplier to create new entries upon request. + * @return a new PublisherCache that will wrap cached values with multicast operator. */ public static PublisherCache multicast() { return new PublisherCache<>(MulticastStrategy.wrapMulticast()); @@ -69,7 +70,7 @@ public static PublisherCache multicast() { * subscribe to the return value. * @param a key type suitable for use as a {@link Map} key. * @param the type of the {@link Publisher} contained in the cache. - * @return a new PublisherCache that will use the publisherSupplier to create new entries upon request. + * @return a new PublisherCache that will wrap cached values with multicast operator. */ public static PublisherCache multicast(final int minSubscribers) { return new PublisherCache<>(MulticastStrategy.wrapMulticast(minSubscribers)); @@ -85,7 +86,7 @@ public static PublisherCache multicast(final int minSubscribers) { * for unequal demand. * @param a key type suitable for use as a {@link Map} key. * @param the type of the {@link Publisher} contained in the cache. - * @return a new PublisherCache that will use the publisherSupplier to create new entries upon request. + * @return aa new PublisherCache that will wrap cached values with multicast operator. */ public static PublisherCache multicast(final int minSubscribers, final int queueLimit) { return new PublisherCache<>(MulticastStrategy.wrapMulticast(minSubscribers, queueLimit)); @@ -98,7 +99,7 @@ public static PublisherCache multicast(final int minSubscribers, fi * @param replayStrategy a {@link ReplayStrategy} that determines the replay behavior and history retention logic. * @param a key type suitable for use as a {@link Map} key. * @param the type of the {@link Publisher} contained in the cache. - * @return a new PublisherCache that will use the publisherSupplier to create new entries upon request. + * @return a new PublisherCache that will wrap cached values with replay operator. */ public static PublisherCache replay(ReplayStrategy replayStrategy) { return new PublisherCache<>(MulticastStrategy.wrapReplay(replayStrategy)); From 363310c58ec05e62e51bd957187fcd4e94b5b372 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 5 Mar 2024 14:33:24 -0700 Subject: [PATCH 8/9] javadoc checkstyle --- .../java/io/servicetalk/concurrent/api/PublisherCache.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java index 05ffa89267..fad7855823 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java @@ -44,6 +44,8 @@ public final class PublisherCache { * Create a new PublisherCache where the cached publishers must be configured with a multicast or replay operator * by the multicastSupplier function. * + * @param a key type suitable for use as a {@link Map} key. + * @param the type of the {@link Publisher} contained in the cache. * @return a new PublisherCache that will not wrap cached values. */ public static PublisherCache create() { @@ -109,6 +111,7 @@ public static PublisherCache replay(ReplayStrategy replayStrateg * Retrieve the value for the given key, if no value exists in the cache it will be synchronously created. * * @param key a key corresponding to the requested {@link Publisher}. + * @param publisherSupplier if the key does not exist in the cache, used to create a new publisher. * @return a new {@link Publisher} from the publisherSupplier if not contained in the cache, otherwise * the cached publisher. */ From ea1b392db8b5c722a008dea0e2273c4bb3a90a8f Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 5 Mar 2024 14:44:06 -0700 Subject: [PATCH 9/9] tests public -> package private --- .../concurrent/api/PublisherCacheTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java index 0d47a86dfd..1983a77702 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherCacheTest.java @@ -52,7 +52,7 @@ public void setup() { } @Test - public void multipleSubscribersToSameKeyReceiveMulticastEvents() { + void multipleSubscribersToSameKeyReceiveMulticastEvents() { final PublisherCache publisherCache = PublisherCache.multicast(); final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); @@ -85,7 +85,7 @@ public void multipleSubscribersToSameKeyReceiveMulticastEvents() { } @Test - public void minSubscribersMulticastPolicySubscribesUpstreamOnce() { + void minSubscribersMulticastPolicySubscribesUpstreamOnce() { final PublisherCache publisherCache = PublisherCache.multicast(); final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); @@ -110,7 +110,7 @@ public void minSubscribersMulticastPolicySubscribesUpstreamOnce() { } @Test - public void unSubscribeUpstreamAndInvalidateCacheWhenEmpty() { + void unSubscribeUpstreamAndInvalidateCacheWhenEmpty() { final PublisherCache publisherCache = PublisherCache.multicast(); final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); @@ -135,7 +135,7 @@ public void unSubscribeUpstreamAndInvalidateCacheWhenEmpty() { } @Test - public void cacheSubscriptionAndUnsubscriptionWithMulticastMinSubscribers() { + void cacheSubscriptionAndUnsubscriptionWithMulticastMinSubscribers() { final PublisherCache publisherCache = PublisherCache.multicast(2); final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>(); @@ -160,7 +160,7 @@ public void cacheSubscriptionAndUnsubscriptionWithMulticastMinSubscribers() { } @Test - public void cacheSubscriptionAndUnsubscriptionWithReplay() { + void cacheSubscriptionAndUnsubscriptionWithReplay() { final PublisherCache publisherCache = PublisherCache.replay( ReplayStrategies.historyBuilder(1) .cancelUpstream(true) @@ -194,7 +194,7 @@ public void cacheSubscriptionAndUnsubscriptionWithReplay() { } @Test - public void testErrorFromUpstreamInvalidatesCacheEntryAndRequestsANewStream() { + void testErrorFromUpstreamInvalidatesCacheEntryAndRequestsANewStream() { final PublisherCache publisherCache = PublisherCache.multicast(); final TestPublisherSubscriber subscriber1 = new TestPublisherSubscriber<>();