diff --git a/quarkus-debezium-parent/quarkus-debezium-db2-parent/runtime/src/main/java/io/quarkus/debezium/engine/Db2EngineProducer.java b/quarkus-debezium-parent/quarkus-debezium-db2-parent/runtime/src/main/java/io/quarkus/debezium/engine/Db2EngineProducer.java index 0508322c04..f47ebbdd25 100644 --- a/quarkus-debezium-parent/quarkus-debezium-db2-parent/runtime/src/main/java/io/quarkus/debezium/engine/Db2EngineProducer.java +++ b/quarkus-debezium-parent/quarkus-debezium-db2-parent/runtime/src/main/java/io/quarkus/debezium/engine/Db2EngineProducer.java @@ -22,19 +22,16 @@ import io.quarkus.datasource.common.runtime.DatabaseKind; import io.quarkus.debezium.agroal.engine.AgroalParser; import io.quarkus.debezium.configuration.DebeziumConfigurationEngineParser; -import io.quarkus.debezium.engine.capture.consumer.SourceRecordConsumerHandler; public class Db2EngineProducer implements ConnectorProducer { public static final Connector DB2 = new Connector(Db2Connector.class.getName()); - private final StateHandler stateHandler; - private final SourceRecordConsumerHandler sourceRecordConsumerHandler; private final AgroalParser agroalParser; + private final DebeziumFactory debeziumFactory; - public Db2EngineProducer(StateHandler stateHandler, SourceRecordConsumerHandler sourceRecordConsumerHandler, AgroalParser agroalParser) { - this.stateHandler = stateHandler; - this.sourceRecordConsumerHandler = sourceRecordConsumerHandler; + public Db2EngineProducer(AgroalParser agroalParser, DebeziumFactory debeziumFactory) { this.agroalParser = agroalParser; + this.debeziumFactory = debeziumFactory; } @Override @@ -46,18 +43,11 @@ public DebeziumConnectorRegistry engine(DebeziumEngineConfiguration debeziumEngi private final Map engines = multiEngineConfigurations .stream() .map(engine -> { - EngineManifest engineManifest = new EngineManifest(engine.engineId()); - - Map debeziumConfiguration = engine.configuration(); - // remove unnecessary configuration for sqlserver - debeziumConfiguration.remove(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE.name()); + engine.configuration() + .remove(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE.name()); - return Map.entry(engine.engineId(), new SourceRecordDebezium( - engine.configuration(), - stateHandler, - DB2, - sourceRecordConsumerHandler.get(engineManifest), engineManifest)); + return Map.entry(engine.engineId(), debeziumFactory.get(DB2, engine)); }) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); diff --git a/quarkus-debezium-parent/quarkus-debezium-engine-parent/deployment/src/main/java/io/quarkus/debezium/deployment/engine/EngineProcessor.java b/quarkus-debezium-parent/quarkus-debezium-engine-parent/deployment/src/main/java/io/quarkus/debezium/deployment/engine/EngineProcessor.java index 8854dda45e..cd982b047a 100644 --- a/quarkus-debezium-parent/quarkus-debezium-engine-parent/deployment/src/main/java/io/quarkus/debezium/deployment/engine/EngineProcessor.java +++ b/quarkus-debezium-parent/quarkus-debezium-engine-parent/deployment/src/main/java/io/quarkus/debezium/deployment/engine/EngineProcessor.java @@ -77,6 +77,7 @@ import io.quarkus.debezium.deployment.items.DebeziumGeneratedInvokerBuildItem; import io.quarkus.debezium.deployment.items.DebeziumGeneratedPostProcessorBuildItem; import io.quarkus.debezium.deployment.items.DebeziumMediatorBuildItem; +import io.quarkus.debezium.engine.DebeziumFactory; import io.quarkus.debezium.engine.DebeziumRecorder; import io.quarkus.debezium.engine.DefaultStateHandler; import io.quarkus.debezium.engine.capture.CapturingEventInvokerRegistryProducer; @@ -173,6 +174,12 @@ void engine(BuildProducer additionalBeanProducer, .addBeanClasses(DefaultStateHandler.class) .build()); + additionalBeanProducer.produce(AdditionalBeanBuildItem + .builder() + .setUnremovable() + .addBeanClasses(DebeziumFactory.class) + .build()); + additionalBeanProducer.produce(AdditionalBeanBuildItem.builder() .addBeanClasses( DefaultNotificationHandler.class, diff --git a/quarkus-debezium-parent/quarkus-debezium-engine-parent/runtime/src/main/java/io/quarkus/debezium/engine/DebeziumFactory.java b/quarkus-debezium-parent/quarkus-debezium-engine-parent/runtime/src/main/java/io/quarkus/debezium/engine/DebeziumFactory.java new file mode 100644 index 0000000000..df9fde4dcb --- /dev/null +++ b/quarkus-debezium-parent/quarkus-debezium-engine-parent/runtime/src/main/java/io/quarkus/debezium/engine/DebeziumFactory.java @@ -0,0 +1,49 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.quarkus.debezium.engine; + +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; + +import io.debezium.DebeziumException; +import io.debezium.runtime.Connector; +import io.debezium.runtime.Debezium; +import io.debezium.runtime.DebeziumSerialization; +import io.debezium.runtime.EngineManifest; +import io.quarkus.debezium.configuration.DebeziumConfigurationEngineParser.MultiEngineConfiguration; +import io.quarkus.debezium.engine.capture.consumer.SourceRecordConsumerHandler; + +public class DebeziumFactory { + + private final Instance serialization; + private final StateHandler stateHandler; + private final SourceRecordConsumerHandler sourceRecordConsumerHandler; + + @Inject + public DebeziumFactory( + Instance serialization, + StateHandler stateHandler, + SourceRecordConsumerHandler sourceRecordConsumerHandler) { + this.serialization = serialization; + this.stateHandler = stateHandler; + this.sourceRecordConsumerHandler = sourceRecordConsumerHandler; + } + + public Debezium get(Connector connector, MultiEngineConfiguration engine) { + if (serialization.isResolvable()) { + throw new DebeziumException("not implemented yet engine with configurable serialization"); + } + + EngineManifest engineManifest = new EngineManifest(engine.engineId()); + + return new SourceRecordDebezium( + engine.configuration(), + stateHandler, + connector, + sourceRecordConsumerHandler.get(engineManifest), + engineManifest); + } +} diff --git a/quarkus-debezium-parent/quarkus-debezium-engine-spi-parent/runtime/src/main/java/io/debezium/runtime/DebeziumSerialization.java b/quarkus-debezium-parent/quarkus-debezium-engine-spi-parent/runtime/src/main/java/io/debezium/runtime/DebeziumSerialization.java new file mode 100644 index 0000000000..10bf52a293 --- /dev/null +++ b/quarkus-debezium-parent/quarkus-debezium-engine-spi-parent/runtime/src/main/java/io/debezium/runtime/DebeziumSerialization.java @@ -0,0 +1,40 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.runtime; + +import io.debezium.common.annotation.Incubating; +import io.debezium.engine.format.SerializationFormat; + +/** + * serialization information associated to a running {@link Debezium} engine + */ +@Incubating +public interface DebeziumSerialization { + + /** + * + * @return the key {@link SerializationFormat} + */ + Class> getKeyFormat(); + + /** + * + * @return the value {@link SerializationFormat} + */ + Class> getValueFormat(); + + /** + * + * @return the header {@link SerializationFormat} + */ + Class> getHeaderFormat(); + + /** + * + * @return the id for the {@link Debezium} engine + */ + String getEngineId(); +} diff --git a/quarkus-debezium-parent/quarkus-debezium-mariadb-parent/runtime/src/main/java/io/quarkus/debezium/engine/MariaDbEngineProducer.java b/quarkus-debezium-parent/quarkus-debezium-mariadb-parent/runtime/src/main/java/io/quarkus/debezium/engine/MariaDbEngineProducer.java index c694a5704e..962f3ba047 100644 --- a/quarkus-debezium-parent/quarkus-debezium-mariadb-parent/runtime/src/main/java/io/quarkus/debezium/engine/MariaDbEngineProducer.java +++ b/quarkus-debezium-parent/quarkus-debezium-mariadb-parent/runtime/src/main/java/io/quarkus/debezium/engine/MariaDbEngineProducer.java @@ -27,22 +27,18 @@ import io.quarkus.datasource.common.runtime.DatabaseKind; import io.quarkus.debezium.agroal.engine.AgroalParser; import io.quarkus.debezium.configuration.DebeziumConfigurationEngineParser.MultiEngineConfiguration; -import io.quarkus.debezium.engine.capture.consumer.SourceRecordConsumerHandler; public class MariaDbEngineProducer implements ConnectorProducer { public static final Connector MARIADB = new Connector(MariaDbConnector.class.getName()); - private final StateHandler stateHandler; - private final SourceRecordConsumerHandler sourceRecordConsumerHandler; private final AgroalParser agroalParser; + private final DebeziumFactory debeziumFactory; @Inject - public MariaDbEngineProducer(StateHandler stateHandler, - SourceRecordConsumerHandler sourceRecordConsumerHandler, - AgroalParser agroalParser) { - this.stateHandler = stateHandler; - this.sourceRecordConsumerHandler = sourceRecordConsumerHandler; + public MariaDbEngineProducer(AgroalParser agroalParser, + DebeziumFactory debeziumFactory) { this.agroalParser = agroalParser; + this.debeziumFactory = debeziumFactory; } @Produces @@ -55,18 +51,11 @@ public DebeziumConnectorRegistry engine(DebeziumEngineConfiguration debeziumEngi private final Map engines = multiEngineConfigurations .stream() .map(engine -> { - EngineManifest engineManifest = new EngineManifest(engine.engineId()); - Map debeziumConfiguration = engine.configuration(); - // remove unnecessary configuration for sqlserver debeziumConfiguration.remove(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE.name()); - return Map.entry(engine.engineId(), new SourceRecordDebezium( - engine.configuration(), - stateHandler, - MARIADB, - sourceRecordConsumerHandler.get(engineManifest), engineManifest)); + return Map.entry(engine.engineId(), debeziumFactory.get(MARIADB, engine)); }) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); diff --git a/quarkus-debezium-parent/quarkus-debezium-mongodb-parent/runtime/src/main/java/io/quarkus/debezium/engine/MongoDbEngineProducer.java b/quarkus-debezium-parent/quarkus-debezium-mongodb-parent/runtime/src/main/java/io/quarkus/debezium/engine/MongoDbEngineProducer.java index f8e5a6e53c..94aa19564e 100644 --- a/quarkus-debezium-parent/quarkus-debezium-mongodb-parent/runtime/src/main/java/io/quarkus/debezium/engine/MongoDbEngineProducer.java +++ b/quarkus-debezium-parent/quarkus-debezium-mongodb-parent/runtime/src/main/java/io/quarkus/debezium/engine/MongoDbEngineProducer.java @@ -29,37 +29,23 @@ import io.quarkus.debezium.configuration.DebeziumConfigurationEngineParser.MultiEngineConfiguration; import io.quarkus.debezium.configuration.MongoDbDatasourceConfiguration; import io.quarkus.debezium.configuration.MultiEngineMongoDbDatasourceConfiguration; -import io.quarkus.debezium.engine.capture.consumer.SourceRecordConsumerHandler; import io.quarkus.debezium.notification.QuarkusNotificationChannel; public class MongoDbEngineProducer implements ConnectorProducer { public static final Connector MONGODB = new Connector(MongoDbConnector.class.getName()); - private final StateHandler stateHandler; private final Map quarkusDatasourceConfigurations; + private DebeziumFactory debeziumFactory; private final QuarkusNotificationChannel channel; - private final SourceRecordConsumerHandler sourceRecordConsumerHandler; private final DebeziumConfigurationEngineParser engineParser = new DebeziumConfigurationEngineParser(); @Inject - public MongoDbEngineProducer(StateHandler stateHandler, - MultiEngineMongoDbDatasourceConfiguration multiEngineMongoDbDatasourceConfiguration, + public MongoDbEngineProducer(MultiEngineMongoDbDatasourceConfiguration multiEngineMongoDbDatasourceConfiguration, QuarkusNotificationChannel channel, - SourceRecordConsumerHandler sourceRecordConsumerHandler) { - this.stateHandler = stateHandler; + DebeziumFactory debeziumFactory) { this.channel = channel; - this.sourceRecordConsumerHandler = sourceRecordConsumerHandler; this.quarkusDatasourceConfigurations = multiEngineMongoDbDatasourceConfiguration.get(); - } - - public MongoDbEngineProducer(StateHandler stateHandler, - Map quarkusDatasourceConfigurations, - QuarkusNotificationChannel channel, - SourceRecordConsumerHandler sourceRecordConsumerHandler) { - this.stateHandler = stateHandler; - this.quarkusDatasourceConfigurations = quarkusDatasourceConfigurations; - this.channel = channel; - this.sourceRecordConsumerHandler = sourceRecordConsumerHandler; + this.debeziumFactory = debeziumFactory; } @Produces @@ -79,15 +65,7 @@ public DebeziumConnectorRegistry engine(DebeziumEngineConfiguration debeziumEngi return new DebeziumConnectorRegistry() { private final Map engines = enrichedMultiEngineConfigurations .stream() - .map(engine -> { - EngineManifest engineManifest = new EngineManifest(engine.engineId()); - - return Map.entry(engine.engineId(), new SourceRecordDebezium( - engine.configuration(), - stateHandler, - MONGODB, - sourceRecordConsumerHandler.get(engineManifest), engineManifest)); - }) + .map(engine -> Map.entry(engine.engineId(), debeziumFactory.get(MONGODB, engine))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); @Override diff --git a/quarkus-debezium-parent/quarkus-debezium-mysql-parent/runtime/src/main/java/io/quarkus/debezium/engine/MySqlEngineProducer.java b/quarkus-debezium-parent/quarkus-debezium-mysql-parent/runtime/src/main/java/io/quarkus/debezium/engine/MySqlEngineProducer.java index 4bf5280264..d589149d87 100644 --- a/quarkus-debezium-parent/quarkus-debezium-mysql-parent/runtime/src/main/java/io/quarkus/debezium/engine/MySqlEngineProducer.java +++ b/quarkus-debezium-parent/quarkus-debezium-mysql-parent/runtime/src/main/java/io/quarkus/debezium/engine/MySqlEngineProducer.java @@ -26,23 +26,18 @@ import io.quarkus.datasource.common.runtime.DatabaseKind; import io.quarkus.debezium.agroal.engine.AgroalParser; import io.quarkus.debezium.configuration.DebeziumConfigurationEngineParser.MultiEngineConfiguration; -import io.quarkus.debezium.engine.capture.consumer.SourceRecordConsumerHandler; public class MySqlEngineProducer implements ConnectorProducer { public static final Connector MYSQL = new Connector(MySqlConnector.class.getName()); - private final StateHandler stateHandler; - private final SourceRecordConsumerHandler sourceRecordConsumerHandler; private final AgroalParser agroalParser; + private final DebeziumFactory debeziumFactory; @Inject - public MySqlEngineProducer(StateHandler stateHandler, - SourceRecordConsumerHandler sourceRecordConsumerHandler, - AgroalParser agroalParser) { - this.stateHandler = stateHandler; - this.sourceRecordConsumerHandler = sourceRecordConsumerHandler; + public MySqlEngineProducer(AgroalParser agroalParser, DebeziumFactory debeziumFactory) { this.agroalParser = agroalParser; + this.debeziumFactory = debeziumFactory; } @Produces @@ -56,18 +51,11 @@ public DebeziumConnectorRegistry engine(DebeziumEngineConfiguration debeziumEngi private final Map engines = multiEngineConfigurations .stream() .map(engine -> { - EngineManifest engineManifest = new EngineManifest(engine.engineId()); - - Map debeziumConfiguration = engine.configuration(); - // remove unnecessary configuration - debeziumConfiguration.remove(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE.name()); + engine.configuration() + .remove(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE.name()); - return Map.entry(engine.engineId(), new SourceRecordDebezium( - engine.configuration(), - stateHandler, - MYSQL, - sourceRecordConsumerHandler.get(engineManifest), engineManifest)); + return Map.entry(engine.engineId(), debeziumFactory.get(MYSQL, engine)); }) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/engine/PostgresEngineProducer.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/engine/PostgresEngineProducer.java index 40e6ec9d4a..8a3e926fb2 100644 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/engine/PostgresEngineProducer.java +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/engine/PostgresEngineProducer.java @@ -44,6 +44,7 @@ public class PostgresEngineProducer implements ConnectorProducer { private final StateHandler stateHandler; private final Map quarkusDatasourceConfigurations; + private DebeziumFactory debeziumFactory; private final QuarkusNotificationChannel channel; private final SourceRecordConsumerHandler sourceRecordConsumerHandler; private final DebeziumConfigurationEngineParser engineParser = new DebeziumConfigurationEngineParser(); @@ -52,13 +53,14 @@ public class PostgresEngineProducer implements ConnectorProducer { public PostgresEngineProducer(StateHandler stateHandler, Instance configurations, QuarkusNotificationChannel channel, - SourceRecordConsumerHandler sourceRecordConsumerHandler) { + SourceRecordConsumerHandler sourceRecordConsumerHandler, DebeziumFactory debeziumFactory) { this.stateHandler = stateHandler; this.channel = channel; this.sourceRecordConsumerHandler = sourceRecordConsumerHandler; this.quarkusDatasourceConfigurations = configurations .stream() .collect(Collectors.toMap(QuarkusDatasourceConfiguration::getSanitizedName, Function.identity())); + this.debeziumFactory = debeziumFactory; } public PostgresEngineProducer(StateHandler stateHandler, @@ -96,15 +98,7 @@ public DebeziumConnectorRegistry engine(DebeziumEngineConfiguration debeziumEngi return new DebeziumConnectorRegistry() { private final Map engines = enrichedMultiEngineConfigurations .stream() - .map(engine -> { - EngineManifest engineManifest = new EngineManifest(engine.engineId()); - - return Map.entry(engine.engineId(), new SourceRecordDebezium( - engine.configuration(), - stateHandler, - POSTGRES, - sourceRecordConsumerHandler.get(engineManifest), engineManifest)); - }) + .map(engine -> Map.entry(engine.engineId(), debeziumFactory.get(POSTGRES, engine))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); @Override diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/test/java/io/quarkus/debezium/engine/PostgresEngineProducerTest.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/test/java/io/quarkus/debezium/engine/PostgresEngineProducerTest.java deleted file mode 100644 index b5a2acb75e..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/test/java/io/quarkus/debezium/engine/PostgresEngineProducerTest.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.debezium.engine; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -import io.debezium.runtime.DebeziumConnectorRegistry; -import io.debezium.runtime.EngineManifest; -import io.debezium.runtime.configuration.DebeziumEngineConfiguration; -import io.debezium.runtime.configuration.DevServicesConfig; -import io.quarkus.debezium.configuration.PostgresDatasourceConfiguration; -import io.quarkus.debezium.notification.QuarkusNotificationChannel; - -class PostgresEngineProducerTest { - - private final QuarkusNotificationChannel quarkusNotificationChannel = mock(QuarkusNotificationChannel.class); - - @Test - @DisplayName("should merge configurations when debezium configuration doesn't contain datasource information") - void shouldMergeConfigurationsWhenDebeziumConfigurationIsWithoutDatasourceInformation() { - when(quarkusNotificationChannel.name()).thenReturn("a_name"); - - var underTest = new PostgresEngineProducer(mock(DefaultStateHandler.class), - Map.of( - "default", - new PostgresDatasourceConfiguration( - "host", - "username", - "password", - "database", - "1926", - true, - "")), - quarkusNotificationChannel, - ignore -> { - return event -> { - }; - }); - - assertThat(underTest.engine(new DebeziumEngineConfiguration() { - @Override - public Map defaultConfiguration() { - return new HashMap<>(Map.of("name", "test")); - } - - @Override - public Map capturing() { - return Map.of(); - } - - @Override - public Map devservices() { - return Map.of(); - } - }).get(new EngineManifest("default")) - .configuration()) - .isEqualTo(Map.of( - "connector.class", "io.debezium.connector.postgresql.PostgresConnector", - "name", "default", - "database.hostname", "host", - "database.port", "1926", - "database.user", "username", - "database.password", "password", - "notification.enabled.channels", "a_name", - "database.dbname", "database")); - } - - @Test - @DisplayName("should use debezium configurations when contains datasource information") - void shouldUseDebeziumConfigurationWhenContainsDatasourceInformation() { - when(quarkusNotificationChannel.name()).thenReturn("a_name"); - - var underTest = new PostgresEngineProducer(mock(DefaultStateHandler.class), - Map.of( - "default", new PostgresDatasourceConfiguration( - "host", - "username", - "password", - "database", - "1926", - true, - "")), - quarkusNotificationChannel, - ignore -> { - return event -> { - }; - }); - - assertThat(underTest.engine(new DebeziumEngineConfiguration() { - @Override - public Map defaultConfiguration() { - return new HashMap<>(Map.of("name", "test", "database.hostname", "native")); - } - - @Override - public Map capturing() { - return Map.of(); - } - - @Override - public Map devservices() { - return Map.of(); - } - }).get(new EngineManifest("default")) - .configuration()) - .isEqualTo(Map.of( - "connector.class", "io.debezium.connector.postgresql.PostgresConnector", - "name", "test", - "notification.enabled.channels", "a_name", - "database.hostname", "native")); - } - - @Test - @DisplayName("should create multiple configurations based on debezium and quarkus datasource") - void shouldCreateMultipleConfigurationsBasedOnDebeziumAndQuarkusDatasource() { - when(quarkusNotificationChannel.name()).thenReturn("a_name"); - - var underTest = new PostgresEngineProducer(mock(DefaultStateHandler.class), - Map.of( - "default", - new PostgresDatasourceConfiguration( - "host", - "username", - "password", - "database", - "1926", - true, - ""), - "another", - new PostgresDatasourceConfiguration( - "another_host", - "another_username", - "another_password", - "another_database", - "1926", - false, - "another")), - quarkusNotificationChannel, - ignore -> event -> { - }); - - DebeziumConnectorRegistry registry = underTest.engine(new DebeziumEngineConfiguration() { - @Override - public Map defaultConfiguration() { - return Collections.emptyMap(); - } - - @Override - public Map capturing() { - return Map.of("default", new Capturing() { - @Override - public Optional engineId() { - return Optional.of("default"); - } - - @Override - public Optional destination() { - return Optional.empty(); - } - - @Override - public Optional deserializer() { - return Optional.empty(); - } - - @Override - public Map deserializers() { - return Map.of(); - } - - @Override - public Map configurations() { - return Map.of("configuration.key", "default_value"); - } - }, "another", new Capturing() { - @Override - public Optional engineId() { - return Optional.of("another"); - } - - @Override - public Optional destination() { - return Optional.empty(); - } - - @Override - public Optional deserializer() { - return Optional.empty(); - } - - @Override - public Map deserializers() { - return Map.of(); - } - - @Override - public Map configurations() { - return Map.of("configuration.key", "another_value"); - } - }); - } - - @Override - public Map devservices() { - return Map.of(); - } - }); - - assertThat(registry.get(new EngineManifest("default")) - .configuration()) - .isEqualTo(Map.of( - "configuration.key", "default_value", - "connector.class", "io.debezium.connector.postgresql.PostgresConnector", - "name", "default", - "database.hostname", "host", - "database.port", "1926", - "database.user", "username", - "database.password", "password", - "notification.enabled.channels", "a_name", - "database.dbname", "database")); - - assertThat(registry.get(new EngineManifest("another")) - .configuration()) - .isEqualTo(Map.of( - "configuration.key", "another_value", - "connector.class", "io.debezium.connector.postgresql.PostgresConnector", - "name", "another", - "database.hostname", "another_host", - "database.port", "1926", - "database.user", "another_username", - "database.password", "another_password", - "notification.enabled.channels", "a_name", - "database.dbname", "another_database")); - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-sqlserver-parent/runtime/src/main/java/io/quarkus/debezium/engine/SqlServerEngineProducer.java b/quarkus-debezium-parent/quarkus-debezium-sqlserver-parent/runtime/src/main/java/io/quarkus/debezium/engine/SqlServerEngineProducer.java index 2c345dc592..996f09fdc9 100644 --- a/quarkus-debezium-parent/quarkus-debezium-sqlserver-parent/runtime/src/main/java/io/quarkus/debezium/engine/SqlServerEngineProducer.java +++ b/quarkus-debezium-parent/quarkus-debezium-sqlserver-parent/runtime/src/main/java/io/quarkus/debezium/engine/SqlServerEngineProducer.java @@ -27,22 +27,17 @@ import io.quarkus.datasource.common.runtime.DatabaseKind; import io.quarkus.debezium.agroal.engine.AgroalParser; import io.quarkus.debezium.configuration.DebeziumConfigurationEngineParser.MultiEngineConfiguration; -import io.quarkus.debezium.engine.capture.consumer.SourceRecordConsumerHandler; public class SqlServerEngineProducer implements ConnectorProducer { public static final Connector SQLSERVER = new Connector(SqlServerConnector.class.getName()); - private final StateHandler stateHandler; - private final SourceRecordConsumerHandler sourceRecordConsumerHandler; private final AgroalParser agroalParser; + private final DebeziumFactory debeziumFactory; @Inject - public SqlServerEngineProducer(StateHandler stateHandler, - SourceRecordConsumerHandler sourceRecordConsumerHandler, - AgroalParser agroalParser) { - this.stateHandler = stateHandler; - this.sourceRecordConsumerHandler = sourceRecordConsumerHandler; + public SqlServerEngineProducer(AgroalParser agroalParser, DebeziumFactory debeziumFactory) { this.agroalParser = agroalParser; + this.debeziumFactory = debeziumFactory; } @Produces @@ -55,18 +50,11 @@ public DebeziumConnectorRegistry engine(DebeziumEngineConfiguration debeziumEngi private final Map engines = multiEngineConfigurations .stream() .map(engine -> { - EngineManifest engineManifest = new EngineManifest(engine.engineId()); - - Map debeziumConfiguration = engine.configuration(); - // remove unnecessary configuration for sqlserver - debeziumConfiguration.remove(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE.name()); + engine.configuration() + .remove(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE.name()); - return Map.entry(engine.engineId(), new SourceRecordDebezium( - engine.configuration(), - stateHandler, - SQLSERVER, - sourceRecordConsumerHandler.get(engineManifest), engineManifest)); + return Map.entry(engine.engineId(), debeziumFactory.get(SQLSERVER, engine)); }) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));