Skip to content

Commit 17fedf6

Browse files
authored
Introduce share consumer factories for Kafka Queues (Early Access)
* Introduce share consumer factories for Kafka Queues (Early Access) - Preliminary set of changes to support Kafka queueus introduced via KIP-932 (Kafka Queue) for early access in Apache Kafka 4.0.0. See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka - Add ShareConsumerFactory interface and DefaultShareConsumerFactory implementation as the initial building blocks for supporting Kafka Queues (KIP-932) in Spring for Apache Kafka 4.0.x. This factory and the implementation provide a flexible API for creating share consumers, and are designed as the foundation for further queue integration. - Tests to verify the share consumer behavior Related to #3875 #3875 Signed-off-by: Soby Chacko <[email protected]>
1 parent 579e7cb commit 17fedf6

File tree

3 files changed

+687
-0
lines changed

3 files changed

+687
-0
lines changed
Lines changed: 355 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,355 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import java.time.Duration;
20+
import java.util.ArrayList;
21+
import java.util.Collections;
22+
import java.util.HashMap;
23+
import java.util.Iterator;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.function.Supplier;
28+
29+
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
30+
import org.apache.kafka.clients.consumer.ShareConsumer;
31+
import org.apache.kafka.common.MetricName;
32+
import org.apache.kafka.common.serialization.Deserializer;
33+
import org.jspecify.annotations.Nullable;
34+
35+
import org.springframework.beans.factory.BeanNameAware;
36+
import org.springframework.util.Assert;
37+
38+
/**
39+
* The {@link ShareConsumerFactory} implementation to produce new {@link ShareConsumer} instances
40+
* for provided {@link Map} {@code configs} and optional {@link Deserializer}s on each
41+
* {@link #createShareConsumer(String, String)} invocation.
42+
* <p>
43+
* If you are using {@link Deserializer}s that have no-arg constructors and require no setup, then simplest to
44+
* specify {@link Deserializer} classes in the configs passed to the
45+
* {@link DefaultShareConsumerFactory} constructor.
46+
* <p>
47+
* If that is not possible, but you are using {@link Deserializer}s that may be shared between all {@link ShareConsumer}
48+
* instances (and specifically that their close() method is a no-op), then you can pass in {@link Deserializer}
49+
* instances for one or both of the key and value deserializers.
50+
* <p>
51+
* If neither of the above is true then you may provide a {@link Supplier} for one or both {@link Deserializer}s
52+
* which will be used to obtain {@link Deserializer}(s) each time a {@link ShareConsumer} is created by the factory.
53+
*
54+
* @param <K> the key type.
55+
* @param <V> the value type.
56+
*
57+
* @author Soby Chacko
58+
* @since 4.0
59+
*/
60+
public class DefaultShareConsumerFactory<K, V> extends KafkaResourceFactory
61+
implements ShareConsumerFactory<K, V>, BeanNameAware {
62+
63+
private final Map<String, Object> configs;
64+
65+
private @Nullable Supplier<@Nullable Deserializer<K>> keyDeserializerSupplier;
66+
67+
private @Nullable Supplier<@Nullable Deserializer<V>> valueDeserializerSupplier;
68+
69+
private boolean configureDeserializers = true;
70+
71+
private final List<Listener<K, V>> listeners = new ArrayList<>();
72+
73+
private String beanName = "not.managed.by.Spring";
74+
75+
/**
76+
* Construct a factory with the provided configuration.
77+
* @param configs the configuration.
78+
*/
79+
public DefaultShareConsumerFactory(Map<String, Object> configs) {
80+
this(configs, null, null);
81+
}
82+
83+
/**
84+
* Construct a factory with the provided configuration and deserializer suppliers.
85+
* When the suppliers are invoked to get an instance, the deserializers'
86+
* {@code configure()} methods will be called with the configuration map.
87+
* @param configs the configuration.
88+
* @param keyDeserializerSupplier the key {@link Deserializer} supplier function (nullable).
89+
* @param valueDeserializerSupplier the value {@link Deserializer} supplier function (nullable).
90+
*/
91+
public DefaultShareConsumerFactory(Map<String, Object> configs,
92+
@Nullable Supplier<@Nullable Deserializer<K>> keyDeserializerSupplier,
93+
@Nullable Supplier<@Nullable Deserializer<V>> valueDeserializerSupplier) {
94+
this(configs, keyDeserializerSupplier, valueDeserializerSupplier, true);
95+
}
96+
97+
/**
98+
* Construct a factory with the provided configuration and deserializers.
99+
* The deserializers' {@code configure()} methods will be called with the
100+
* configuration map unless {@code configureDeserializers} is false.
101+
* @param configs the configuration.
102+
* @param keyDeserializer the key {@link Deserializer}.
103+
* @param valueDeserializer the value {@link Deserializer}.
104+
* @param configureDeserializers false to not configure the deserializers.
105+
*/
106+
public DefaultShareConsumerFactory(Map<String, Object> configs,
107+
@Nullable Deserializer<K> keyDeserializer,
108+
@Nullable Deserializer<V> valueDeserializer, boolean configureDeserializers) {
109+
this(configs, keyDeserializer != null ? () -> keyDeserializer : null,
110+
valueDeserializer != null ? () -> valueDeserializer : null, configureDeserializers);
111+
}
112+
113+
/**
114+
* Construct a factory with the provided configuration, deserializer suppliers, and deserializer config flag.
115+
* When the suppliers are invoked to get an instance, the deserializers'
116+
* {@code configure()} methods will be called with the configuration map unless
117+
* {@code configureDeserializers} is false.
118+
* @param configs the configuration.
119+
* @param keyDeserializerSupplier the key {@link Deserializer} supplier function (nullable).
120+
* @param valueDeserializerSupplier the value {@link Deserializer} supplier function (nullable).
121+
* @param configureDeserializers whether to configure deserializers.
122+
*/
123+
public DefaultShareConsumerFactory(Map<String, Object> configs,
124+
@Nullable Supplier<@Nullable Deserializer<K>> keyDeserializerSupplier,
125+
@Nullable Supplier<@Nullable Deserializer<V>> valueDeserializerSupplier,
126+
boolean configureDeserializers) {
127+
this.configs = new ConcurrentHashMap<>(configs);
128+
this.configureDeserializers = configureDeserializers;
129+
this.keyDeserializerSupplier = keyDeserializerSupplier;
130+
this.valueDeserializerSupplier = valueDeserializerSupplier;
131+
}
132+
133+
/**
134+
* Create a share consumer with the provided group id and client id.
135+
* @param groupId the group id (maybe null).
136+
* @param clientId the client id.
137+
* @return the share consumer.
138+
*/
139+
@Override
140+
public ShareConsumer<K, V> createShareConsumer(@Nullable String groupId, @Nullable String clientId) {
141+
return createRawConsumer(groupId, clientId);
142+
}
143+
144+
/**
145+
* Actually create the consumer.
146+
* @param groupId the group id (maybe null).
147+
* @param clientId the client id.
148+
* @return the share consumer.
149+
*/
150+
protected ShareConsumer<K, V> createRawConsumer(@Nullable String groupId, @Nullable String clientId) {
151+
Map<String, Object> consumerProperties = new HashMap<>(this.configs);
152+
if (groupId != null) {
153+
consumerProperties.put("group.id", groupId);
154+
}
155+
if (clientId != null) {
156+
consumerProperties.put("client.id", clientId);
157+
}
158+
return new ExtendedShareConsumer(consumerProperties);
159+
}
160+
161+
@Override
162+
public void setBeanName(String name) {
163+
this.beanName = name;
164+
}
165+
166+
/**
167+
* Set the key deserializer. The deserializer will be configured using the consumer
168+
* configuration, unless {@link #setConfigureDeserializers(boolean)
169+
* configureDeserializers} is false.
170+
* @param keyDeserializer the deserializer.
171+
*/
172+
public void setKeyDeserializer(@Nullable Deserializer<K> keyDeserializer) {
173+
this.keyDeserializerSupplier = () -> keyDeserializer;
174+
}
175+
176+
/**
177+
* Set the value deserializer. The deserializer will be configured using the consumer
178+
* configuration, unless {@link #setConfigureDeserializers(boolean)
179+
* configureDeserializers} is false.
180+
* @param valueDeserializer the value deserializer.
181+
*/
182+
public void setValueDeserializer(@Nullable Deserializer<V> valueDeserializer) {
183+
this.valueDeserializerSupplier = () -> valueDeserializer;
184+
}
185+
186+
@Override
187+
@Nullable
188+
public Deserializer<K> getKeyDeserializer() {
189+
return this.keyDeserializerSupplier != null ? this.keyDeserializerSupplier.get() : null;
190+
}
191+
192+
@Override
193+
@Nullable
194+
public Deserializer<V> getValueDeserializer() {
195+
return this.valueDeserializerSupplier != null ? this.valueDeserializerSupplier.get() : null;
196+
}
197+
198+
/**
199+
* Set a supplier to supply instances of the key deserializer. The deserializer will
200+
* be configured using the consumer configuration, unless
201+
* {@link #setConfigureDeserializers(boolean) configureDeserializers} is false.
202+
* @param keyDeserializerSupplier the supplier (nullable).
203+
*/
204+
public void setKeyDeserializerSupplier(@Nullable Supplier<@Nullable Deserializer<K>> keyDeserializerSupplier) {
205+
this.keyDeserializerSupplier = keyDeserializerSupplier;
206+
}
207+
208+
/**
209+
* Set a supplier to supply instances of the value deserializer. The deserializer will
210+
* be configured using the consumer configuration, unless
211+
* {@link #setConfigureDeserializers(boolean) configureDeserializers} is false.
212+
* @param valueDeserializerSupplier the supplier (nullable).
213+
*/
214+
public void setValueDeserializerSupplier(@Nullable Supplier<@Nullable Deserializer<V>> valueDeserializerSupplier) {
215+
this.valueDeserializerSupplier = valueDeserializerSupplier;
216+
}
217+
218+
/**
219+
* Set to false (default true) to prevent programmatically provided deserializers (via
220+
* constructor or setters) from being configured using the consumer configuration,
221+
* e.g. if the deserializers are already fully configured.
222+
* @param configureDeserializers false to not configure.
223+
* @see #setKeyDeserializer(Deserializer)
224+
* @see #setKeyDeserializerSupplier(Supplier)
225+
* @see #setValueDeserializer(Deserializer)
226+
* @see #setValueDeserializerSupplier(Supplier)
227+
**/
228+
public void setConfigureDeserializers(boolean configureDeserializers) {
229+
this.configureDeserializers = configureDeserializers;
230+
}
231+
232+
/**
233+
* Get the current list of listeners.
234+
* @return the listeners.
235+
*/
236+
@Override
237+
public List<Listener<K, V>> getListeners() {
238+
return Collections.unmodifiableList(this.listeners);
239+
}
240+
241+
/**
242+
* Add a listener.
243+
* @param listener the listener.
244+
*/
245+
@Override
246+
public void addListener(Listener<K, V> listener) {
247+
Assert.notNull(listener, "'listener' cannot be null");
248+
this.listeners.add(listener);
249+
}
250+
251+
/**
252+
* Add a listener at a specific index.
253+
* <p>
254+
* This method allows insertion of a listener at a particular position in the internal listener list.
255+
* While this enables ordering of listener callbacks (which can be important for certain monitoring or extension scenarios),
256+
* there is intentionally no corresponding {@code removeListener(int index)} contract. Removing listeners by index is
257+
* discouraged because the position of a listener can change if others are added or removed, making it easy to
258+
* accidentally remove the wrong one. Managing listeners by their reference (object) is safer and less error-prone,
259+
* especially as listeners are usually set up once during initialization.
260+
* {@see #removeListener(Listener)}
261+
* </p>
262+
* @param index the index (list position).
263+
* @param listener the listener to add.
264+
*/
265+
@Override
266+
public void addListener(int index, Listener<K, V> listener) {
267+
Assert.notNull(listener, "'listener' cannot be null");
268+
if (index >= this.listeners.size()) {
269+
this.listeners.add(listener);
270+
}
271+
else {
272+
this.listeners.add(index, listener);
273+
}
274+
}
275+
276+
/**
277+
* Remove a listener.
278+
* @param listener the listener.
279+
* @return true if removed.
280+
*/
281+
@Override
282+
public boolean removeListener(Listener<K, V> listener) {
283+
return this.listeners.remove(listener);
284+
}
285+
286+
@Nullable
287+
private Deserializer<K> keyDeserializer(Map<String, Object> configs) {
288+
Deserializer<K> deserializer =
289+
this.keyDeserializerSupplier != null
290+
? this.keyDeserializerSupplier.get()
291+
: null;
292+
if (deserializer != null && this.configureDeserializers) {
293+
deserializer.configure(configs, true);
294+
}
295+
return deserializer;
296+
}
297+
298+
@Nullable
299+
private Deserializer<V> valueDeserializer(Map<String, Object> configs) {
300+
Deserializer<V> deserializer =
301+
this.valueDeserializerSupplier != null
302+
? this.valueDeserializerSupplier.get()
303+
: null;
304+
if (deserializer != null && this.configureDeserializers) {
305+
deserializer.configure(configs, false);
306+
}
307+
return deserializer;
308+
}
309+
310+
@Override
311+
public Map<String, Object> getConfigurationProperties() {
312+
return Collections.unmodifiableMap(this.configs);
313+
}
314+
315+
protected class ExtendedShareConsumer extends KafkaShareConsumer<K, V> {
316+
317+
private @Nullable String idForListeners;
318+
319+
protected ExtendedShareConsumer(Map<String, Object> configProps) {
320+
super(configProps, keyDeserializer(configProps), valueDeserializer(configProps));
321+
322+
if (!DefaultShareConsumerFactory.this.listeners.isEmpty()) {
323+
Iterator<MetricName> metricIterator = metrics().keySet().iterator();
324+
String clientId = "unknown";
325+
if (metricIterator.hasNext()) {
326+
clientId = metricIterator.next().tags().get("client-id");
327+
}
328+
this.idForListeners = DefaultShareConsumerFactory.this.beanName + "." + clientId;
329+
for (Listener<K, V> listener : DefaultShareConsumerFactory.this.listeners) {
330+
listener.consumerAdded(this.idForListeners, this);
331+
}
332+
}
333+
}
334+
335+
@Override
336+
public void close() {
337+
super.close();
338+
notifyConsumerRemoved();
339+
}
340+
341+
@Override
342+
public void close(Duration timeout) {
343+
super.close(timeout);
344+
notifyConsumerRemoved();
345+
}
346+
347+
private void notifyConsumerRemoved() {
348+
for (Listener<K, V> listener : DefaultShareConsumerFactory.this.listeners) {
349+
listener.consumerRemoved(this.idForListeners, this);
350+
}
351+
}
352+
353+
}
354+
355+
}

0 commit comments

Comments
 (0)