Skip to content

Commit 7c2178a

Browse files
authored
GH-2089: Remove 2.x Deprecations
Resolves #2089 - remove deprecations - remove implementations of legacy error handlers - deprecate legacy error handler interfaces * Fix deprecation warnings. * Fix more deprecation warnings.
1 parent b30e4b3 commit 7c2178a

File tree

73 files changed

+327
-3456
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+327
-3456
lines changed

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ For changes in earlier version, see <<history>>.
66
[[x28-kafka-client]]
77
==== Kafka Client Version
88

9-
This version requires the 3.0.0 `kafka-clients`
9+
This version requires the 3.1.0 `kafka-clients`
1010

11-
IMPORTANT: When using transactions, `kafka-clients` 3.0.0 and later no longer support `EOSMode.V2` (aka `BETA`) (and automatic fallback to `V1` - aka `ALPHA`) with brokers earlier than 2.5; you must therefore override the default `EOSMode` (`V2`) with `V1` if your brokers are older (or upgrade your brokers).
11+
IMPORTANT: When using transactions, the minimum broker version is 2.5.
1212

1313
See <<exactly-once>> and https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics[KIP-447] for more information.
1414

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 10 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,9 @@
3535
import org.springframework.kafka.core.KafkaTemplate;
3636
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
3737
import org.springframework.kafka.listener.AfterRollbackProcessor;
38-
import org.springframework.kafka.listener.BatchErrorHandler;
3938
import org.springframework.kafka.listener.BatchInterceptor;
4039
import org.springframework.kafka.listener.CommonErrorHandler;
4140
import org.springframework.kafka.listener.ContainerProperties;
42-
import org.springframework.kafka.listener.ErrorHandler;
43-
import org.springframework.kafka.listener.GenericErrorHandler;
4441
import org.springframework.kafka.listener.RecordInterceptor;
4542
import org.springframework.kafka.listener.adapter.BatchToRecordAdapter;
4643
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -49,7 +46,6 @@
4946
import org.springframework.kafka.support.JavaUtils;
5047
import org.springframework.kafka.support.TopicPartitionOffset;
5148
import org.springframework.kafka.support.converter.MessageConverter;
52-
import org.springframework.retry.RecoveryCallback;
5349
import org.springframework.retry.support.RetryTemplate;
5450
import org.springframework.util.Assert;
5551

@@ -74,7 +70,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
7470

7571
private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null); // NOSONAR
7672

77-
private GenericErrorHandler<?> errorHandler;
73+
@SuppressWarnings("deprecation")
74+
private org.springframework.kafka.listener.GenericErrorHandler<?> errorHandler;
7875

7976
private CommonErrorHandler commonErrorHandler;
8077

@@ -90,10 +87,6 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
9087

9188
private Boolean ackDiscarded;
9289

93-
private RetryTemplate retryTemplate;
94-
95-
private RecoveryCallback<? extends Object> recoveryCallback;
96-
9790
private Boolean statefulRetry;
9891

9992
private Boolean batchListener;
@@ -177,25 +170,6 @@ public void setAckDiscarded(Boolean ackDiscarded) {
177170
this.ackDiscarded = ackDiscarded;
178171
}
179172

