Skip to content

Commit b30e4b3

Browse files
garyrussellartembilan
authored andcommitted
GH-2099: Deprecate RetryingBatchErrorHandler
Resolves #2099 Similar to other legacy error handlers, deprecate this one because its functionality is replaced by the `DefaultErrorHandler`. Eventually move logic to `FallbackBatchErrorHandler` which is package private. **cherry-pick to 2.8.x**
1 parent 38d3413 commit b30e4b3

File tree

6 files changed

+82
-27
lines changed

6 files changed

+82
-27
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,11 +36,11 @@
3636
* the partitions for the remaining records will be repositioned and/or the failed record
3737
* can be recovered and skipped. If some other exception is thrown, or a valid record is
3838
* not provided in the exception, error handling is delegated to a
39-
* {@link RetryingBatchErrorHandler} with this handler's {@link BackOff}. If the record is
39+
* {@link FallbackBatchErrorHandler} with this handler's {@link BackOff}. If the record is
4040
* recovered, its offset is committed. This is a replacement for the legacy
4141
* {@link SeekToCurrentErrorHandler} and {@link SeekToCurrentBatchErrorHandler} (but the
4242
* fallback now can send the messages to a recoverer after retries are completed instead
43-
* of retring indefinitely).
43+
* of retrying indefinitely).
4444
*
4545
* @author Gary Russell
4646
*
@@ -90,7 +90,7 @@ public DefaultErrorHandler(@Nullable ConsumerRecordRecoverer recoverer, BackOff
9090
}
9191

9292
private static CommonErrorHandler createFallback(BackOff backOff, @Nullable ConsumerRecordRecoverer recoverer) {
93-
return new ErrorHandlerAdapter(new RetryingBatchErrorHandler(backOff, recoverer));
93+
return new ErrorHandlerAdapter(new FallbackBatchErrorHandler(backOff, recoverer));
9494
}
9595

9696
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -54,7 +54,7 @@ public abstract class FailedBatchProcessor extends FailedRecordProcessor {
5454

5555
private static final LoggingCommitCallback LOGGING_COMMIT_CALLBACK = new LoggingCommitCallback();
5656

57-
private final CommonErrorHandler fallbackHandler;
57+
private final CommonErrorHandler fallbackBatchHandler;
5858

5959
/**
6060
* Construct an instance with the provided properties.
@@ -66,7 +66,7 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
6666
CommonErrorHandler fallbackHandler) {
6767

6868
super(recoverer, backOff);
69-
this.fallbackHandler = fallbackHandler;
69+
this.fallbackBatchHandler = fallbackHandler;
7070
}
7171

7272
protected void doHandle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
@@ -75,7 +75,7 @@ protected void doHandle(Exception thrownException, ConsumerRecords<?, ?> data, C
7575
BatchListenerFailedException batchListenerFailedException = getBatchListenerFailedException(thrownException);
7676
if (batchListenerFailedException == null) {
7777
this.logger.debug(thrownException, "Expected a BatchListenerFailedException; re-seeking batch");
78-
this.fallbackHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
78+
this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
7979
}
8080
else {
8181
ConsumerRecord<?, ?> record = batchListenerFailedException.getRecord();
@@ -84,7 +84,7 @@ protected void doHandle(Exception thrownException, ConsumerRecords<?, ?> data, C
8484
this.logger.warn(batchListenerFailedException, () ->
8585
String.format("Record not found in batch: %s-%d@%d; re-seeking batch",
8686
record.topic(), record.partition(), record.offset()));
87-
this.fallbackHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
87+
this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
8888
}
8989
else {
9090
seekOrRecover(thrownException, data, consumer, container, index);
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.springframework.lang.Nullable;
20+
import org.springframework.util.backoff.BackOff;
21+
import org.springframework.util.backoff.FixedBackOff;
22+
23+
/**
24+
* A batch error handler used by the default error handler when the listener does
25+
* not throw a {@link BatchListenerFailedException}.
26+
*
27+
* @author Gary Russell
28+
* @since 2.8.3
29+
*
30+
*/
31+
@SuppressWarnings("deprecation")
32+
class FallbackBatchErrorHandler extends RetryingBatchErrorHandler {
33+
34+
/**
35+
* Construct an instance with a default {@link FixedBackOff} (unlimited attempts with
36+
* a 5 second back off).
37+
*/
38+
FallbackBatchErrorHandler() {
39+
super();
40+
}
41+
42+
/**
43+
* Construct an instance with the provided {@link BackOff} and
44+
* {@link ConsumerRecordRecoverer}. If the recoverer is {@code null}, the discarded
45+
* records (topic-partition{@literal @}offset) will be logged.
46+
* @param backOff the back off.
47+
* @param recoverer the recoverer.
48+
*/
49+
FallbackBatchErrorHandler(BackOff backOff, @Nullable ConsumerRecordRecoverer recoverer) {
50+
super(backOff, recoverer);
51+
}
52+
53+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/RetryingBatchErrorHandler.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2021 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,12 +37,14 @@
3737
*
3838
* @author Gary Russell
3939
* @since 2.3.7
40+
* @deprecated in favor of {@link DefaultErrorHandler}.
4041
*
4142
*/
43+
@Deprecated
4244
public class RetryingBatchErrorHandler extends KafkaExceptionLogLevelAware
4345
implements ListenerInvokingBatchErrorHandler {
4446

45-
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(RetryingBatchErrorHandler.class));
47+
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));
4648

4749
private final BackOff backOff;
4850

