diff --git a/src/main/java/org/springframework/data/redis/stream/StreamListener.java b/src/main/java/org/springframework/data/redis/stream/StreamListener.java index 4122a2d625..bb2f971866 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamListener.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamListener.java @@ -17,10 +17,13 @@ import org.springframework.data.redis.connection.stream.Record; +import java.util.List; + /** * Listener interface to receive delivery of {@link Record messages}. * * @author Mark Paluch + * @author Yongha Kwon * @param Stream key and Stream field type. * @param Stream value type. * @since 2.2 @@ -33,5 +36,5 @@ public interface StreamListener> { * * @param message never {@literal null}. */ - void onMessage(V message); + void onMessage(List message); } diff --git a/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java index 851a1f6d66..d520d088b8 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java @@ -16,6 +16,7 @@ package org.springframework.data.redis.stream; import java.time.Duration; +import java.util.List; import java.util.OptionalInt; import java.util.concurrent.Executor; import java.util.function.Predicate; @@ -82,10 +83,10 @@ *

* {@link StreamMessageListenerContainer} requires a {@link Executor} to fork long-running polling tasks on a different * {@link Thread}. This thread is used as event loop to poll for stream messages and invoke the - * {@link StreamListener#onMessage(Record) listener callback}. + * {@link StreamListener#onMessage(List)} listener callback}. *

* {@link StreamMessageListenerContainer} tasks propagate errors during stream reads and - * {@link StreamListener#onMessage(Record) listener notification} to a configurable {@link ErrorHandler}. Errors stop a + * {@link StreamListener#onMessage(List)} listener notification} to a configurable {@link ErrorHandler}. Errors stop a * {@link Subscription} by default. Configuring a {@link Predicate} for a {@link StreamReadRequest} allows conditional * subscription cancelling or continuing on all errors. *

diff --git a/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java b/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java index 916c276478..61440be2e8 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java @@ -16,6 +16,7 @@ package org.springframework.data.redis.stream; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -147,14 +148,14 @@ private List readRecords() { } private void deserializeAndEmitRecords(List records) { + List messages = new ArrayList<>(); for (ByteRecord raw : records) { try { - pollState.updateReadOffset(raw.getId().getValue()); V record = convertRecord(raw); - listener.onMessage(record); + messages.add(record); } catch (RuntimeException ex) { if (cancelSubscriptionOnError.test(ex)) { @@ -166,8 +167,11 @@ private void deserializeAndEmitRecords(List records) { } errorHandler.handleError(ex); + return; } } + + listener.onMessage(messages); } private V convertRecord(ByteRecord record) { diff --git a/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java b/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java index 26318b1448..3696046604 100644 --- a/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java @@ -91,7 +91,7 @@ void shouldReceiveMapMessages() throws InterruptedException { BlockingQueue> queue = new LinkedBlockingQueue<>(); container.start(); - Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::add); + Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::addAll); subscription.await(DEFAULT_TIMEOUT); @@ -119,7 +119,7 @@ void shouldReceiveSimpleObjectHashRecords() throws InterruptedException { BlockingQueue> queue = new LinkedBlockingQueue<>(); container.start(); - Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::add); + Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::addAll); subscription.await(DEFAULT_TIMEOUT); @@ -143,7 +143,7 @@ void shouldReceiveObjectHashRecords() throws InterruptedException { BlockingQueue> queue = new LinkedBlockingQueue<>(); container.start(); - Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::add); + Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::addAll); subscription.await(DEFAULT_TIMEOUT); @@ -168,7 +168,7 @@ void shouldReceiveMessagesInConsumerGroup() throws InterruptedException { container.start(); Subscription subscription = container.receive(Consumer.from("my-group", "my-consumer"), - StreamOffset.create("my-stream", ReadOffset.lastConsumed()), queue::add); + StreamOffset.create("my-stream", ReadOffset.lastConsumed()), queue::addAll); subscription.await(DEFAULT_TIMEOUT); @@ -194,7 +194,7 @@ void shouldReceiveAndAckMessagesInConsumerGroup() throws InterruptedException { container.start(); Subscription subscription = container.receiveAutoAck(Consumer.from("my-group", "my-consumer"), - StreamOffset.create("my-stream", ReadOffset.lastConsumed()), queue::add); + StreamOffset.create("my-stream", ReadOffset.lastConsumed()), queue::addAll); subscription.await(DEFAULT_TIMEOUT); @@ -316,7 +316,7 @@ void deserializationShouldContinueStreamRead() throws InterruptedException { redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("payload", "3")); container.start(); - Subscription subscription = container.register(readRequest, records::add); + Subscription subscription = container.register(readRequest, records::addAll); subscription.await(DEFAULT_TIMEOUT); @@ -347,7 +347,7 @@ void cancelledStreamShouldNotReceiveMessages() throws InterruptedException { BlockingQueue> queue = new LinkedBlockingQueue<>(); container.start(); - Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::add); + Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::addAll); subscription.await(DEFAULT_TIMEOUT); cancelAwait(subscription); @@ -365,7 +365,7 @@ void containerRestartShouldRestartSubscription() throws InterruptedException { BlockingQueue> queue = new LinkedBlockingQueue<>(); container.start(); - Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::add); + Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::addAll); subscription.await(DEFAULT_TIMEOUT);