180-
/**
181-
* Set a retryTemplate.
182-
* @param retryTemplate the template.
183-
* @deprecated since 2.8 - use a suitably configured error handler instead.
184-
*/
185-
@Deprecated
186-
public void setRetryTemplate(RetryTemplate retryTemplate) {
187-
this.retryTemplate = retryTemplate;
188-
}
189-
190-
/**
191-
* Set a callback to be used with the {@link #setRetryTemplate(RetryTemplate)
192-
* retryTemplate}.
193-
* @param recoveryCallback the callback.
194-
*/
195-
public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
196-
this.recoveryCallback = recoveryCallback;
197-
}
198-
199173
/**
200174
* When using a {@link RetryTemplate} Set to true to enable stateful retry. Use in
201175
* conjunction with a
@@ -257,7 +231,7 @@ public void setReplyTemplate(KafkaTemplate<?, ?> replyTemplate) {
257231
* @see #setCommonErrorHandler(CommonErrorHandler)
258232
*/
259233
@Deprecated
260-
public void setErrorHandler(ErrorHandler errorHandler) {
234+
public void setErrorHandler(org.springframework.kafka.listener.ErrorHandler errorHandler) {
261235
this.errorHandler = errorHandler;
262236
}
263237

@@ -269,13 +243,14 @@ public void setErrorHandler(ErrorHandler errorHandler) {
269243
* @see #setCommonErrorHandler(CommonErrorHandler)
270244
*/
271245
@Deprecated
272-
public void setBatchErrorHandler(BatchErrorHandler errorHandler) {
246+
public void setBatchErrorHandler(org.springframework.kafka.listener.BatchErrorHandler errorHandler) {
273247
this.errorHandler = errorHandler;
274248
}
275249

276250
/**
277-
* Set the {@link CommonErrorHandler} which can handle errors for both record
278-
* and batch listeners. Replaces the use of {@link GenericErrorHandler}s.
251+
* Set the {@link CommonErrorHandler} which can handle errors for both record and
252+
* batch listeners. Replaces the use of
253+
* {@link org.springframework.kafka.listener.GenericErrorHandler}s.
279254
* @param commonErrorHandler the handler.
280255
* @since 2.8
281256
*/
@@ -361,16 +336,17 @@ public void setContainerCustomizer(ContainerCustomizer<K, V, C> containerCustomi
361336
this.containerCustomizer = containerCustomizer;
362337
}
363338

339+
@SuppressWarnings("deprecation")
364340
@Override
365341
public void afterPropertiesSet() {
366342
if (this.commonErrorHandler == null && this.errorHandler != null) {
367343
if (Boolean.TRUE.equals(this.batchListener)) {
368-
Assert.state(this.errorHandler instanceof BatchErrorHandler,
344+
Assert.state(this.errorHandler instanceof org.springframework.kafka.listener.BatchErrorHandler,
369345
() -> "The error handler must be a BatchErrorHandler, not " +
370346
this.errorHandler.getClass().getName());
371347
}
372348
else {
373-
Assert.state(this.errorHandler instanceof ErrorHandler,
349+
Assert.state(this.errorHandler instanceof org.springframework.kafka.listener.ErrorHandler,
374350
() -> "The error handler must be an ErrorHandler, not " +
375351
this.errorHandler.getClass().getName());
376352
}
@@ -398,8 +374,6 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint)
398374
JavaUtils.INSTANCE
399375
.acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy)
400376
.acceptIfNotNull(this.ackDiscarded, aklEndpoint::setAckDiscarded)
401-
.acceptIfNotNull(this.retryTemplate, aklEndpoint::setRetryTemplate)
402-
.acceptIfNotNull(this.recoveryCallback, aklEndpoint::setRecoveryCallback)
403377
.acceptIfNotNull(this.statefulRetry, aklEndpoint::setStatefulRetry)
404378
.acceptIfNotNull(this.replyTemplate, aklEndpoint::setReplyTemplate)
405379
.acceptIfNotNull(this.replyHeadersConfigurer, aklEndpoint::setReplyHeadersConfigurer)

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2021 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -94,8 +94,6 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
9494

9595
private boolean ackDiscarded;
9696

97-
private RetryTemplate retryTemplate;
98-
9997
private RecoveryCallback<? extends Object> recoveryCallback;
10098

10199
private boolean statefulRetry;
@@ -326,34 +324,6 @@ public void setAckDiscarded(boolean ackDiscarded) {
326324
this.ackDiscarded = ackDiscarded;
327325
}
328326

329-
@Nullable
330-
protected RetryTemplate getRetryTemplate() {
331-
return this.retryTemplate;
332-
}
333-
334-
/**
335-
* Set a retryTemplate.
336-
* @param retryTemplate the template.
337-
* @deprecated since 2.8 - use a suitably configured error handler instead.
338-
*/
339-
@Deprecated
340-
public void setRetryTemplate(RetryTemplate retryTemplate) {
341-
this.retryTemplate = retryTemplate;
342-
}
343-
344-
@Nullable
345-
protected RecoveryCallback<?> getRecoveryCallback() {
346-
return this.recoveryCallback;
347-
}
348-
349-
/**
350-
* Set a callback to be used with the {@link #setRetryTemplate(RetryTemplate)}.
351-
* @param recoveryCallback the callback.
352-
*/
353-
public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
354-
this.recoveryCallback = recoveryCallback;
355-
}
356-
357327
protected boolean isStatefulRetry() {
358328
return this.statefulRetry;
359329
}
@@ -510,7 +480,7 @@ public void setupListenerContainer(MessageListenerContainer listenerContainer,
510480
protected abstract MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
511481
@Nullable MessageConverter messageConverter);
512482

513-
@SuppressWarnings({ "unchecked", "deprecation" })
483+
@SuppressWarnings("unchecked")
514484
private void setupMessageListener(MessageListenerContainer container,
515485
@Nullable MessageConverter messageConverter) {
516486

@@ -523,14 +493,6 @@ private void setupMessageListener(MessageListenerContainer container,
523493
boolean isBatchListener = isBatchListener();
524494
Assert.state(messageListener != null,
525495
() -> "Endpoint [" + this + "] must provide a non null message listener");
526-
Assert.state(this.retryTemplate == null || !isBatchListener,
527-
"A 'RetryTemplate' is not supported with a batch listener; consider configuring the container "
528-
+ "with a suitably configured 'SeekToCurrentBatchErrorHandler' instead");
529-
if (this.retryTemplate != null) {
530-
messageListener = new org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter<>(
531-
(MessageListener<K, V>) messageListener,
532-
this.retryTemplate, this.recoveryCallback, this.statefulRetry);
533-
}
534496
if (this.recordFilterStrategy != null) {
535497
if (isBatchListener) {
536498
if (((MessagingMessageListenerAdapter<K, V>) messageListener).isConsumerRecords()) {

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -89,8 +89,6 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
8989

9090
private StateRestoreListener stateRestoreListener;
9191

92-
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
93-
9492
private StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler;
9593

9694
private boolean autoStartup = true;
@@ -191,17 +189,6 @@ public void setStateListener(KafkaStreams.StateListener stateListener) {
191189
this.stateListener = stateListener; // NOSONAR (sync)
192190
}
193191

194-
/**
195-
* Obsolete.
196-
* @param exceptionHandler the handler.
197-
* @deprecated in favor of
198-
* {@link #setStreamsUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)}.
199-
*/
200-
@Deprecated
201-
public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler exceptionHandler) {
202-
this.uncaughtExceptionHandler = exceptionHandler; // NOSONAR (sync)
203-
}
204-
205192
/**
206193
* Set a {@link StreamsUncaughtExceptionHandler}. Supercedes
207194
* {@link #setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler)}.
@@ -340,9 +327,6 @@ public synchronized void start() {
340327
if (this.streamsUncaughtExceptionHandler != null) {
341328
this.kafkaStreams.setUncaughtExceptionHandler(this.streamsUncaughtExceptionHandler);
342329
}
343-
else {
344-
this.kafkaStreams.setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
345-
}
346330
if (this.kafkaStreamsCustomizer != null) {
347331
this.kafkaStreamsCustomizer.customize(this.kafkaStreams);
348332
}

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanCustomizer.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2021 the original author or authors.
2+
* Copyright 2015-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -198,35 +198,6 @@ public interface KafkaOperations<K, V> {
198198
*/
199199
void flush();
200200

201-
/**
202-
* When running in a transaction, send the consumer offset(s) to the transaction. The
203-
* group id is obtained from
204-
* {@link org.springframework.kafka.support.KafkaUtils#getConsumerGroupId()}. It is
205-
* not necessary to call this method if the operations are invoked on a listener
206-
* container thread (and the listener container is configured with a
207-
* {@link org.springframework.kafka.transaction.KafkaAwareTransactionManager}) since
208-
* the container will take care of sending the offsets to the transaction.
209-
* @param offsets The offsets.
210-
* @since 1.3
211-
* @deprecated in the 3.0.0 KafkaProducer.
212-
*/
213-
@Deprecated
214-
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets);
215-
216-
/**
217-
* When running in a transaction, send the consumer offset(s) to the transaction. It
218-
* is not necessary to call this method if the operations are invoked on a listener
219-
* container thread (and the listener container is configured with a
220-
* {@link org.springframework.kafka.transaction.KafkaAwareTransactionManager}) since
221-
* the container will take care of sending the offsets to the transaction.
222-
* @param offsets The offsets.
223-
* @param consumerGroupId the consumer's group.id.
224-
* @since 1.3
225-
* @deprecated in the 3.0.0 KafkaProducer.
226-
*/
227-
@Deprecated
228-
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId);
229-
230201
/**
231202
* When running in a transaction, send the consumer offset(s) to the transaction. It
232203
* is not necessary to call this method if the operations are invoked on a listener

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2021 the original author or authors.
2+
* Copyright 2015-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -546,20 +546,6 @@ public void flush() {
546546
}
547547
}
548548

549-
550-
@Override
551-
@Deprecated
552-
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) {
553-
sendOffsetsToTransaction(offsets, KafkaUtils.getConsumerGroupId());
554-
}
555-
556-
@SuppressWarnings("deprecation")
557-
@Override
558-
@Deprecated
559-
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
560-
producerForOffsets().sendOffsetsToTransaction(offsets, consumerGroupId);
561-
}
562-
563549
@Override
564550
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
565551
ConsumerGroupMetadata groupMetadata) {

spring-kafka/src/main/java/org/springframework/kafka/core/RoutingKafkaTemplate.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2021 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -102,18 +102,6 @@ public <T> T executeInTransaction(OperationsCallback<Object, Object, T> callback
102102
throw new UnsupportedOperationException(THIS_METHOD_IS_NOT_SUPPORTED);
103103
}
104104

105-
@Override
106-
@Deprecated
107-
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
108-
throw new UnsupportedOperationException(THIS_METHOD_IS_NOT_SUPPORTED);
109-
}
110-
111-
@Override
112-
@Deprecated
113-
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) {
114-
throw new UnsupportedOperationException(THIS_METHOD_IS_NOT_SUPPORTED);
115-
}
116-
117105
@Override
118106
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
119107
ConsumerGroupMetadata groupMetadata) {

0 commit comments

Comments
 (0)