Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a cache for Publishers that tracks subscriptions to manage the cache #2861

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;
mgodave marked this conversation as resolved.
Show resolved Hide resolved

import io.servicetalk.concurrent.PublisherSource.Subscriber;
import io.servicetalk.concurrent.PublisherSource.Subscription;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nullable;

/**
* A cache of {@link Publisher}s. Keys must correctly implement Object::hashCode and Object::equals.
* The cache can be created with either multicast or replay configurations. Subscriptions
* are tracked by the cache and Publishers are removed when there are no more subscriptions.
*
* @param <K> a key type suitable for use as a Map key, that correctly implements hashCode/equals.
* @param <T> the type of the publisher that is cached.
*/
public final class PublisherCache<K, T> {
private final MulticastStrategy<T> multicastStrategy;
private final Map<K, Holder<T>> publisherCache;

PublisherCache(final MulticastStrategy<T> multicastStrategy) {
this.multicastStrategy = multicastStrategy;
this.publisherCache = new HashMap<>();
}

/**
* Create a new PublisherCache where the cached publishers must be configured with a multicast or replay operator
* by the multicastSupplier function.
*
* @param <K> a key type suitable for use as a {@link Map} key.
* @param <T> the type of the {@link Publisher} contained in the cache.
* @return a new PublisherCache that will not wrap cached values.
*/
public static <K, T> PublisherCache<K, T> create() {
return new PublisherCache<>(MulticastStrategy.identity());
}

/**
* Create a new PublisherCache where the cached publishers will be configured for multicast for all
* consumers of a specific key.
*
* @param <K> a key type suitable for use as a {@link Map} key.
* @param <T> the type of the {@link Publisher} contained in the cache.
* @return a new PublisherCache that will wrap cached values with multicast operator.
*/
public static <K, T> PublisherCache<K, T> multicast() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by exposing these static method we may need to add a new factory method here for each operator overload. did you consider instead exposing just a Function<> (or similar) so folks can apply the variant they want? some risks maybe:

  • folks could apply operators that don't obey the assumptions (we could clarify the constraints the operator must abide by ... allows for multiple subscribers, cancels upstream only after no subscribers present, no async cancel processing as the synchronization here depends upon it, ..)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did consider this. I'm open to removing these, the create() method just above is essentially what you describe, the user would specify the Publisher configured however they desire in the function on get.

I had initially built this without the static methods and a constructor that took a function which would be used to supply a new Publisher on a cache miss. I decided to move this function to the get method as it emulated how I might expect to use a cache, ex: I might not want a function from name -> Publisher but rather I would prefer a closure that would allow me to use the context at hand to instantiate the new object.

For an initial API I don't have any problem deferring to your suggestion as we learn how this ultimately ends up being used.

return new PublisherCache<>(MulticastStrategy.wrapMulticast());
}

/**
* Create a new PublisherCache where the cached publishers will be configured for multicast for all
* consumers of a specific key.
*
* @param minSubscribers The upstream subscribe operation will not happen until after this many {@link Subscriber}
* subscribe to the return value.
* @param <K> a key type suitable for use as a {@link Map} key.
* @param <T> the type of the {@link Publisher} contained in the cache.
* @return a new PublisherCache that will wrap cached values with multicast operator.
*/
public static <K, T> PublisherCache<K, T> multicast(final int minSubscribers) {
return new PublisherCache<>(MulticastStrategy.wrapMulticast(minSubscribers));
}

/**
* Create a new PublisherCache where the cached publishers will be configured for multicast for all
* consumers of a specific key.
*
* @param minSubscribers The upstream subscribe operation will not happen until after this many {@link Subscriber}
* subscribe to the return value.
* @param queueLimit The number of elements which will be queued for each {@link Subscriber} in order to compensate
* for unequal demand.
* @param <K> a key type suitable for use as a {@link Map} key.
* @param <T> the type of the {@link Publisher} contained in the cache.
* @return aa new PublisherCache that will wrap cached values with multicast operator.
*/
public static <K, T> PublisherCache<K, T> multicast(final int minSubscribers, final int queueLimit) {
return new PublisherCache<>(MulticastStrategy.wrapMulticast(minSubscribers, queueLimit));
}

/**
* Create a new PublisherCache where the cacehed publishers will be configured for replay given the privided
* ReplayStrategy.
*
* @param replayStrategy a {@link ReplayStrategy} that determines the replay behavior and history retention logic.
* @param <K> a key type suitable for use as a {@link Map} key.
* @param <T> the type of the {@link Publisher} contained in the cache.
* @return a new PublisherCache that will wrap cached values with replay operator.
*/
public static <K, T> PublisherCache<K, T> replay(ReplayStrategy<T> replayStrategy) {
return new PublisherCache<>(MulticastStrategy.wrapReplay(replayStrategy));
}

