Skip to content

Commit 64ea60d

Browse files
authored
Add shaded Caffeine producer cache provider (#124)
1 parent 2bf832c commit 64ea60d

File tree

10 files changed

+420
-10
lines changed

10 files changed

+420
-10
lines changed

README.adoc

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ messageId.subscribe(System.out::println);
8080
8181
=== Sending messages with cached producer
8282
83-
By default a ConcurrentHashMap based cache is used. It's recommended to use a more advanced cache based on Caffeine. The cache will get used as the default implementation when it is on the classpath.
83+
By default, a ConcurrentHashMap based cache is used.
84+
It's recommended to use a more advanced cache based on Caffeine.
85+
The cache will get used as the default implementation when it is on the classpath.
8486
8587
Adding Caffeine based producer cache with Gradle:
8688
@@ -126,10 +128,47 @@ Mono<MessageId> messageId = messageSender
126128
messageId.subscribe(System.out::println);
127129
----
128130
129-
It is recommended to use a cached producer in most cases. The cache enables reusing the Pulsar Producer instance and related resources across multiple message sending calls.
131+
It is recommended to use a cached producer in most cases.
132+
The cache enables reusing the Pulsar Producer instance and related resources across multiple message sending calls.
130133
This improves performance since a producer won't have to be created and closed before and after sending a message.
131134
132-
The adapter library implementation together with the cache implementation will also enable reactive backpressure for sending messages. The `maxInflight` setting will limit the number of messages that are pending from the client to the broker. The solution will limit reactive streams subscription requests to keep the number of pending messages under the defined limit. This limit is per-topic and impacts the local JVM only.
135+
The adapter library implementation together with the cache implementation will also enable reactive backpressure for sending messages.
136+
The `maxInflight` setting will limit the number of messages that are pending from the client to the broker.
137+
The solution will limit reactive streams subscription requests to keep the number of pending messages under the defined limit.
138+
This limit is per-topic and impacts the local JVM only.
139+
140+
=== Shaded version of Caffeine
141+
A version of the provider is available that shades it usage of Caffeine.
142+
This is useful in scenarios where there is another version of Caffeine required in your application or if you do not want Caffeine on the classpath.
143+
144+
Adding shaded Caffeine based producer cache with Gradle:
145+
146+
[source,groovy,subs="verbatim,attributes"]
147+
----
148+
dependencies {
149+
implementation "org.apache.pulsar:pulsar-client-reactive-adapter:{latest_version}"
150+
implementation "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine-shaded:{latest_version}"
151+
}
152+
----
153+
154+
Adding shaded Caffeine based producer cache with Maven:
155+
156+
[source,xml,subs="verbatim,attributes"]
157+
----
158+
<dependencies>
159+
<dependency>
160+
<groupId>org.apache.pulsar</groupId>
161+
<artifactId>pulsar-client-reactive-adapter</artifactId>
162+
<version>{latest_version}</version>
163+
</dependency>
164+
<dependency>
165+
<groupId>org.apache.pulsar</groupId>
166+
<artifactId>pulsar-client-reactive-producer-cache-caffeine-shaded</artifactId>
167+
<version>{latest_version}</version>
168+
</dependency>
169+
</dependencies>
170+
----
171+
133172
134173
=== Reading messages
135174

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@
1717
# under the License.
1818
#
1919

20-
version=0.2.0
20+
version=0.2.1-SNAPSHOT

pulsar-client-reactive-adapter/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ dependencies {
3737
testImplementation libs.mockito.inline
3838

3939
intTestImplementation project(':pulsar-client-reactive-producer-cache-caffeine')
40+
intTestImplementation project(path: ':pulsar-client-reactive-producer-cache-caffeine-shaded', configuration: 'shadow')
4041
intTestImplementation libs.junit.jupiter
4142
intTestImplementation libs.testcontainers.pulsar
4243
intTestImplementation libs.assertj.core

pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
3737
import org.apache.pulsar.reactive.client.internal.adapter.ConcurrentHashMapProducerCacheProvider;
3838
import org.apache.pulsar.reactive.client.producercache.CaffeineProducerCacheProvider;
39+
import org.apache.pulsar.reactive.client.producercache.CaffeineShadedProducerCacheProvider;
3940
import org.junit.jupiter.api.Test;
4041
import org.junit.jupiter.params.ParameterizedTest;
4142
import org.junit.jupiter.params.provider.Arguments;
@@ -46,12 +47,14 @@
4647
class ReactiveMessageSenderE2ETest {
4748

4849
private static Stream<Arguments> shouldSendMessageToTopicWithCachedProducer() {
49-
return Arrays
50-
.asList(Arguments.of("ConcurrentHashMapProducerCacheProvider",
50+
return Arrays.asList(
51+
Arguments.of("ConcurrentHashMapProducerCacheProvider",
5152
AdaptedReactivePulsarClientFactory.createCache(new ConcurrentHashMapProducerCacheProvider())),
52-
Arguments.of("Default", AdaptedReactivePulsarClientFactory.createCache()),
53-
Arguments.of("CaffeineProducerCacheProvider",
54-
AdaptedReactivePulsarClientFactory.createCache(new CaffeineProducerCacheProvider())))
53+
Arguments.of("Default", AdaptedReactivePulsarClientFactory.createCache()),
54+
Arguments.of("CaffeineProducerCacheProvider",
55+
AdaptedReactivePulsarClientFactory.createCache(new CaffeineProducerCacheProvider())),
56+
Arguments.of("CaffeineShadedProducerCacheProvider",
57+
AdaptedReactivePulsarClientFactory.createCache(new CaffeineShadedProducerCacheProvider())))
5558
.stream();
5659
}
5760

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
plugins {
21+
id 'pulsar-client-reactive.codestyle-conventions'
22+
id 'pulsar-client-reactive.library-conventions'
23+
id 'com.github.johnrengelman.shadow' version '7.1.2'
24+
}
25+
26+
dependencies {
27+
api project(':pulsar-client-reactive-adapter')
28+
implementation libs.caffeine
29+
shadow project(':pulsar-client-reactive-adapter')
30+
testImplementation libs.junit.jupiter
31+
testImplementation libs.assertj.core
32+
testImplementation libs.reactor.test
33+
testImplementation libs.mockito.core
34+
}
35+
36+
description = "Caffeine (shaded) implementation of producer cache"
37+
38+
jar {
39+
archiveClassifier.set('original')
40+
}
41+
42+
shadowJar {
43+
archiveClassifier.set(null)
44+
dependsOn(project.tasks.jar)
45+
manifest {
46+
inheritFrom project.tasks.jar.manifest
47+
}
48+
relocate 'com.github.benmanes.caffeine', 'org.springframework.pulsar.shade.com.github.benmanes.caffeine'
49+
relocate 'com.google', 'org.springframework.pulsar.shade.com.google'
50+
relocate 'org.checkerframework', 'org.springframework.pulsar.shade.org.checkerframework'
51+
dependencies {
52+
exclude(dependency {
53+
!['com.github.ben-manes.caffeine', 'org.checkerframework', 'com.google.errorprone'].contains(it.moduleGroup)
54+
})
55+
}
56+
}
57+
58+
tasks.build.dependsOn tasks.shadowJar
59+
60+
// disable module metadata - otherwise original jar will be used when published
61+
tasks.withType(GenerateModuleMetadata) {
62+
enabled = false
63+
}
64+
65+
// delay the maven publishing - instead add shadowJar to the publication
66+
components.java.withVariantsFromConfiguration(configurations.shadowRuntimeElements) {
67+
skip()
68+
}
69+
70+
publishing {
71+
publications {
72+
mavenJava {
73+
artifact(shadowJar)
74+
pom.withXml {
75+
Node pomNode = asNode()
76+
pomNode.dependencies.'*'.findAll() {
77+
it.artifactId.text() == 'caffeine'
78+
}.each() {
79+
it.parent().remove(it)
80+
}
81+
}
82+
}
83+
}
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.pulsar.reactive.client.producercache;
21+
22+
import java.time.Duration;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.function.Function;
25+
26+
import com.github.benmanes.caffeine.cache.AsyncCache;
27+
import com.github.benmanes.caffeine.cache.Caffeine;
28+
import com.github.benmanes.caffeine.cache.RemovalCause;
29+
import com.github.benmanes.caffeine.cache.Scheduler;
30+
import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
31+
import reactor.core.scheduler.Schedulers;
32+
33+
/**
34+
* Producer cache provider that uses a shaded Caffeine {@link AsyncCache} to cache
35+
* entries.
36+
*/
37+
public class CaffeineShadedProducerCacheProvider implements ProducerCacheProvider {
38+
39+
private final AsyncCache<Object, Object> cache;
40+
41+
/**
42+
* Create a cache provider instance with default values.
43+
*/
44+
public CaffeineShadedProducerCacheProvider() {
45+
this(Duration.ofMinutes(1), Duration.ofMinutes(10), 1000L, 50);
46+
}
47+
48+
/**
49+
* Create a cache provider instance with the specified options.
50+
* @param cacheExpireAfterAccess time period after last access to expire unused
51+
* entries in the cache
52+
* @param cacheExpireAfterWrite time period after last write to expire unused entries
53+
* in the cache
54+
* @param cacheMaximumSize maximum size of cache (entries)
55+
* @param cacheInitialCapacity the initial size of cache
56+
*/
57+
public CaffeineShadedProducerCacheProvider(Duration cacheExpireAfterAccess, Duration cacheExpireAfterWrite,
58+
Long cacheMaximumSize, Integer cacheInitialCapacity) {
59+
this.cache = Caffeine.newBuilder().expireAfterAccess(cacheExpireAfterAccess)
60+
.expireAfterWrite(cacheExpireAfterWrite).maximumSize(cacheMaximumSize)
61+
.initialCapacity(cacheInitialCapacity).scheduler(Scheduler.systemScheduler())
62+
.executor(Schedulers.boundedElastic()::schedule).removalListener(this::onRemoval).buildAsync();
63+
}
64+
65+
private void onRemoval(Object key, Object entry, RemovalCause cause) {
66+
if (entry instanceof AutoCloseable) {
67+
try {
68+
((AutoCloseable) entry).close();
69+
}
70+
catch (Exception ex) {
71+
throw new RuntimeException(ex);
72+
}
73+
}
74+
}
75+
76+
public void close() {
77+
this.cache.synchronous().invalidateAll();
78+
}
79+
80+
@Override
81+
public <K, V> CompletableFuture<V> getOrCreateCachedEntry(K key,
82+
Function<K, CompletableFuture<V>> createEntryFunction) {
83+
return (CompletableFuture<V>) this.cache.get(key,
84+
(__, ___) -> (CompletableFuture) createEntryFunction.apply(key));
85+
}
86+
87+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.pulsar.reactive.client.producercache;
21+
22+
import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
23+
import org.apache.pulsar.reactive.client.adapter.ProducerCacheProviderFactory;
24+
25+
/**
26+
* {@link ProducerCacheProviderFactory} that creates instances of
27+
* {@link CaffeineShadedProducerCacheProvider}.
28+
*/
29+
public class CaffeineShadedProducerCacheProviderFactory implements ProducerCacheProviderFactory {
30+
31+
@Override
32+
public ProducerCacheProvider get() {
33+
return new CaffeineShadedProducerCacheProvider();
34+
}
35+
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
org.apache.pulsar.reactive.client.producercache.CaffeineShadedProducerCacheProviderFactory

0 commit comments

Comments
 (0)