From 05ff4f4e9388d133059e28880ed5b33218bc526e Mon Sep 17 00:00:00 2001
From: prtiwari
Date: Mon, 22 Sep 2025 11:11:13 +0530
Subject: [PATCH 1/3] DBZ-9440- Add connection validator for Milvus.
Signed-off-by: pranavt84
---
debezium-platform-conductor/pom.xml | 9 +-
.../MilvusConnectionValidator.java | 210 ++++++++++++++++++
.../src/main/resources/application.yml | 3 +
.../main/resources/connection-schemas.json | 38 +++-
4 files changed, 258 insertions(+), 2 deletions(-)
create mode 100644 debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/MilvusConnectionValidator.java
diff --git a/debezium-platform-conductor/pom.xml b/debezium-platform-conductor/pom.xml
index 0154f754..c08e6ea2 100644
--- a/debezium-platform-conductor/pom.xml
+++ b/debezium-platform-conductor/pom.xml
@@ -117,7 +117,7 @@
io.debezium
debezium-operator-api
- ${version.debezium}
+ 3.2.2.Final
@@ -364,6 +364,13 @@
${kafka-clients.version}
+
+
+ io.milvus
+ milvus-sdk-java
+ 2.6.4
+
+
io.quarkus
diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/MilvusConnectionValidator.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/MilvusConnectionValidator.java
new file mode 100644
index 00000000..bfc60310
--- /dev/null
+++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/MilvusConnectionValidator.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.platform.environment.connection.destination;
+
+import java.util.Map;
+
+import jakarta.inject.Named;
+
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.platform.data.dto.ConnectionValidationResult;
+import io.debezium.platform.domain.views.Connection;
+import io.debezium.platform.environment.connection.ConnectionValidator;
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+
+/**
+ * Implementation of {@link ConnectionValidator} for Milvus vector database connections.
+ *
+ * This validator performs validation of Milvus connection configurations
+ * including network connectivity, authentication, and database accessibility.
+ *
+ *
+ *
+ * The validation process includes:
+ *
+ * - Connection parameter validation (host, port, database)
+ * - Client connection establishment
+ * - Authentication verification if credentials are provided
+ * - Basic database operation to confirm connectivity
+ * - Timeout handling for network issues
+ *
+ *
+ *
+ * @author Auto-generated
+ */
+@Named("MILVUS")
+public class MilvusConnectionValidator implements ConnectionValidator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MilvusConnectionValidator.class);
+
+ private final int defaultConnectionTimeout;
+
+ private static final String URI_KEY = "uri";
+ private static final String DATABASE_KEY = "database";
+ private static final String USERNAME_KEY = "username";
+ private static final String PASSWORD_KEY = "password";
+ private static final String TOKEN_KEY = "token";
+
+ public MilvusConnectionValidator(@ConfigProperty(name = "destinations.milvus.connection.timeout") int defaultConnectionTimeout) {
+ this.defaultConnectionTimeout = defaultConnectionTimeout;
+ }
+
+ @Override
+ public ConnectionValidationResult validate(Connection connectionConfig) {
+ if (connectionConfig == null) {
+ return ConnectionValidationResult.failed("Connection configuration cannot be null");
+ }
+
+ try {
+ LOGGER.debug("Starting Milvus connection validation for connection: {}", connectionConfig.getName());
+
+ Map milvusConfig = connectionConfig.getConfig();
+
+ // Validate required configuration parameters
+ ConnectionValidationResult configValidation = validateConfiguration(milvusConfig);
+ if (!configValidation.valid()) {
+ return configValidation;
+ }
+
+ return performConnectionValidation(milvusConfig);
+
+ }
+ catch (Exception e) {
+ LOGGER.error("Unexpected error during Milvus connection validation", e);
+ return ConnectionValidationResult.failed("Validation failed due to unexpected error: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Validates the required Milvus configuration parameters.
+ *
+ * @param milvusConfig the Milvus configuration properties
+ * @return ConnectionValidationResult indicating parameter validation result
+ */
+ private ConnectionValidationResult validateConfiguration(Map milvusConfig) {
+ if (!milvusConfig.containsKey(URI_KEY) ||
+ milvusConfig.get(URI_KEY) == null ||
+ milvusConfig.get(URI_KEY).toString().trim().isEmpty()) {
+ return ConnectionValidationResult.failed("URI must be specified");
+ }
+
+ // Validate URI format
+ String uri = milvusConfig.get(URI_KEY).toString().trim();
+ if (!uri.startsWith("http://") && !uri.startsWith("https://")) {
+ return ConnectionValidationResult.failed("URI must start with http:// or https://");
+ }
+
+ return ConnectionValidationResult.successful();
+ }
+
+ /**
+ * Performs the actual connection validation by attempting to connect to Milvus
+ * using the official Milvus V2 SDK client.
+ *
+ * @param milvusConfig the Milvus configuration properties
+ * @return ConnectionValidationResult indicating success or failure
+ */
+ private ConnectionValidationResult performConnectionValidation(Map milvusConfig) {
+ MilvusClientV2 milvusClient = null;
+
+ try {
+ LOGGER.debug("Creating Milvus V2 client for validation");
+
+ // Use the provided URI directly
+ String uri = milvusConfig.get(URI_KEY).toString().trim();
+ LOGGER.debug("Attempting to connect to Milvus at: {}", uri);
+
+ // Build connection configuration using the official API
+ var configBuilder = ConnectConfig.builder()
+ .uri(uri)
+ .rpcDeadlineMs(defaultConnectionTimeout * 1000L); // Convert seconds to milliseconds
+
+ // Add database if specified
+ if (milvusConfig.containsKey(DATABASE_KEY) && milvusConfig.get(DATABASE_KEY) != null
+ && !milvusConfig.get(DATABASE_KEY).toString().trim().isEmpty()) {
+ configBuilder.dbName(milvusConfig.get(DATABASE_KEY).toString());
+ LOGGER.debug("Using database: {}", milvusConfig.get(DATABASE_KEY).toString());
+ }
+
+ // Add authentication if provided
+ if (milvusConfig.containsKey(TOKEN_KEY) && milvusConfig.get(TOKEN_KEY) != null
+ && !milvusConfig.get(TOKEN_KEY).toString().trim().isEmpty()) {
+ // Token format: "username:password"
+ configBuilder.token(milvusConfig.get(TOKEN_KEY).toString());
+ LOGGER.debug("Using token authentication");
+ }
+ else if (milvusConfig.containsKey(USERNAME_KEY) && milvusConfig.get(USERNAME_KEY) != null
+ && milvusConfig.containsKey(PASSWORD_KEY) && milvusConfig.get(PASSWORD_KEY) != null) {
+ // Separate username and password
+ configBuilder.username(milvusConfig.get(USERNAME_KEY).toString())
+ .password(milvusConfig.get(PASSWORD_KEY).toString());
+ LOGGER.debug("Using username/password authentication for user: {}", milvusConfig.get(USERNAME_KEY).toString());
+ }
+
+ // Create client with the configuration
+ milvusClient = new MilvusClientV2(configBuilder.build());
+
+ LOGGER.debug("Successfully created Milvus client, performing basic validation");
+
+ // Perform a simple operation to verify the connection works
+ // Using listDatabases() as a lightweight operation to test connectivity
+ var databases = milvusClient.listDatabases();
+ LOGGER.debug("Successfully validated Milvus connection. Available databases: {}", databases.getDatabaseNames().size());
+
+ return ConnectionValidationResult.successful();
+
+ }
+ catch (Exception e) {
+ LOGGER.warn("Failed to connect to Milvus server", e);
+
+ String errorMessage = e.getMessage();
+ if (errorMessage == null) {
+ errorMessage = e.getClass().getSimpleName();
+ }
+
+ // Handle specific error types with user-friendly messages
+ if (errorMessage.contains("timeout") || errorMessage.contains("TimeoutException") ||
+ errorMessage.contains("deadline")) {
+ return ConnectionValidationResult.failed(
+ "Connection timeout - please check host, port and network connectivity");
+ }
+ else if (errorMessage.contains("authentication") || errorMessage.contains("auth") ||
+ errorMessage.contains("permission") || errorMessage.contains("credentials") ||
+ errorMessage.contains("Unauthenticated")) {
+ return ConnectionValidationResult.failed(
+ "Authentication failed - please check username, password, or token");
+ }
+ else if (errorMessage.contains("connect") || errorMessage.contains("refused") ||
+ errorMessage.contains("unreachable") || errorMessage.contains("UNAVAILABLE")) {
+ return ConnectionValidationResult.failed(
+ "Cannot connect to Milvus server - please check host and port configuration");
+ }
+ else if (errorMessage.contains("database") && errorMessage.contains("not found")) {
+ return ConnectionValidationResult.failed(
+ "Specified database does not exist - please check database name");
+ }
+ else {
+ return ConnectionValidationResult.failed("Failed to connect to Milvus: " + errorMessage);
+ }
+
+ }
+ finally {
+ if (milvusClient != null) {
+ try {
+ LOGGER.debug("Closing Milvus client");
+ milvusClient.close();
+ }
+ catch (Exception e) {
+ LOGGER.warn("Error closing Milvus client", e);
+ }
+ }
+ }
+ }
+}
diff --git a/debezium-platform-conductor/src/main/resources/application.yml b/debezium-platform-conductor/src/main/resources/application.yml
index 66fd877c..ed5a2f42 100644
--- a/debezium-platform-conductor/src/main/resources/application.yml
+++ b/debezium-platform-conductor/src/main/resources/application.yml
@@ -44,6 +44,9 @@ destinations:
kafka:
connection:
timeout: 60
+ milvus:
+ connection:
+ timeout: 60
quarkus:
rest-client:
diff --git a/debezium-platform-conductor/src/main/resources/connection-schemas.json b/debezium-platform-conductor/src/main/resources/connection-schemas.json
index 936e1801..fdfb03b7 100644
--- a/debezium-platform-conductor/src/main/resources/connection-schemas.json
+++ b/debezium-platform-conductor/src/main/resources/connection-schemas.json
@@ -214,7 +214,43 @@
"properties": {
"bootstrap.servers": {
"type": "list",
- "title": "List of “hostname:port” pairs that address one or more (even all) of the brokers."
+ "title": "List of "hostname:port" pairs that address one or more (even all) of the brokers."
+ }
+ }
+ }
+ },
+ {
+ "type": "MILVUS",
+ "schema": {
+ "title": "Milvus vector database connection properties",
+ "description": "Milvus vector database connection properties",
+ "type": "object",
+ "required": [
+ "uri"
+ ],
+ "additionalProperties": {
+ "type": "string"
+ },
+ "properties": {
+ "uri": {
+ "type": "string",
+ "title": "The URI of the Milvus server (e.g., http://localhost:19530)"
+ },
+ "database": {
+ "type": "string",
+ "title": "The name of the database to connect to (optional)"
+ },
+ "username": {
+ "type": "string",
+ "title": "Username for authentication (optional)"
+ },
+ "password": {
+ "type": "string",
+ "title": "Password for authentication (optional)"
+ },
+ "token": {
+ "type": "string",
+ "title": "Token for authentication (alternative to username/password, format: username:password)"
}
}
}
From 3ab31664eff8208dfb50934cb6629660ea0607ad Mon Sep 17 00:00:00 2001
From: prtiwari
Date: Mon, 22 Sep 2025 11:12:50 +0530
Subject: [PATCH 2/3] DBZ-9440- Add connection validator for Milvus.
Signed-off-by: pranavt84
---
debezium-platform-conductor/pom.xml | 2 +-
.../connection/destination/MilvusConnectionValidator.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/debezium-platform-conductor/pom.xml b/debezium-platform-conductor/pom.xml
index c08e6ea2..a89a7b15 100644
--- a/debezium-platform-conductor/pom.xml
+++ b/debezium-platform-conductor/pom.xml
@@ -117,7 +117,7 @@
io.debezium
debezium-operator-api
- 3.2.2.Final
+ ${version.debezium}
diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/MilvusConnectionValidator.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/MilvusConnectionValidator.java
index bfc60310..184642f3 100644
--- a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/MilvusConnectionValidator.java
+++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/MilvusConnectionValidator.java
@@ -37,7 +37,7 @@
*
*
*
- * @author Auto-generated
+ * @author Pranav Tiwari
*/
@Named("MILVUS")
public class MilvusConnectionValidator implements ConnectionValidator {
From 6a5582e783939d5aa0e3fc35c0134b0d4d65026c Mon Sep 17 00:00:00 2001
From: Pranav Tiwari
Date: Sat, 11 Oct 2025 20:01:36 +0530
Subject: [PATCH 3/3] DBZ-9440- Added test classes.
Signed-off-by: pranavt84
---
.../MilvusConnectionValidator.java | 2 +
.../MilvusConnectionValidatorAuthIT.java | 208 +++++++++++++++++
.../MilvusConnectionValidatorIT.java | 136 ++++++++++++
.../MilvusConnectionValidatorTest.java | 210 ++++++++++++++++++
.../database/db/MilvusTestResource.java | 66 ++++++
.../db/MilvusTestResourceAuthenticated.java | 82 +++++++
6 files changed, 704 insertions(+)
create mode 100644 debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/MilvusConnectionValidatorAuthIT.java
create mode 100644 debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/MilvusConnectionValidatorIT.java
create mode 100644 debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/MilvusConnectionValidatorTest.java
create mode 100644 debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/MilvusTestResource.java
create mode 100644 debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/MilvusTestResourceAuthenticated.java
diff --git a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/MilvusConnectionValidator.java b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/MilvusConnectionValidator.java
index 184642f3..ead37d16 100644
--- a/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/MilvusConnectionValidator.java
+++ b/debezium-platform-conductor/src/main/java/io/debezium/platform/environment/connection/destination/MilvusConnectionValidator.java
@@ -7,6 +7,7 @@
import java.util.Map;
+import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Named;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@@ -39,6 +40,7 @@
*
* @author Pranav Tiwari
*/
+@ApplicationScoped
@Named("MILVUS")
public class MilvusConnectionValidator implements ConnectionValidator {
diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/MilvusConnectionValidatorAuthIT.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/MilvusConnectionValidatorAuthIT.java
new file mode 100644
index 00000000..f6090f55
--- /dev/null
+++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/MilvusConnectionValidatorAuthIT.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.platform.environment.connection;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import jakarta.inject.Inject;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import io.debezium.platform.data.dto.ConnectionValidationResult;
+import io.debezium.platform.data.model.ConnectionEntity;
+import io.debezium.platform.domain.views.Connection;
+import io.debezium.platform.environment.connection.destination.MilvusConnectionValidator;
+import io.debezium.platform.environment.database.db.MilvusTestResourceAuthenticated;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+/**
+ * Integration tests for {@link MilvusConnectionValidator} with authentication.
+ *
+ * These tests use a real Milvus container (via Testcontainers) configured with
+ * authentication to verify connection validation against an authenticated Milvus instance.
+ *
+ *
+ * @author Pranav Tiwari
+ */
+@QuarkusTest
+@QuarkusTestResource(MilvusTestResourceAuthenticated.class)
+class MilvusConnectionValidatorAuthIT {
+
+ @Inject
+ MilvusConnectionValidator connectionValidator;
+
+ @Test
+ @DisplayName("Should successfully connect with username and password")
+ void shouldConnectWithUsernamePassword() {
+ GenericContainer> container = MilvusTestResourceAuthenticated.getContainer();
+
+ Awaitility.await()
+ .atMost(300, TimeUnit.SECONDS)
+ .until(container::isRunning);
+
+ String uri = String.format("http://%s:%d",
+ container.getHost(),
+ container.getMappedPort(19530));
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, Map.of(
+ "uri", uri,
+ "username", MilvusTestResourceAuthenticated.DEFAULT_USERNAME,
+ "password", MilvusTestResourceAuthenticated.DEFAULT_PASSWORD));
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertTrue(result.valid(), "Connection validation should succeed with correct credentials");
+ }
+
+ @Test
+ @DisplayName("Should successfully connect with token authentication")
+ void shouldConnectWithToken() {
+ GenericContainer> container = MilvusTestResourceAuthenticated.getContainer();
+
+ Awaitility.await()
+ .atMost(300, TimeUnit.SECONDS)
+ .until(container::isRunning);
+
+ String uri = String.format("http://%s:%d",
+ container.getHost(),
+ container.getMappedPort(19530));
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, Map.of(
+ "uri", uri,
+ "token", MilvusTestResourceAuthenticated.DEFAULT_TOKEN));
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertTrue(result.valid(), "Connection validation should succeed with token");
+ }
+
+ @Test
+ @DisplayName("Should successfully connect with database and authentication")
+ void shouldConnectWithDatabaseAndAuth() {
+ GenericContainer> container = MilvusTestResourceAuthenticated.getContainer();
+
+ Awaitility.await()
+ .atMost(300, TimeUnit.SECONDS)
+ .until(container::isRunning);
+
+ String uri = String.format("http://%s:%d",
+ container.getHost(),
+ container.getMappedPort(19530));
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, Map.of(
+ "uri", uri,
+ "database", "default",
+ "username", MilvusTestResourceAuthenticated.DEFAULT_USERNAME,
+ "password", MilvusTestResourceAuthenticated.DEFAULT_PASSWORD));
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertTrue(result.valid(), "Connection validation should succeed with database and auth");
+ }
+
+ @Test
+ @DisplayName("Should fail with incorrect password")
+ void shouldFailWithIncorrectPassword() {
+ GenericContainer> container = MilvusTestResourceAuthenticated.getContainer();
+
+ Awaitility.await()
+ .atMost(300, TimeUnit.SECONDS)
+ .until(container::isRunning);
+
+ String uri = String.format("http://%s:%d",
+ container.getHost(),
+ container.getMappedPort(19530));
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, Map.of(
+ "uri", uri,
+ "username", MilvusTestResourceAuthenticated.DEFAULT_USERNAME,
+ "password", "wrong-password"));
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertFalse(result.valid(), "Connection validation should fail with incorrect password");
+ assertThat(result.message()).containsAnyOf("auth", "Authentication", "permission", "Unauthenticated");
+ }
+
+ @Test
+ @DisplayName("Should fail with incorrect username")
+ void shouldFailWithIncorrectUsername() {
+ GenericContainer> container = MilvusTestResourceAuthenticated.getContainer();
+
+ Awaitility.await()
+ .atMost(300, TimeUnit.SECONDS)
+ .until(container::isRunning);
+
+ String uri = String.format("http://%s:%d",
+ container.getHost(),
+ container.getMappedPort(19530));
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, Map.of(
+ "uri", uri,
+ "username", "wrong-user",
+ "password", MilvusTestResourceAuthenticated.DEFAULT_PASSWORD));
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertFalse(result.valid(), "Connection validation should fail with incorrect username");
+ assertThat(result.message()).containsAnyOf("auth", "Authentication", "permission", "Unauthenticated");
+ }
+
+ @Test
+ @DisplayName("Should fail with incorrect token")
+ void shouldFailWithIncorrectToken() {
+ GenericContainer> container = MilvusTestResourceAuthenticated.getContainer();
+
+ Awaitility.await()
+ .atMost(300, TimeUnit.SECONDS)
+ .until(container::isRunning);
+
+ String uri = String.format("http://%s:%d",
+ container.getHost(),
+ container.getMappedPort(19530));
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, Map.of(
+ "uri", uri,
+ "token", "wrong:token"));
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertFalse(result.valid(), "Connection validation should fail with incorrect token");
+ assertThat(result.message()).containsAnyOf("auth", "Authentication", "permission", "Unauthenticated");
+ }
+
+ @Test
+ @DisplayName("Should fail when authentication is required but not provided")
+ void shouldFailWithoutAuthWhenRequired() {
+ GenericContainer> container = MilvusTestResourceAuthenticated.getContainer();
+
+ Awaitility.await()
+ .atMost(300, TimeUnit.SECONDS)
+ .until(container::isRunning);
+
+ String uri = String.format("http://%s:%d",
+ container.getHost(),
+ container.getMappedPort(19530));
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, Map.of(
+ "uri", uri));
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertFalse(result.valid(), "Connection validation should fail without authentication");
+ assertThat(result.message()).containsAnyOf("auth", "Authentication", "permission", "Unauthenticated");
+ }
+}
+
diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/MilvusConnectionValidatorIT.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/MilvusConnectionValidatorIT.java
new file mode 100644
index 00000000..9302ee96
--- /dev/null
+++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/MilvusConnectionValidatorIT.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.platform.environment.connection;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import jakarta.inject.Inject;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import io.debezium.platform.data.dto.ConnectionValidationResult;
+import io.debezium.platform.data.model.ConnectionEntity;
+import io.debezium.platform.domain.views.Connection;
+import io.debezium.platform.environment.connection.destination.MilvusConnectionValidator;
+import io.debezium.platform.environment.database.db.MilvusTestResource;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+/**
+ * Integration tests for {@link MilvusConnectionValidator} without authentication.
+ *
+ * These tests use a real Milvus container (via Testcontainers) to verify
+ * connection validation against an actual Milvus instance without authentication.
+ *
+ *
+ * @author Pranav Tiwari
+ */
+@QuarkusTest
+@QuarkusTestResource(MilvusTestResource.class)
+class MilvusConnectionValidatorIT {
+
+ @Inject
+ MilvusConnectionValidator connectionValidator;
+
+ @Test
+ @DisplayName("Should successfully connect to Milvus without authentication")
+ void shouldConnectWithoutAuth() {
+ GenericContainer> container = MilvusTestResource.getContainer();
+
+ Awaitility.await()
+ .atMost(300, TimeUnit.SECONDS)
+ .until(container::isRunning);
+
+ String uri = String.format("http://%s:%d",
+ container.getHost(),
+ container.getMappedPort(19530));
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, Map.of(
+ "uri", uri));
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertTrue(result.valid(), "Connection validation should succeed");
+ }
+
+ @Test
+ @DisplayName("Should successfully connect with database parameter")
+ void shouldConnectWithDatabase() {
+ GenericContainer> container = MilvusTestResource.getContainer();
+
+ Awaitility.await()
+ .atMost(300, TimeUnit.SECONDS)
+ .until(container::isRunning);
+
+ String uri = String.format("http://%s:%d",
+ container.getHost(),
+ container.getMappedPort(19530));
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, Map.of(
+ "uri", uri,
+ "database", "default"));
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertTrue(result.valid(), "Connection validation with database should succeed");
+ }
+
+ @Test
+ @DisplayName("Should fail with invalid host")
+ void shouldFailWithInvalidHost() {
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, Map.of(
+ "uri", "http://invalid-host-12345:19530"));
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertFalse(result.valid(), "Connection validation should fail");
+ assertThat(result.message()).containsAnyOf("connect", "UNAVAILABLE", "unreachable", "timeout");
+ }
+
+ @Test
+ @DisplayName("Should fail with invalid port")
+ void shouldFailWithInvalidPort() {
+ GenericContainer> container = MilvusTestResource.getContainer();
+
+ Awaitility.await()
+ .atMost(300, TimeUnit.SECONDS)
+ .until(container::isRunning);
+
+ String uri = String.format("http://%s:%d",
+ container.getHost(),
+ 99999); // Invalid port
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, Map.of(
+ "uri", uri));
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertFalse(result.valid(), "Connection validation should fail");
+ assertThat(result.message()).containsAnyOf("connect", "refused", "UNAVAILABLE");
+ }
+
+ @Test
+ @DisplayName("Should handle timeout gracefully")
+ void shouldHandleTimeout() {
+ // Use a non-routable IP address to simulate timeout
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, Map.of(
+ "uri", "http://10.255.255.1:19530"));
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertFalse(result.valid(), "Connection validation should fail");
+ assertThat(result.message()).containsAnyOf("timeout", "connect", "Connection timeout");
+ }
+}
+
diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/MilvusConnectionValidatorTest.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/MilvusConnectionValidatorTest.java
new file mode 100644
index 00000000..b5f707c9
--- /dev/null
+++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/connection/MilvusConnectionValidatorTest.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.platform.environment.connection;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.platform.data.dto.ConnectionValidationResult;
+import io.debezium.platform.data.model.ConnectionEntity;
+import io.debezium.platform.domain.views.Connection;
+import io.debezium.platform.environment.connection.destination.MilvusConnectionValidator;
+
+/**
+ * Unit tests for {@link MilvusConnectionValidator}.
+ *
+ * These tests validate the configuration validation logic without requiring
+ * an actual Milvus server. They test parameter validation, URI format checking,
+ * and edge cases in the validator implementation.
+ *
+ *
+ * @author Pranav Tiwari
+ */
+class MilvusConnectionValidatorTest {
+
+ private MilvusConnectionValidator connectionValidator;
+
+ @BeforeEach
+ void setup() {
+ // Initialize with a default timeout of 5 seconds
+ connectionValidator = new MilvusConnectionValidator(5);
+ }
+
+ @Test
+ @DisplayName("Should fail validation when connection config is null")
+ void shouldFailWhenConfigIsNull() {
+ ConnectionValidationResult result = connectionValidator.validate(null);
+
+ assertFalse(result.valid(), "Validation should fail for null configuration");
+ Assertions.assertThat(result.message()).contains("cannot be null");
+ }
+
+ @Test
+ @DisplayName("Should fail validation when URI is missing")
+ void shouldFailWhenUriIsMissing() {
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, Map.of());
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertFalse(result.valid(), "Validation should fail when URI is missing");
+ Assertions.assertThat(result.message()).contains("URI must be specified");
+ }
+
+ @Test
+ @DisplayName("Should fail validation when URI is empty")
+ void shouldFailWhenUriIsEmpty() {
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, Map.of("uri", ""));
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertFalse(result.valid(), "Validation should fail when URI is empty");
+ Assertions.assertThat(result.message()).contains("URI must be specified");
+ }
+
+ @Test
+ @DisplayName("Should fail validation when URI is only whitespace")
+ void shouldFailWhenUriIsWhitespace() {
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, Map.of("uri", " "));
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertFalse(result.valid(), "Validation should fail when URI is only whitespace");
+ Assertions.assertThat(result.message()).contains("URI must be specified");
+ }
+
+ @Test
+ @DisplayName("Should fail validation when URI doesn't start with http:// or https://")
+ void shouldFailWhenUriHasInvalidProtocol() {
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS,
+ Map.of("uri", "localhost:19530"));
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ assertFalse(result.valid(), "Validation should fail for URI without http:// or https://");
+ Assertions.assertThat(result.message()).contains("URI must start with http:// or https://");
+ }
+
+ @Test
+ @DisplayName("Should accept valid HTTP URI")
+ void shouldAcceptValidHttpUri() {
+ Map config = new HashMap<>();
+ config.put("uri", "http://localhost:19530");
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, config);
+
+ // This will fail at connection time, but should pass URI validation
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ // Since we can't connect to a real server, it will fail, but not with URI validation error
+ if (!result.valid()) {
+ Assertions.assertThat(result.message()).doesNotContain("URI must start with");
+ }
+ }
+
+ @Test
+ @DisplayName("Should accept valid HTTPS URI")
+ void shouldAcceptValidHttpsUri() {
+ Map config = new HashMap<>();
+ config.put("uri", "https://milvus.example.com:19530");
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, config);
+
+ // This will fail at connection time, but should pass URI validation
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ // Since we can't connect to a real server, it will fail, but not with URI validation error
+ if (!result.valid()) {
+ Assertions.assertThat(result.message()).doesNotContain("URI must start with");
+ }
+ }
+
+ @Test
+ @DisplayName("Should handle optional database parameter")
+ void shouldHandleOptionalDatabase() {
+ Map config = new HashMap<>();
+ config.put("uri", "http://10.255.255.1:19530"); // Non-routable IP, will timeout
+ config.put("database", "test_db");
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, config);
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ // Will fail to connect, but should pass configuration validation
+ assertFalse(result.valid(), "Connection should fail to non-routable address");
+ }
+
+ @Test
+ @DisplayName("Should handle optional username and password")
+ void shouldHandleUsernamePassword() {
+ Map config = new HashMap<>();
+ config.put("uri", "http://10.255.255.1:19530"); // Non-routable IP
+ config.put("username", "testuser");
+ config.put("password", "testpass");
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, config);
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ // Will fail to connect, but should pass configuration validation
+ assertFalse(result.valid(), "Connection should fail to non-routable address");
+ }
+
+ @Test
+ @DisplayName("Should handle optional token authentication")
+ void shouldHandleTokenAuth() {
+ Map config = new HashMap<>();
+ config.put("uri", "http://10.255.255.1:19530"); // Non-routable IP
+ config.put("token", "testuser:testpass");
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, config);
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ // Will fail to connect, but should pass configuration validation
+ assertFalse(result.valid(), "Connection should fail to non-routable address");
+ }
+
+ @Test
+ @DisplayName("Should handle all optional parameters together")
+ void shouldHandleAllOptionalParameters() {
+ Map config = new HashMap<>();
+ config.put("uri", "http://10.255.255.1:19530"); // Non-routable IP
+ config.put("database", "test_db");
+ config.put("username", "testuser");
+ config.put("password", "testpass");
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, config);
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ // Will fail to connect, but should pass configuration validation
+ assertFalse(result.valid(), "Connection should fail to non-routable address");
+ }
+
+ @Test
+ @DisplayName("Should trim whitespace from URI")
+ void shouldTrimUriWhitespace() {
+ Map config = new HashMap<>();
+ config.put("uri", " http://10.255.255.1:19530 ");
+
+ Connection connectionConfig = new TestConnectionView(ConnectionEntity.Type.MILVUS, config);
+
+ ConnectionValidationResult result = connectionValidator.validate(connectionConfig);
+
+ // Should pass URI validation (trimming happens)
+ if (!result.valid()) {
+ Assertions.assertThat(result.message()).doesNotContain("URI must start with");
+ }
+ }
+}
+
diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/MilvusTestResource.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/MilvusTestResource.java
new file mode 100644
index 00000000..85a4ac0f
--- /dev/null
+++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/MilvusTestResource.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.platform.environment.database.db;
+
+import java.util.Map;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+
+/**
+ * Test resource for Milvus vector database using Testcontainers.
+ *
+ * This class provides a containerized Milvus instance WITHOUT authentication
+ * for integration testing. It manages the lifecycle of a Docker container running
+ * Milvus server in standalone mode, making it suitable for testing basic
+ * connection validation scenarios.
+ *
+ * The Milvus instance is configured with:
+ *
+ * - Default port 19530 mapped to a random host port
+ * - No authentication required
+ * - Standalone mode (single-node deployment)
+ *
+ *
+ *
+ * @author Pranav Tiwari
+ */
+public class MilvusTestResource implements QuarkusTestResourceLifecycleManager {
+
+ private static final String MILVUS_IMAGE = "milvusdb/milvus:latest";
+ private static final int MILVUS_PORT = 19530;
+
+ private static GenericContainer> milvusContainer;
+
+ @Override
+ public Map start() {
+ milvusContainer = new GenericContainer<>(DockerImageName.parse(MILVUS_IMAGE))
+ .withExposedPorts(MILVUS_PORT)
+ .withCommand("milvus", "run", "standalone")
+ .withEnv("ETCD_USE_EMBED", "true")
+ .withEnv("COMMON_STORAGETYPE", "local");
+
+ milvusContainer.start();
+
+ return Map.of(
+ "milvus.host", milvusContainer.getHost(),
+ "milvus.port", milvusContainer.getMappedPort(MILVUS_PORT).toString());
+ }
+
+ @Override
+ public void stop() {
+ if (milvusContainer != null) {
+ milvusContainer.stop();
+ }
+ }
+
+ public static GenericContainer> getContainer() {
+ return milvusContainer;
+ }
+}
+
diff --git a/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/MilvusTestResourceAuthenticated.java b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/MilvusTestResourceAuthenticated.java
new file mode 100644
index 00000000..b0fe8a68
--- /dev/null
+++ b/debezium-platform-conductor/src/test/java/io/debezium/platform/environment/database/db/MilvusTestResourceAuthenticated.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.platform.environment.database.db;
+
+import java.util.Map;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+
+/**
+ * Test resource for Milvus vector database using Testcontainers WITH authentication.
+ *
+ * This class provides a containerized Milvus instance WITH authentication enabled
+ * for integration testing. It manages the lifecycle of a Docker container running
+ * Milvus server with default root user credentials.
+ *
+ * The Milvus instance is configured with:
+ *
+ * - Default port 19530 mapped to a random host port
+ * - Authentication enabled with root:Milvus credentials
+ * - Standalone mode (single-node deployment)
+ *
+ *
+ *
+ * Authentication Details:
+ * Default credentials:
+ *
+ * - Username: root
+ * - Password: Milvus
+ * - Token format: root:Milvus
+ *
+ *
+ *
+ * @author Pranav Tiwari
+ */
+public class MilvusTestResourceAuthenticated implements QuarkusTestResourceLifecycleManager {
+
+ private static final String MILVUS_IMAGE = "milvusdb/milvus:latest";
+ private static final int MILVUS_PORT = 19530;
+
+ public static final String DEFAULT_USERNAME = "root";
+ public static final String DEFAULT_PASSWORD = "Milvus";
+ public static final String DEFAULT_TOKEN = DEFAULT_USERNAME + ":" + DEFAULT_PASSWORD;
+
+ private static GenericContainer> milvusContainer;
+
+ @Override
+ public Map start() {
+ milvusContainer = new GenericContainer<>(DockerImageName.parse(MILVUS_IMAGE))
+ .withExposedPorts(MILVUS_PORT)
+ .withCommand("milvus", "run", "standalone")
+ .withEnv("ETCD_USE_EMBED", "true")
+ .withEnv("COMMON_STORAGETYPE", "local")
+ .withEnv("COMMON_SECURITY_AUTHORIZATIONENABLED", "true");
+
+ milvusContainer.start();
+
+ return Map.of(
+ "milvus.host", milvusContainer.getHost(),
+ "milvus.port", milvusContainer.getMappedPort(MILVUS_PORT).toString(),
+ "milvus.username", DEFAULT_USERNAME,
+ "milvus.password", DEFAULT_PASSWORD,
+ "milvus.token", DEFAULT_TOKEN);
+ }
+
+ @Override
+ public void stop() {
+ if (milvusContainer != null) {
+ milvusContainer.stop();
+ }
+ }
+
+ public static GenericContainer> getContainer() {
+ return milvusContainer;
+ }
+}
+