/**
* Retrieve the value for the given key, if no value exists in the cache it will be synchronously created.
*
* @param key a key corresponding to the requested {@link Publisher}.
* @param publisherSupplier if the key does not exist in the cache, used to create a new publisher.
* @return a new {@link Publisher} from the publisherSupplier if not contained in the cache, otherwise
* the cached publisher.
*/
public Publisher<T> get(final K key, final Function<K, Publisher<T>> publisherSupplier) {
return Publisher.defer(() -> {
synchronized (publisherCache) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synchronized has been used in the past bcz it doesn't require additional allocations. however loom fibers don't support synchronized, should we use Lock objects instead?

if (publisherCache.containsKey(key)) {
return publisherCache.get(key).publisher;
}

final Holder<T> item2 = new Holder<>();
mgodave marked this conversation as resolved.
Show resolved Hide resolved
publisherCache.put(key, item2);

final Publisher<T> newPublisher = publisherSupplier.apply(key)
.liftSync(subscriber -> new Subscriber<T>() {
@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
subscription.request(n);
}

@Override
public void cancel() {
try {
assert Thread.holdsLock(publisherCache);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider adding a comment here too (lock was acquired after the multi-cast, we need to be holding the lock here to interact with the map and prevent returning a cancelled Publisher)

publisherCache.remove(key, item2);
} finally {
subscription.cancel();
}
}
});
}

@Override
public void onNext(@Nullable T next) {
subscriber.onNext(next);
}

@Override
public void onError(Throwable t) {
lockRemoveFromMap();
subscriber.onError(t);
}

@Override
public void onComplete() {
lockRemoveFromMap();
subscriber.onComplete();
}

private void lockRemoveFromMap() {
synchronized (publisherCache) {
publisherCache.remove(key, item2);
}
}
});

item2.publisher = multicastStrategy.apply(newPublisher)
.liftSync(subscriber -> new Subscriber<T>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this second liftSync? I think it applies to the individual streams. In my minds eye that means if a single stream ends it removes the underlying stream from the cache. I don't know how that would happen unless the parent publisher completed and we remove it at that level as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's there to handle subscriber errors and clean up if the subscriber is put into a bad state via a throw. In fact it looks like I missed a case with onNext.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a test case that would demonstrate their purpose? If I delete the second liftsync and move the syncrhronized to the first everything still works as expected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mgodave - can you add a comment along the lines of:

Motivation for this being "after" the multicast is bcz multicast doesn't propagate cancellation upstream unless there are no subscribers (e.g. they all cancel) ... so we acquire the lock in cancel here, there are no async boundaries in multi-cast, and then we remove from the map in cancel "above" multicast. This prevents race conditions where someone does a get and we return a Publisher that has been cancelled (because there are no subscriber).

Also consider breaking this out into a named (e.g. not anonymous, private/final) class which is easier to look at when debugging larger operator chains.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also consider breaking this out into a named (e.g. not anonymous, private/final) class which is easier to look at when debugging larger operator chains.

These are inline because they access four different pieces of local state and properties. I played around with breaking this out and it doesn't feel cohesive. I'm choosing to leave it as is for now but I am happy to revisit if you have strong feelings.

@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
subscription.request(n);
}

@Override
public void cancel() {
synchronized (publisherCache) {
subscription.cancel();
}
}
});
}

@Override
public void onNext(@Nullable T next) {
subscriber.onNext(next);
}

@Override
public void onError(Throwable t) {
try {
subscriber.onError(t);
} finally {
lockRemoveFromMap();
}
}

@Override
public void onComplete() {
try {
subscriber.onComplete();
} finally {
lockRemoveFromMap();
}
}

private void lockRemoveFromMap() {
synchronized (publisherCache) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment here to the effect:

  • completion of the first Subscriber after multicast means the the multicast operator is in a terminal state and we therefore remove it from the map. There are cases where folks may want to re-subscribe to the Publisher (e.g. get the cached value, trigger another event) however that currently isn't supported and we favor bounding the size of the map which has scope outside the operator chain.

publisherCache.remove(key, item2);
}
}
});
return item2.publisher;
}
});
}

private static final class Holder<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is here so that we can add it to the hashmap and avoid some jumping through hoops to manage the reference equality stuff for HashMap. A quick code comment might be helpful for future readers.

@Nullable
Publisher<T> publisher;
}

/**
* A series of strategies used to make cached {@link Publisher}s "shareable".
*
* @param <T> the type of the {@link Publisher}.
*/
@FunctionalInterface
private interface MulticastStrategy<T> {
Publisher<T> apply(Publisher<T> cached);

static <T> MulticastStrategy<T> identity() {
return cached -> cached;
}

static <T> MulticastStrategy<T> wrapMulticast() {
return cached -> cached.multicast(1, true);
}

static <T> MulticastStrategy<T> wrapMulticast(int minSubscribers) {
return cached -> cached.multicast(minSubscribers, true);
}

static <T> MulticastStrategy<T> wrapMulticast(int minSubscribers, int queueLimit) {
return cached -> cached.multicast(minSubscribers, queueLimit);
}

static <T> MulticastStrategy<T> wrapReplay(final ReplayStrategy<T> strategy) {
return cached -> cached.replay(strategy);
}
}
}
Loading
Loading