Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions debezium-platform-conductor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,12 @@
<version>${kafka-clients.version}</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>1.12.791</version>
</dependency>


<dependency>
<groupId>io.quarkus</groupId>
Expand Down Expand Up @@ -419,6 +425,12 @@
<artifactId>mssqlserver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<version>${test-containers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
Expand Down Expand Up @@ -459,6 +471,12 @@
<artifactId>quarkus-test-kafka-companion</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kinesis</artifactId>
<version>2.29.23</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.debezium.platform.environment.connection.destination;

import java.net.URI;
import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Named;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.platform.data.dto.ConnectionValidationResult;
import io.debezium.platform.domain.views.Connection;
import io.debezium.platform.environment.connection.ConnectionValidator;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;

/**
* Implementation of {@link ConnectionValidator} for Amazon Kinesis.
* Validates stream name, region, and tests by sending a simple record.
*
* Author: Pranav Tiwari
*/
@ApplicationScoped
@Named("AMAZON_KINESIS")
public class AmazonKinesisConnectionValidator implements ConnectionValidator {

private static final Logger LOGGER = LoggerFactory.getLogger(AmazonKinesisConnectionValidator.class);

private static final String REGION_KEY = "region";
private static final String STREAM_NAME_KEY = "stream";
private static final String ENDPOINT_KEY = "endpoint";
private static final String PARTITION_KEY = "partitionKey"; // Optional
private static final String TEST_MESSAGE = "Kinesis validation test message";

private final int defaultTimeout;

public AmazonKinesisConnectionValidator(
@ConfigProperty(name = "destinations.kinesis.connection.timeout") int defaultTimeout) {
this.defaultTimeout = defaultTimeout;
}

@Override
public ConnectionValidationResult validate(Connection connectionConfig) {
if (connectionConfig == null) {
return ConnectionValidationResult.failed("Connection configuration cannot be null");
}

try {
LOGGER.debug("Starting Kinesis connection validation for: {}", connectionConfig.getName());

Map<String, Object> kinesisConfig = connectionConfig.getConfig();

ConnectionValidationResult configValidation = validateConfiguration(kinesisConfig);
if (!configValidation.valid()) {
return configValidation;
}

return performConnectionValidation(kinesisConfig);
}
catch (Exception e) {
LOGGER.error("Unexpected error during Kinesis connection validation", e);
return ConnectionValidationResult.failed("Unexpected error: " + e.getMessage());
}
}

private ConnectionValidationResult validateConfiguration(Map<String, Object> config) {
if (!config.containsKey(REGION_KEY) || config.get(REGION_KEY) == null ||
config.get(REGION_KEY).toString().trim().isEmpty()) {
return ConnectionValidationResult.failed("Region must be specified");
}

if (!config.containsKey(STREAM_NAME_KEY) || config.get(STREAM_NAME_KEY) == null ||
config.get(STREAM_NAME_KEY).toString().trim().isEmpty()) {
return ConnectionValidationResult.failed("Stream name must be specified");
}

return ConnectionValidationResult.successful();
}

private ConnectionValidationResult performConnectionValidation(Map<String, Object> config) {
KinesisClient kinesisClient = null;

try {
String regionName = config.get(REGION_KEY).toString().trim();
String streamName = config.get(STREAM_NAME_KEY).toString().trim();

LOGGER.debug("Connecting to Kinesis in region: {}, stream: {}", regionName, streamName);

var builder = KinesisClient.builder()
.region(Region.of(regionName));

// ✅ Allow custom endpoint for testing (LocalStack)
if (config.containsKey(ENDPOINT_KEY) && config.get(ENDPOINT_KEY) != null) {
builder.endpointOverride(URI.create(config.get(ENDPOINT_KEY).toString().trim()));
}

kinesisClient = builder.build();

var response = kinesisClient.describeStreamSummary(r -> r.streamName(streamName));

LOGGER.debug("Successfully described Kinesis stream '{}'. Status: {}",
streamName, response.streamDescriptionSummary().streamStatusAsString());

return ConnectionValidationResult.successful();

}
catch (software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException e) {
String messages = "Stream not found: Please verify the stream name and region.";
LOGGER.error(messages, e);
return ConnectionValidationResult.failed(messages);
}
catch (software.amazon.awssdk.services.kinesis.model.AccessDeniedException e) {
String messages = "Access denied: Check IAM permissions or credentials.";
LOGGER.error(messages, e);
return ConnectionValidationResult.failed(messages);
}
catch (software.amazon.awssdk.core.exception.SdkClientException e) {
String messages = "Client error: " + e.getMessage();
LOGGER.error(messages, e);
return ConnectionValidationResult.failed(messages);
}
catch (Exception e) {
LOGGER.warn("Generic exception during validation", e);
return ConnectionValidationResult.failed("Failed to validate Kinesis connection: " + e.getMessage());
}
finally {
if (kinesisClient != null) {
try {
kinesisClient.close();
}
catch (Exception ex) {
LOGGER.warn("Error closing Kinesis client", ex);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ destinations:
kafka:
connection:
timeout: 60
kinesis:
connection:
timeout: 5

quarkus:
rest-client:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,5 +218,34 @@
}
}
}
},
{
"type": "AMAZON_KINESIS",
"schema": {
"title": "Amazon Kinesis stream connection properties",
"description": "Amazon Kinesis stream connection properties",
"type": "object",
"required": [
"region",
"stream"
],
"additionalProperties": {
"type": "string"
},
"properties": {
"region": {
"type": "string",
"title": "AWS region where the Kinesis stream is located (e.g., us-east-1)"
},
"stream": {
"type": "string",
"title": "The name of the Kinesis stream to connect to"
},
"partitionKey": {
"type": "string",
"title": "The partition key used when sending a test record (optional, defaults to 'test-partition')"
}
}
}
}
]
Loading
Loading