@@ -72,7 +74,7 @@ public RetryingBatchErrorHandler(BackOff backOff, @Nullable ConsumerRecordRecove
7274
this.backOff = backOff;
7375
this.recoverer = (crs, ex) -> {
7476
if (recoverer == null) {
75-
LOGGER.error(ex, () -> "Records discarded: " + ErrorHandlingUtils.recordsToString(crs));
77+
this.logger.error(ex, () -> "Records discarded: " + ErrorHandlingUtils.recordsToString(crs));
7678
}
7779
else {
7880
crs.spliterator().forEachRemaining(rec -> recoverer.accept(rec, ex));
@@ -95,11 +97,11 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
9597
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
9698

9799
if (records == null || records.count() == 0) {
98-
LOGGER.error(thrownException, "Called with no records; consumer exception");
100+
this.logger.error(thrownException, "Called with no records; consumer exception");
99101
return;
100102
}
101103
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
102-
this.seeker, this.recoverer, LOGGER, getLogLevel());
104+
this.seeker, this.recoverer, this.logger, getLogLevel());
103105
}
104106

105107
}
Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@
5656
*
5757
*/
5858
@EmbeddedKafka(topics = {
59-
RetryingBatchErrorHandlerIntegrationTests.topic1,
60-
RetryingBatchErrorHandlerIntegrationTests.topic1DLT,
61-
RetryingBatchErrorHandlerIntegrationTests.topic2,
62-
RetryingBatchErrorHandlerIntegrationTests.topic2DLT})
63-
public class RetryingBatchErrorHandlerIntegrationTests {
59+
FallbackBatchErrorHandlerIntegrationTests.topic1,
60+
FallbackBatchErrorHandlerIntegrationTests.topic1DLT,
61+
FallbackBatchErrorHandlerIntegrationTests.topic2,
62+
FallbackBatchErrorHandlerIntegrationTests.topic2DLT})
63+
public class FallbackBatchErrorHandlerIntegrationTests {
6464

6565
public static final String topic1 = "retryTopic1";
6666

@@ -115,7 +115,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
115115
}
116116

117117
};
118-
RetryingBatchErrorHandler errorHandler = new RetryingBatchErrorHandler(new FixedBackOff(0L, 3), recoverer);
118+
FallbackBatchErrorHandler errorHandler = new FallbackBatchErrorHandler(new FixedBackOff(0L, 3), recoverer);
119119
container.setBatchErrorHandler(errorHandler);
120120
final CountDownLatch stopLatch = new CountDownLatch(1);
121121
container.setApplicationEventPublisher(e -> {
@@ -186,7 +186,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
186186
}
187187

188188
};
189-
RetryingBatchErrorHandler errorHandler = new RetryingBatchErrorHandler(new FixedBackOff(0L, 3), recoverer);
189+
FallbackBatchErrorHandler errorHandler = new FallbackBatchErrorHandler(new FixedBackOff(0L, 3), recoverer);
190190
container.setBatchErrorHandler(errorHandler);
191191
final CountDownLatch stopLatch = new CountDownLatch(1);
192192
container.setApplicationEventPublisher(e -> {
@@ -226,7 +226,7 @@ void consumerEx() throws InterruptedException {
226226
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
227227
containerProps);
228228
CountDownLatch called = new CountDownLatch(1);
229-
container.setBatchErrorHandler(new RetryingBatchErrorHandler() {
229+
container.setBatchErrorHandler(new FallbackBatchErrorHandler() {
230230

231231
@Override
232232
public void handle(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,

spring-kafka/src/test/java/org/springframework/kafka/listener/RetryingBatchErrorHandlerTests.java renamed to spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,15 +43,15 @@
4343
* @since 2.3.7
4444
*
4545
*/
46-
public class RetryingBatchErrorHandlerTests {
46+
public class FallbackBatchErrorHandlerTests {
4747

4848
private int invoked;
4949

5050
@Test
5151
void recover() {
5252
this.invoked = 0;
5353
List<ConsumerRecord<?, ?>> recovered = new ArrayList<>();
54-
RetryingBatchErrorHandler eh = new RetryingBatchErrorHandler(new FixedBackOff(0L, 3L), (cr, ex) -> {
54+
FallbackBatchErrorHandler eh = new FallbackBatchErrorHandler(new FixedBackOff(0L, 3L), (cr, ex) -> {
5555
recovered.add(cr);
5656
});
5757
Map<TopicPartition, List<ConsumerRecord<Object, Object>>> map = new HashMap<>();
@@ -79,7 +79,7 @@ void recover() {
7979
void successOnRetry() {
8080
this.invoked = 0;
8181
List<ConsumerRecord<?, ?>> recovered = new ArrayList<>();
82-
RetryingBatchErrorHandler eh = new RetryingBatchErrorHandler(new FixedBackOff(0L, 3L), (cr, ex) -> {
82+
FallbackBatchErrorHandler eh = new FallbackBatchErrorHandler(new FixedBackOff(0L, 3L), (cr, ex) -> {
8383
recovered.add(cr);
8484
});
8585
Map<TopicPartition, List<ConsumerRecord<Object, Object>>> map = new HashMap<>();
@@ -104,7 +104,7 @@ void successOnRetry() {
104104
void recoveryFails() {
105105
this.invoked = 0;
106106
List<ConsumerRecord<?, ?>> recovered = new ArrayList<>();
107-
RetryingBatchErrorHandler eh = new RetryingBatchErrorHandler(new FixedBackOff(0L, 3L), (cr, ex) -> {
107+
FallbackBatchErrorHandler eh = new FallbackBatchErrorHandler(new FixedBackOff(0L, 3L), (cr, ex) -> {
108108
recovered.add(cr);
109109
throw new RuntimeException("can't recover");
110110
});

0 commit comments

Comments
 (0)