Skip to content

Commit 25ce6ed

Browse files
authored
Add Kinesis Producer Library with Spring Integration (#1493)
* Add `KplMessageHandler` - a `MessageHandler` implementation to produce into Kinesis via KPL * Add dedicated to Spring Integration with KPL starter - `spring-cloud-aws-starter-integration-kinesis-producer` * Document the `KplMessageHandler`
1 parent aaedf69 commit 25ce6ed

File tree

8 files changed

+720
-3
lines changed

8 files changed

+720
-3
lines changed

docs/src/main/asciidoc/kinesis.adoc

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,42 @@ public PollableChannel errorChannel() {
114114
}
115115
----
116116

117+
The `KplMessageHandler` is an `AbstractMessageHandler` to perform put record to the Kinesis stream using https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html[Kinesis Producer Library (KPL)].
118+
The configuration and behavior are similar to the `KinesisMessageHandler` described above, with the difference that it supports only a single `UserRecord` production according to KPL API.
119+
The request message payload could be as a `UserRecord`.
120+
Otherwise, such an instance is built against request messages and respective `KplMessageHandler` options, include https://docs.aws.amazon.com/streams/latest/dev/kpl-with-schemaregistry.html[AWS Glue Schema] for the record serialization.
121+
122+
Due to asynchronous behavior and buffer of the `KinesisProducer`, a `KplBackpressureException` could be thrown from the `KplMessageHandler` when `backPressureThreshold` as the number of outstanding records is provided.
123+
124+
The configuration of the `KplMessageHandler` is straightforward:
125+
126+
[source,java]
127+
----
128+
@Bean
129+
RequestHandlerRetryAdvice retryAdvice() {
130+
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
131+
requestHandlerRetryAdvice.setRetryTemplate(RetryTemplate.builder()
132+
.retryOn(KplBackpressureException.class)
133+
.exponentialBackoff(100, 2.0, 1000)
134+
.maxAttempts(3)
135+
.build());
136+
return requestHandlerRetryAdvice;
137+
}
138+
139+
@Bean
140+
@ServiceActivator(inputChannel = "kinesisSendChannel", adviceChain = "retryAdvice")
141+
MessageHandler kplMessageHandler(KinesisProducer kinesisProducer, Schema schema) {
142+
KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer);
143+
kplMessageHandler.setAsync(true);
144+
kplMessageHandler.setStream("someStream");
145+
kplMessageHandler.setBackPressureThreshold(2);
146+
kplMessageHandler.setGlueSchema(schema);
147+
return kplMessageHandler;
148+
}
149+
----
117150

118151
=== Spring Integration Starters
119152

120153
The Spring Integration dependency in the `spring-cloud-aws-kinesis` module is `optional` to avoid unnecessary artifacts on classpath when Spring Integration is not used.
121154
For convenience, a dedicated `spring-cloud-aws-starter-integration-kinesis` is provided managing all the required dependencies for Spring Integration support with a classical Amazon Kinesis client.
155+
The `spring-cloud-aws-starter-integration-kinesis-producer` artifact is dedicated for dependencies related to the Kinesis Producer Library.

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
<module>spring-cloud-aws-dynamodb</module>
4747
<module>spring-cloud-aws-kinesis</module>
4848
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis</module>
49+
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis-producer</module>
4950
<module>spring-cloud-aws-s3</module>
5051
<module>spring-cloud-aws-testcontainers</module>
5152
<module>spring-cloud-aws-starters/spring-cloud-aws-starter</module>

spring-cloud-aws-dependencies/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@
123123
<artifactId>spring-cloud-aws-starter-integration-kinesis</artifactId>
124124
<version>${project.version}</version>
125125
</dependency>
126+
<dependency>
127+
<groupId>io.awspring.cloud</groupId>
128+
<artifactId>spring-cloud-aws-starter-integration-kinesis-producer</artifactId>
129+
<version>${project.version}</version>
130+
</dependency>
126131

127132
<dependency>
128133
<groupId>io.awspring.cloud</groupId>
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.kinesis.integration;
17+
18+
import java.io.Serial;
19+
import software.amazon.kinesis.producer.UserRecord;
20+
21+
/**
22+
* An exception triggered from the {@link KplMessageHandler} while sending records to Kinesis when maximum number of
23+
* records in flight exceeds the backpressure threshold.
24+
*
25+
* @author Siddharth Jain
26+
* @author Artem Bilan
27+
*
28+
* @since 4.0
29+
*/
30+
public class KplBackpressureException extends RuntimeException {
31+
32+
@Serial
33+
private static final long serialVersionUID = 1L;
34+
35+
private final transient UserRecord userRecord;
36+
37+
public KplBackpressureException(String message, UserRecord userRecord) {
38+
super(message);
39+
this.userRecord = userRecord;
40+
}
41+
42+
/**
43+
* Get the {@link UserRecord} when this exception has been thrown.
44+
* @return the {@link UserRecord} when this exception has been thrown.
45+
*/
46+
public UserRecord getUserRecord() {
47+
return this.userRecord;
48+
}
49+
50+
}

0 commit comments

Comments
 (0)