Skip to content

Commit 8bcb052

Browse files
authored
GH-1842: Add Conditional Delegating Error Handlers
Resolves #1842 * Checkstyle fixes and polishing. * Polishing - return after a delegate handles the error.
1 parent dd47e2d commit 8bcb052

File tree

6 files changed

+311
-2
lines changed

6 files changed

+311
-2
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

+8
Original file line numberDiff line numberDiff line change
@@ -5314,6 +5314,14 @@ The `ContainerStoppingBatchErrorHandler` (used with batch listeners) stops the c
53145314
After the container stops, an exception that wraps the `ListenerExecutionFailedException` is thrown.
53155315
This is to cause the transaction to roll back (if transactions are enabled).
53165316

5317+
[[cond-eh]]
5318+
===== Conditional Delegating Error Handlers
5319+
5320+
Introduced in version 2.7.4, the `ConditionalDelegatingErrorHandler` can delegate to different error handlers, depending on the exception type.
5321+
For example, you may wish to invoke a `SeekToCurrentErrorHandler` for most exceptions, or a `ContainerStoppingErrorHandler` for others.
5322+
5323+
Similarly, the `ConditionalDelegatingBatchErrorHandler` is provided.
5324+
53175325
[[after-rollback]]
53185326
===== After-rollback Processor
53195327

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

+6
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,9 @@ See <<container-sequencing>> for more information.
105105

106106
A new `BackOff` implementation is provided, making it more convenient to configure the max retries.
107107
See <<exp-backoff>> for more information.
108+
109+
[[x27-delegating-eh]]
110+
==== Conditional Delegating Error Handlers
111+
112+
These new error handlers can be configured to delegate to different error handlers, depending on the exception type.
113+
See <<cond-eh>> for more information.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.LinkedHashMap;
20+
import java.util.Map;
21+
import java.util.Map.Entry;
22+
23+
import org.apache.kafka.clients.consumer.Consumer;
24+
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
26+
import org.springframework.lang.Nullable;
27+
import org.springframework.util.Assert;
28+
29+
/**
30+
* An error handler that delegates to different error handlers, depending on the exception
31+
* type.
32+
*
33+
* @author Gary Russell
34+
* @since 2.7.4
35+
*
36+
*/
37+
public class ConditionalDelegatingBatchErrorHandler implements ContainerAwareBatchErrorHandler {
38+
39+
private final ContainerAwareBatchErrorHandler defaultErrorHandler;
40+
41+
private final Map<Class<? extends Throwable>, ContainerAwareBatchErrorHandler> delegates = new LinkedHashMap<>();
42+
43+
/**
44+
* Construct an instance with a default error handler that will be invoked if the
45+
* exception has no matches.
46+
* @param defaultErrorHandler the default error handler.
47+
*/
48+
public ConditionalDelegatingBatchErrorHandler(ContainerAwareBatchErrorHandler defaultErrorHandler) {
49+
Assert.notNull(defaultErrorHandler, "'defaultErrorHandler' cannot be null");
50+
this.defaultErrorHandler = defaultErrorHandler;
51+
}
52+
53+
/**
54+
* Set the delegate error handlers; a {@link LinkedHashMap} argument is recommended so
55+
* that the delegates are searched in a known order.
56+
* @param delegates the delegates.
57+
*/
58+
public void setErrorHandlers(Map<Class<? extends Throwable>, ContainerAwareBatchErrorHandler> delegates) {
59+
this.delegates.clear();
60+
this.delegates.putAll(delegates);
61+
}
62+
63+
/**
64+
* Add a delegate to the end of the current collection.
65+
* @param throwable the throwable for this handler.
66+
* @param handler the handler.
67+
*/
68+
public void addDelegate(Class<? extends Throwable> throwable, ContainerAwareBatchErrorHandler handler) {
69+
this.delegates.put(throwable, handler);
70+
}
71+
72+
@Override
73+
public void handle(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
74+
MessageListenerContainer container) {
75+
76+
// Never called but, just in case
77+
doHandle(thrownException, records, consumer, container, null);
78+
}
79+
80+
@Override
81+
public void handle(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
82+
MessageListenerContainer container, Runnable invokeListener) {
83+
84+
doHandle(thrownException, records, consumer, container, invokeListener);
85+
}
86+
87+
protected void doHandle(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
88+
MessageListenerContainer container, @Nullable Runnable invokeListener) {
89+
90+
Throwable cause = thrownException;
91+
if (cause instanceof ListenerExecutionFailedException) {
92+
cause = thrownException.getCause();
93+
}
94+
if (cause != null) {
95+
Class<? extends Throwable> causeClass = cause.getClass();
96+
for (Entry<Class<? extends Throwable>, ContainerAwareBatchErrorHandler> entry : this.delegates.entrySet()) {
97+
if (entry.getKey().equals(causeClass)) {
98+
entry.getValue().handle(thrownException, records, consumer, container, invokeListener);
99+
return;
100+
}
101+
}
102+
}
103+
this.defaultErrorHandler.handle(thrownException, records, consumer, container, invokeListener);
104+
}
105+
106+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.LinkedHashMap;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.Map.Entry;
23+
24+
import org.apache.kafka.clients.consumer.Consumer;
25+
import org.apache.kafka.clients.consumer.ConsumerRecord;
26+
27+
import org.springframework.lang.Nullable;
28+
import org.springframework.util.Assert;
29+
30+
/**
31+
* An error handler that delegates to different error handlers, depending on the exception
32+
* type.
33+
*
34+
* @author Gary Russell
35+
* @since 2.7.4
36+
*
37+
*/
38+
public class ConditionalDelegatingErrorHandler implements ContainerAwareErrorHandler {
39+
40+
private final ContainerAwareErrorHandler defaultErrorHandler;
41+
42+
private final Map<Class<? extends Throwable>, ContainerAwareErrorHandler> delegates = new LinkedHashMap<>();
43+
44+
/**
45+
* Construct an instance with a default error handler that will be invoked if the
46+
* exception has no matches.
47+
* @param defaultErrorHandler the default error handler.
48+
*/
49+
public ConditionalDelegatingErrorHandler(ContainerAwareErrorHandler defaultErrorHandler) {
50+
Assert.notNull(defaultErrorHandler, "'defaultErrorHandler' cannot be null");
51+
this.defaultErrorHandler = defaultErrorHandler;
52+
}
53+
54+
/**
55+
* Set the delegate error handlers; a {@link LinkedHashMap} argument is recommended so
56+
* that the delegates are searched in a known order.
57+
* @param delegates the delegates.
58+
*/
59+
public void setErrorHandlers(Map<Class<? extends Throwable>, ContainerAwareErrorHandler> delegates) {
60+
this.delegates.clear();
61+
this.delegates.putAll(delegates);
62+
}
63+
64+
/**
65+
* Add a delegate to the end of the current collection.
66+
* @param throwable the throwable for this handler.
67+
* @param handler the handler.
68+
*/
69+
public void addDelegate(Class<? extends Throwable> throwable, ContainerAwareErrorHandler handler) {
70+
this.delegates.put(throwable, handler);
71+
}
72+
73+
@Override
74+
public void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
75+
MessageListenerContainer container) {
76+
77+
boolean handled = false;
78+
Throwable cause = thrownException;
79+
if (cause instanceof ListenerExecutionFailedException) {
80+
cause = thrownException.getCause();
81+
}
82+
if (cause != null) {
83+
Class<? extends Throwable> causeClass = cause.getClass();
84+
for (Entry<Class<? extends Throwable>, ContainerAwareErrorHandler> entry : this.delegates.entrySet()) {
85+
if (entry.getKey().equals(causeClass)) {
86+
handled = true;
87+
entry.getValue().handle(thrownException, records, consumer, container);
88+
return;
89+
}
90+
}
91+
}
92+
this.defaultErrorHandler.handle(thrownException, records, consumer, container);
93+
}
94+
95+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerAwareErrorHandler.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.kafka.clients.consumer.Consumer;
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
2323

24+
import org.springframework.lang.Nullable;
25+
2426
/**
2527
* An error handler that has access to the unprocessed records from the last poll
2628
* (including the failed record), the consumer, and the container.
@@ -35,12 +37,14 @@
3537
public interface ContainerAwareErrorHandler extends RemainingRecordsErrorHandler {
3638

3739
@Override
38-
default void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer) {
40+
default void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records,
41+
Consumer<?, ?> consumer) {
42+
3943
throw new UnsupportedOperationException("Container should never call this");
4044
}
4145

4246
@Override
43-
void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
47+
void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
4448
MessageListenerContainer container);
4549

4650
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.verify;
22+
23+
import java.io.IOException;
24+
import java.util.Collections;
25+
import java.util.Map;
26+
27+
import org.apache.kafka.clients.consumer.Consumer;
28+
import org.apache.kafka.clients.consumer.ConsumerRecords;
29+
import org.junit.jupiter.api.Test;
30+
31+
/**
32+
* @author Gary Russell
33+
* @since 2.7.4
34+
*
35+
*/
36+
public class ConditionalDelegatingErrorHandlerTests {
37+
38+
@Test
39+
void testRecordDelegates() {
40+
var def = mock(ContainerAwareErrorHandler.class);
41+
var one = mock(ContainerAwareErrorHandler.class);
42+
var two = mock(ContainerAwareErrorHandler.class);
43+
var three = mock(ContainerAwareErrorHandler.class);
44+
var eh = new ConditionalDelegatingErrorHandler(def);
45+
eh.setErrorHandlers(Map.of(IllegalStateException.class, one, IllegalArgumentException.class, two));
46+
eh.addDelegate(RuntimeException.class, three);
47+
48+
eh.handle(wrap(new IOException()), Collections.emptyList(), mock(Consumer.class),
49+
mock(MessageListenerContainer.class));
50+
verify(def).handle(any(), any(), any(), any());
51+
eh.handle(wrap(new RuntimeException()), Collections.emptyList(), mock(Consumer.class),
52+
mock(MessageListenerContainer.class));
53+
verify(three).handle(any(), any(), any(), any());
54+
eh.handle(wrap(new IllegalArgumentException()), Collections.emptyList(), mock(Consumer.class),
55+
mock(MessageListenerContainer.class));
56+
verify(two).handle(any(), any(), any(), any());
57+
eh.handle(wrap(new IllegalStateException()), Collections.emptyList(), mock(Consumer.class),
58+
mock(MessageListenerContainer.class));
59+
verify(one).handle(any(), any(), any(), any());
60+
}
61+
62+
@Test
63+
void testBatchDelegates() {
64+
var def = mock(ContainerAwareBatchErrorHandler.class);
65+
var one = mock(ContainerAwareBatchErrorHandler.class);
66+
var two = mock(ContainerAwareBatchErrorHandler.class);
67+
var three = mock(ContainerAwareBatchErrorHandler.class);
68+
var eh = new ConditionalDelegatingBatchErrorHandler(def);
69+
eh.setErrorHandlers(Map.of(IllegalStateException.class, one, IllegalArgumentException.class, two));
70+
eh.addDelegate(RuntimeException.class, three);
71+
72+
eh.handle(wrap(new IOException()), mock(ConsumerRecords.class), mock(Consumer.class),
73+
mock(MessageListenerContainer.class), mock(Runnable.class));
74+
verify(def).handle(any(), any(), any(), any(), any());
75+
eh.handle(wrap(new RuntimeException()), mock(ConsumerRecords.class), mock(Consumer.class),
76+
mock(MessageListenerContainer.class), mock(Runnable.class));
77+
verify(three).handle(any(), any(), any(), any(), any());
78+
eh.handle(wrap(new IllegalArgumentException()), mock(ConsumerRecords.class), mock(Consumer.class),
79+
mock(MessageListenerContainer.class), mock(Runnable.class));
80+
verify(two).handle(any(), any(), any(), any(), any());
81+
eh.handle(wrap(new IllegalStateException()), mock(ConsumerRecords.class), mock(Consumer.class),
82+
mock(MessageListenerContainer.class), mock(Runnable.class));
83+
verify(one).handle(any(), any(), any(), any(), any());
84+
}
85+
86+
private Exception wrap(Exception ex) {
87+
return new ListenerExecutionFailedException("test", ex);
88+
}
89+
90+
}

0 commit comments

Comments
 (0)