diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/pom.xml b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/pom.xml index ed6cf564ee..c1d4b48c92 100644 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/pom.xml +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/pom.xml @@ -30,8 +30,8 @@ quarkus-jdbc-postgresql-deployment - io.quarkus - quarkus-agroal-deployment + io.debezium.quarkus + quarkus-debezium-agroal-deployment @@ -63,6 +63,17 @@ quarkus-jdbc-postgresql test + + io.debezium.quarkus + quarkus-debezium-testsuite-deployment + test-jar + test + + + org.junit.platform + junit-platform-suite + test + diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/main/java/io/quarkus/debezium/postgres/deployment/PostgresEngineProcessor.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/main/java/io/quarkus/debezium/postgres/deployment/PostgresEngineProcessor.java index 763f8db584..15a2516ee2 100644 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/main/java/io/quarkus/debezium/postgres/deployment/PostgresEngineProcessor.java +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/main/java/io/quarkus/debezium/postgres/deployment/PostgresEngineProcessor.java @@ -5,10 +5,6 @@ */ package io.quarkus.debezium.postgres.deployment; -import java.util.List; - -import jakarta.inject.Singleton; - import io.debezium.connector.postgresql.Module; import io.debezium.connector.postgresql.PostgresConnector; import io.debezium.connector.postgresql.PostgresConnectorTask; @@ -16,60 +12,39 @@ import io.debezium.connector.postgresql.snapshot.lock.NoSnapshotLock; import io.debezium.connector.postgresql.snapshot.lock.SharedSnapshotLock; import io.debezium.connector.postgresql.snapshot.query.SelectAllSnapshotQuery; -import io.debezium.runtime.configuration.QuarkusDatasourceConfiguration; -import io.quarkus.agroal.spi.JdbcDataSourceBuildItem; -import io.quarkus.arc.deployment.SyntheticBeanBuildItem; import io.quarkus.datasource.deployment.spi.DevServicesDatasourceResultBuildItem; -import io.quarkus.debezium.configuration.DatasourceRecorder; +import io.quarkus.debezium.agroal.configuration.AgroalDatasourceConfiguration; +import io.quarkus.debezium.deployment.QuarkusEngineProcessor; import io.quarkus.debezium.deployment.items.DebeziumConnectorBuildItem; +import io.quarkus.debezium.deployment.items.DebeziumExtensionNameBuildItem; import io.quarkus.debezium.engine.PostgresEngineProducer; import io.quarkus.deployment.IsNormal; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; -import io.quarkus.deployment.annotations.ExecutionTime; -import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.DevServicesResultBuildItem; -import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.deployment.dev.devservices.DevServicesConfig; import io.quarkus.deployment.pkg.steps.NativeOrNativeSourcesBuild; -public class PostgresEngineProcessor { +public class PostgresEngineProcessor implements QuarkusEngineProcessor { public static final String POSTGRESQL = Module.name(); @BuildStep - FeatureBuildItem feature() { - return new FeatureBuildItem("debezium-" + POSTGRESQL); + @Override + public DebeziumExtensionNameBuildItem debeziumExtensionNameBuildItem() { + return new DebeziumExtensionNameBuildItem(POSTGRESQL); } @BuildStep + @Override public DebeziumConnectorBuildItem engine() { return new DebeziumConnectorBuildItem(POSTGRESQL, PostgresEngineProducer.class); } - @BuildStep - @Record(ExecutionTime.RUNTIME_INIT) - public void generateDatasourceConfig( - DatasourceRecorder datasourceRecorder, - BuildProducer producer, - List jdbcDataSources) { - - jdbcDataSources - .stream() - .filter(item -> item.getDbKind().equals(POSTGRESQL)) - .forEach(item -> producer.produce(SyntheticBeanBuildItem - .configure(QuarkusDatasourceConfiguration.class) - .scope(Singleton.class) - .supplier(datasourceRecorder.convert(item.getName(), item.isDefault())) - .setRuntimeInit() - .named(item.getDbKind() + item.getName()) - .done())); - } - @BuildStep(onlyIfNot = IsNormal.class, onlyIf = DevServicesConfig.Enabled.class) - void configure(BuildProducer devServicesProducer, - DevServicesDatasourceResultBuildItem devServicesDatasourceResultBuildItem) { + void devServices(BuildProducer devServicesProducer, + DevServicesDatasourceResultBuildItem devServicesDatasourceResultBuildItem) { DevServicesDatasourceResultBuildItem.DbResult datasource = devServicesDatasourceResultBuildItem.getDefaultDatasource(); if (datasource == null) { @@ -82,12 +57,13 @@ void configure(BuildProducer devServicesProducer, devServicesProducer.produce(new DevServicesResultBuildItem("debezium-postgres", "debezium", - QuarkusDatasource.generateDebeziumConfiguration(datasource.getConfigProperties()))); + datasource.getConfigProperties())); } @BuildStep(onlyIf = NativeOrNativeSourcesBuild.class) - void registerClassesThatAreLoadedThroughReflection(BuildProducer reflectiveClasses) { - reflectiveClasses.produce(ReflectiveClassBuildItem.builder( + @Override + public void registerClassesThatAreLoadedThroughReflection(BuildProducer reflectiveClassBuildItemBuildProducer) { + reflectiveClassBuildItemBuildProducer.produce(ReflectiveClassBuildItem.builder( PostgresConnector.class, PostgresSourceInfoStructMaker.class, PostgresConnectorTask.class, @@ -98,4 +74,9 @@ void registerClassesThatAreLoadedThroughReflection(BuildProducer quarkusDatasourceConfiguration() { + return AgroalDatasourceConfiguration.class; + } + } diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/main/java/io/quarkus/debezium/postgres/deployment/QuarkusDatasource.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/main/java/io/quarkus/debezium/postgres/deployment/QuarkusDatasource.java deleted file mode 100644 index bfa5695f24..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/main/java/io/quarkus/debezium/postgres/deployment/QuarkusDatasource.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.debezium.postgres.deployment; - -import java.net.URI; -import java.util.Map; - -public class QuarkusDatasource { - public static Map generateDebeziumConfiguration(Map properties) { - URI uri = URI.create(properties.get("quarkus.datasource.jdbc.url").substring(5)); - - return Map.of( - "quarkus.debezium.database.hostname", uri.getHost(), - "quarkus.debezium.database.user", properties.get("quarkus.datasource.username"), - "quarkus.debezium.database.password", properties.get("quarkus.datasource.password"), - "quarkus.debezium.database.dbname", uri.getPath().substring(1), - "quarkus.debezium.database.port", String.valueOf(uri.getPort())); - } -} \ No newline at end of file diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/CapturingTest.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/CapturingTest.java deleted file mode 100644 index 5884c01f10..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/CapturingTest.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.debezium.postgres.deployment; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.given; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; - -import org.apache.kafka.connect.source.SourceRecord; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import io.debezium.runtime.Capturing; -import io.debezium.runtime.CapturingEvent; -import io.quarkus.debezium.engine.deserializer.CapturingEventDeserializerRegistry; -import io.quarkus.debezium.engine.deserializer.MutableCapturingEventDeserializerRegistry; -import io.quarkus.debezium.engine.deserializer.ObjectMapperDeserializer; -import io.quarkus.test.QuarkusUnitTest; -import io.quarkus.test.common.QuarkusTestResource; - -@QuarkusTestResource(value = DatabaseTestResource.class) -public class CapturingTest { - - @Inject - CaptureProductsHandler captureProductsHandler; - - @Inject - CapturingEventDeserializerRegistry registry; - - @BeforeEach - void setUp() { - var mutableRegistry = (MutableCapturingEventDeserializerRegistry) registry; - mutableRegistry.register("dbserver1.public.orders", new OrderDeserializer()); - mutableRegistry.register("dbserver1.public.users", new UserDeserializer()); - } - - @RegisterExtension - static final QuarkusUnitTest setup = new QuarkusUnitTest() - .withApplicationRoot((jar) -> jar.addClasses(CaptureProductsHandler.class)) - .overrideConfigKey("quarkus.debezium.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore") - .overrideConfigKey("quarkus.debezium.name", "test") - .overrideConfigKey("quarkus.debezium.topic.prefix", "dbserver1") - .overrideConfigKey("quarkus.debezium.plugin.name", "pgoutput") - .overrideConfigKey("quarkus.debezium.snapshot.mode", "initial") - .overrideConfigKey("quarkus.debezium.capturing.orders.destination", "dbserver1.public.orders") - .overrideConfigKey("quarkus.datasource.devservices.enabled", "false"); - - @Test - @DisplayName("should invoke the default capture") - void shouldInvokeDefaultCapture() { - given().await() - .atMost(100, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(captureProductsHandler.isInvoked()).isTrue()); - - } - - @Test - @DisplayName("should call the filtered by destination capture") - void shouldInvokeFilteredByDestinationCapture() { - given().await() - .atMost(100, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(captureProductsHandler.filteredEvent()).isEqualTo(6)); - } - - @Test - @DisplayName("should map and capture 'capturing' orders filtered by destination") - void shouldMapAndCaptureOrdersFilteredByDestination() { - given().await() - .atMost(100, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(captureProductsHandler.getOrders()).containsExactlyInAnyOrder( - new Order(1, "one"), - new Order(2, "two"))); - } - - @Test - @DisplayName("should map and capture users filtered by destination") - void shouldMapAndCaptureUsersFilteredByDestination() { - given().await() - .atMost(100, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(captureProductsHandler.getUsers()).containsExactlyInAnyOrder( - new User(1, "giovanni", "developer"), - new User(2, "mario", "developer"))); - } - - @ApplicationScoped - static class CaptureProductsHandler { - private final AtomicBoolean isInvoked = new AtomicBoolean(false); - private final AtomicInteger isCapturingFilteredEvent = new AtomicInteger(0); - private final List orders = new ArrayList<>(); - private final List users = new ArrayList<>(); - - @Capturing() - public void newCapture(CapturingEvent event) { - isInvoked.set(true); - } - - @Capturing(destination = "dbserver1.public.injected") - public void anotherCapture(CapturingEvent event) { - isCapturingFilteredEvent.incrementAndGet(); - } - - @Capturing(destination = "dbserver1.public.orders") - public void deserializedCapture(CapturingEvent event) { - orders.add(event.record()); - } - - @Capturing(destination = "dbserver1.public.users") - public void deserialized(User user) { - users.add(user); - } - - public boolean isInvoked() { - return isInvoked.getAndSet(false); - } - - public List getOrders() { - return orders; - } - - public List getUsers() { - return users; - } - - public int filteredEvent() { - return isCapturingFilteredEvent.getAndSet(0); - } - - } - - public record Order(int id, String name) { - } - - public record User(int id, String name, String description) { - - } - - public static class OrderDeserializer extends ObjectMapperDeserializer { - public OrderDeserializer() { - super(Order.class); - } - } - - public static class UserDeserializer extends ObjectMapperDeserializer { - - public UserDeserializer() { - super(User.class); - } - } - -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/CustomConverterTest.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/CustomConverterTest.java deleted file mode 100644 index e7c2686101..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/CustomConverterTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.debezium.postgres.deployment; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.given; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; - -import org.apache.kafka.connect.data.SchemaBuilder; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import io.debezium.relational.CustomConverterRegistry.ConverterDefinition; -import io.debezium.runtime.CustomConverter; -import io.debezium.runtime.DebeziumConnectorRegistry; -import io.debezium.runtime.DebeziumStatus; -import io.debezium.runtime.EngineManifest; -import io.debezium.runtime.FieldFilterStrategy; -import io.debezium.spi.converter.ConvertedField; -import io.quarkus.test.QuarkusUnitTest; -import io.quarkus.test.common.QuarkusTestResource; - -@QuarkusTestResource(value = DatabaseTestResource.class) -public class CustomConverterTest { - - @Inject - CustomConverterBinder customConverterBinder; - - @Inject - CustomFieldFilterStrategy fieldFilterStrategy; - - @Inject - DebeziumConnectorRegistry registry; - - @RegisterExtension - static final QuarkusUnitTest setup = new QuarkusUnitTest() - .withApplicationRoot((jar) -> jar - .addClasses(CustomConverterTest.CustomConverterBinder.class)) - .overrideConfigKey("quarkus.debezium.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore") - .overrideConfigKey("quarkus.debezium.name", "test") - .overrideConfigKey("quarkus.debezium.topic.prefix", "dbserver1") - .overrideConfigKey("quarkus.debezium.plugin.name", "pgoutput") - .overrideConfigKey("quarkus.debezium.snapshot.mode", "initial") - .overrideConfigKey("quarkus.datasource.devservices.enabled", "false"); - - @BeforeEach - void setUp() { - given().await() - .atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(registry.get(new EngineManifest("default")).status()) - .isEqualTo(new DebeziumStatus(DebeziumStatus.State.POLLING))); - } - - @Test - @DisplayName("should apply custom converter") - void shouldApplyCustomConverter() { - given().await() - .atMost(100, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(customConverterBinder.isInvoked()).isTrue()); - } - - @Test - @DisplayName("should evaluate filter strategy for custom converter if defined") - void shouldUseFilterWhenDefinedInCustomConverter() { - given().await() - .atMost(100, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(fieldFilterStrategy.isInvoked()).isTrue()); - } - - @ApplicationScoped - public static class CustomConverterBinder { - private final AtomicBoolean invoked = new AtomicBoolean(false); - - @CustomConverter - public ConverterDefinition bind(ConvertedField field) { - invoked.set(true); - return new ConverterDefinition<>(SchemaBuilder.string(), String::valueOf); - } - - @CustomConverter(filter = CustomFieldFilterStrategy.class) - public ConverterDefinition filteredBind(ConvertedField field) { - return new ConverterDefinition<>(SchemaBuilder.string(), String::valueOf); - } - - public boolean isInvoked() { - return invoked.getAndSet(false); - } - } - - @ApplicationScoped - public static class CustomFieldFilterStrategy implements FieldFilterStrategy { - private final AtomicBoolean invoked = new AtomicBoolean(false); - - @Override - public boolean filter(ConvertedField field) { - invoked.set(true); - return false; - } - - public boolean isInvoked() { - return invoked.getAndSet(false); - } - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/DatabaseTestResource.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/DatabaseTestResource.java deleted file mode 100644 index 369c5aa880..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/DatabaseTestResource.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.debezium.postgres.deployment; - -import java.time.Duration; -import java.util.Map; - -import org.testcontainers.containers.PostgreSQLContainer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.utility.DockerImageName; - -import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; - -public class DatabaseTestResource implements QuarkusTestResourceLifecycleManager { - private static final String POSTGRES_IMAGE = "quay.io/debezium/postgres:15"; - - private static final DockerImageName POSTGRES_DOCKER_IMAGE_NAME = DockerImageName.parse(POSTGRES_IMAGE) - .asCompatibleSubstituteFor("postgres"); - - private static final PostgreSQLContainer postgresContainer = new PostgreSQLContainer<>(POSTGRES_DOCKER_IMAGE_NAME) - .waitingFor(Wait.forLogMessage(".*database system is ready to accept connections.*", 2)) - .withEnv("POSTGRES_INITDB_ARGS", "-E UTF8") - .withEnv("LANG", "en_US.utf8") - .withUsername("postgres") - .withPassword("postgres") - .withDatabaseName("postgres") - .withInitScript("init.sql") - .withStartupTimeout(Duration.ofSeconds(30)); - - @Override - public Map start() { - try { - postgresContainer.start(); - - return Map.of( - "quarkus.datasource.db-kind", "postgresql", - "quarkus.debezium.database.hostname", postgresContainer.getHost(), - "quarkus.debezium.database.user", postgresContainer.getUsername(), - "quarkus.debezium.database.password", postgresContainer.getPassword(), - "quarkus.debezium.database.dbname", postgresContainer.getDatabaseName(), - "quarkus.debezium.database.port", postgresContainer.getMappedPort(5432).toString()); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public void stop() { - try { - if (postgresContainer != null) { - postgresContainer.stop(); - } - } - catch (Exception ignore) { - } - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/DebeziumLifeCycleTest.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/DebeziumLifeCycleTest.java deleted file mode 100644 index e08371667f..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/DebeziumLifeCycleTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.debezium.postgres.deployment; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.given; - -import java.util.concurrent.TimeUnit; - -import jakarta.inject.Inject; - -import org.assertj.core.api.Assertions; -import org.jboss.shrinkwrap.api.ShrinkWrap; -import org.jboss.shrinkwrap.api.spec.JavaArchive; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import io.debezium.runtime.DebeziumConnectorRegistry; -import io.debezium.runtime.DebeziumStatus; -import io.debezium.runtime.EngineManifest; -import io.quarkus.runtime.Application; -import io.quarkus.test.QuarkusUnitTest; -import io.quarkus.test.common.QuarkusTestResource; - -@QuarkusTestResource(value = DatabaseTestResource.class) -public class DebeziumLifeCycleTest { - - @Inject - DebeziumConnectorRegistry registry; - - @RegisterExtension - static final QuarkusUnitTest application = new QuarkusUnitTest() - .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)) - .overrideConfigKey("quarkus.debezium.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore") - .overrideConfigKey("quarkus.debezium.name", "test") - .overrideConfigKey("quarkus.debezium.topic.prefix", "dbserver1") - .overrideConfigKey("quarkus.debezium.table.include.list", "inventory.products") - .overrideConfigKey("quarkus.debezium.plugin.name", "pgoutput") - .overrideConfigKey("quarkus.debezium.snapshot.mode", "no_data") - .overrideConfigKey("quarkus.datasource.devservices.enabled", "false") - .setLogRecordPredicate(record -> record.getLoggerName().equals("io.quarkus.debezium.engine.DebeziumRunner")) - .assertLogRecords((records) -> { - assertThat(records.getFirst().getMessage()).isEqualTo("Starting Debezium Engine debezium-connector-quarkus-extension-default-0"); - assertThat(records.get(1).getMessage()).isEqualTo("Shutting down Debezium Engine debezium-connector-quarkus-extension-default-0"); - }); - - @Test - @DisplayName("debezium should be integrated in the quarkus lifecycle") - void shouldDebeziumBeIntegratedInTheQuarkusLifeCycle() { - Assertions.assertThat(registry.get(new EngineManifest("default")).configuration().get("connector.class")) - .isEqualTo("io.debezium.connector.postgresql.PostgresConnector"); - - given().await() - .atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> Assertions.assertThat(registry.get(new EngineManifest("default")).status()) - .isEqualTo(new DebeziumStatus(DebeziumStatus.State.POLLING))); - - Application.currentApplication().close(); - - given().await() - .atMost(30, TimeUnit.SECONDS) - .untilAsserted(() -> Assertions.assertThat(registry.get(new EngineManifest("default")).status()) - .isEqualTo(new DebeziumStatus(DebeziumStatus.State.STOPPED))); - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/HeartbeatTest.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/HeartbeatTest.java deleted file mode 100644 index 9c864436e5..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/HeartbeatTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.debezium.postgres.deployment; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.given; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.event.Observes; -import jakarta.inject.Inject; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import io.debezium.runtime.events.DebeziumHeartbeat; -import io.quarkus.test.QuarkusUnitTest; -import io.quarkus.test.common.QuarkusTestResource; - -@QuarkusTestResource(value = DatabaseTestResource.class) -public class HeartbeatTest { - - @Inject - private HeartbeatHandler heartbeatHandler; - - @RegisterExtension - static final QuarkusUnitTest setup = new QuarkusUnitTest() - .withApplicationRoot((jar) -> jar - .addClasses(CapturingTest.CaptureProductsHandler.class)) - .overrideConfigKey("quarkus.debezium.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore") - .overrideConfigKey("quarkus.debezium.heartbeat.interval.ms", "5") - .overrideConfigKey("quarkus.debezium.name", "test") - .overrideConfigKey("quarkus.debezium.topic.prefix", "dbserver1") - .overrideConfigKey("quarkus.debezium.plugin.name", "pgoutput") - .overrideConfigKey("quarkus.debezium.snapshot.mode", "initial") - .overrideConfigKey("quarkus.datasource.devservices.enabled", "false"); - - @Test - @DisplayName("should observe heartbeat events") - void shouldObserveHeartbeatEvents() { - given().await() - .atMost(100, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(heartbeatHandler.isInvoked()).isTrue()); - } - - @ApplicationScoped - public static class HeartbeatHandler { - private final AtomicBoolean invoked = new AtomicBoolean(false); - - public void observe(@Observes DebeziumHeartbeat heartBeat) { - invoked.set(true); - } - - public boolean isInvoked() { - return invoked.getAndSet(false); - } - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/NotificationTest.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/NotificationTest.java deleted file mode 100644 index 6cae205f47..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/NotificationTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.debezium.postgres.deployment; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.given; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.event.Observes; -import jakarta.inject.Inject; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import io.quarkus.debezium.notification.DebeziumNotification; -import io.quarkus.debezium.notification.SnapshotEvent; -import io.quarkus.test.QuarkusUnitTest; -import io.quarkus.test.common.QuarkusTestResource; - -@QuarkusTestResource(value = DatabaseTestResource.class) -public class NotificationTest { - - @Inject - private SnapshotNotificationsHandler snapshotNotificationsHandler; - - @Inject - private DebeziumNotificationsHandler debeziumNotificationsHandler; - - @RegisterExtension - static final QuarkusUnitTest setup = new QuarkusUnitTest() - .withApplicationRoot((jar) -> jar - .addClasses(CapturingTest.CaptureProductsHandler.class)) - .overrideConfigKey("quarkus.debezium.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore") - .overrideConfigKey("quarkus.debezium.name", "test") - .overrideConfigKey("quarkus.debezium.topic.prefix", "dbserver1") - .overrideConfigKey("quarkus.debezium.plugin.name", "pgoutput") - .overrideConfigKey("quarkus.debezium.snapshot.mode", "initial") - .overrideConfigKey("quarkus.datasource.devservices.enabled", "false"); - - @Test - @DisplayName("should observe events for snapshot") - void shouldObserveSnapshotEvents() { - given().await() - .atMost(100, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(snapshotNotificationsHandler.isInvoked()).isTrue()); - - } - - @Test - @DisplayName("should observe general events") - void shouldObserveDebeziumEvents() { - given().await() - .atMost(100, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(debeziumNotificationsHandler.isInvoked()).isTrue()); - - } - - @ApplicationScoped - static class SnapshotNotificationsHandler { - private final AtomicBoolean invoked = new AtomicBoolean(false); - - public void observe(@Observes SnapshotEvent event) { - invoked.set(true); - } - - public boolean isInvoked() { - return invoked.getAndSet(false); - } - } - - @ApplicationScoped - static class DebeziumNotificationsHandler { - private final AtomicBoolean invoked = new AtomicBoolean(false); - - public void observe(@Observes DebeziumNotification event) { - invoked.set(true); - } - - public boolean isInvoked() { - return invoked.getAndSet(false); - } - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/PostProcessingTest.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/PostProcessingTest.java deleted file mode 100644 index 7af72dcbdc..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/PostProcessingTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.debezium.postgres.deployment; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.given; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; - -import org.apache.kafka.connect.data.Struct; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import io.debezium.runtime.DebeziumConnectorRegistry; -import io.debezium.runtime.DebeziumStatus; -import io.debezium.runtime.EngineManifest; -import io.debezium.runtime.PostProcessing; -import io.quarkus.test.QuarkusUnitTest; -import io.quarkus.test.common.QuarkusTestResource; - -@QuarkusTestResource(value = DatabaseTestResource.class) -public class PostProcessingTest { - - @Inject - PostProcessingHandler postProcessingHandler; - - @Inject - DebeziumConnectorRegistry debeziumConnectorRegistry; - - @BeforeEach - void setUp() { - given().await() - .atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(debeziumConnectorRegistry.get(new EngineManifest("default")).status()) - .isEqualTo(new DebeziumStatus(DebeziumStatus.State.POLLING))); - } - - @RegisterExtension - static final QuarkusUnitTest setup = new QuarkusUnitTest() - .withApplicationRoot((jar) -> jar - .addClasses(PostProcessingTest.PostProcessingHandler.class)) - .overrideConfigKey("quarkus.debezium.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore") - .overrideConfigKey("quarkus.debezium.name", "test") - .overrideConfigKey("quarkus.debezium.topic.prefix", "dbserver1") - .overrideConfigKey("quarkus.debezium.plugin.name", "pgoutput") - .overrideConfigKey("quarkus.debezium.snapshot.mode", "initial") - .overrideConfigKey("quarkus.datasource.devservices.enabled", "false"); - - @Test - @DisplayName("should use post processor") - void shouldApplyPostProcessor() { - given().await() - .atMost(100, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(postProcessingHandler.key()).isEqualTo(1)); - - } - - @ApplicationScoped - static class PostProcessingHandler { - private final List ids = new ArrayList<>(); - - @PostProcessing() - public void postProcessing(Object key, Struct struct) { - this.ids.add(((Struct) key) - .getInt32("id")); - } - - public int key() { - return ids.isEmpty() ? 0 : ids.getFirst(); - } - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/PostgresDeploymentExtensionTest.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/PostgresDeploymentExtensionTest.java new file mode 100644 index 0000000000..9553b4bc2c --- /dev/null +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/PostgresDeploymentExtensionTest.java @@ -0,0 +1,28 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.quarkus.debezium.postgres.deployment; + +import org.junit.platform.suite.api.AfterSuite; +import org.junit.platform.suite.api.BeforeSuite; +import org.junit.platform.suite.api.SuiteDisplayName; + +import io.quarkus.debezium.testsuite.deployment.QuarkusDebeziumSqlExtensionTestSuite; + +@SuiteDisplayName("Sql Server Debezium Extensions for Quarkus Test Suite") +public class PostgresDeploymentExtensionTest implements QuarkusDebeziumSqlExtensionTestSuite { + private static final PostgresResource postgresResource = new PostgresResource(); + + @BeforeSuite + public static void init() { + postgresResource.start(); + } + + @AfterSuite + public static void close() { + postgresResource.stop(); + } +} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/PostgresResource.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/PostgresResource.java new file mode 100644 index 0000000000..2b48e9cba1 --- /dev/null +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/java/io/quarkus/debezium/postgres/deployment/PostgresResource.java @@ -0,0 +1,43 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.quarkus.debezium.postgres.deployment; + +import java.time.Duration; + +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +public class PostgresResource { + + private static final String POSTGRES_IMAGE = "quay.io/debezium/postgres:15"; + + private static final DockerImageName POSTGRES_DOCKER_IMAGE_NAME = DockerImageName.parse(POSTGRES_IMAGE) + .asCompatibleSubstituteFor("postgres"); + + private static final PostgreSQLContainer POSTGRES_CONTAINER = new PostgreSQLContainer<>(POSTGRES_DOCKER_IMAGE_NAME) + .waitingFor(Wait.forLogMessage(".*database system is ready to accept connections.*", 2)) + .withEnv("POSTGRES_INITDB_ARGS", "-E UTF8") + .withEnv("LANG", "en_US.utf8") + .withUsername("postgres") + .withPassword("postgres") + .withDatabaseName("postgres") + .withInitScript("initialize-postgres-database.sql") + .withStartupTimeout(Duration.ofSeconds(30)); + + public void start() { + POSTGRES_CONTAINER.start(); + + System.setProperty("POSTGRES_JDBC", POSTGRES_CONTAINER.getJdbcUrl()); + System.setProperty("POSTGRES_PASSWORD", POSTGRES_CONTAINER.getPassword()); + System.setProperty("POSTGRES_USERNAME", POSTGRES_CONTAINER.getUsername()); + } + + public void stop() { + POSTGRES_CONTAINER.stop(); + } +} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/resources/init.sql b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/resources/init.sql deleted file mode 100644 index 62bc42a926..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/resources/init.sql +++ /dev/null @@ -1,38 +0,0 @@ -CREATE TABLE products( - id SERIAL PRIMARY KEY, - name TEXT NOT NULL -); - -CREATE TABLE orders( - id SERIAL PRIMARY KEY, - name TEXT NOT NULL -); - -CREATE TABLE users( - id SERIAL PRIMARY KEY, - name TEXT NOT NULL, - description TEXT NOT NULL -); - -CREATE TABLE not_assigned( - id SERIAL PRIMARY KEY, - name TEXT NOT NULL -); - -CREATE TABLE injected( - id SERIAL PRIMARY KEY, - name TEXT NOT NULL -); - -CREATE TABLE convert_table( - id SERIAL PRIMARY KEY, - num FLOAT NOT NULL -); - -INSERT INTO products (id, name) VALUES (1, 't-shirt'), (2, 'smartphone'); -INSERT INTO orders (id, name) VALUES (1, 'one'), (2,'two'); -INSERT INTO users (id, name, description) VALUES (1,'giovanni', 'developer'), (2,'mario', 'developer'); -INSERT INTO not_assigned (name) VALUES ('something'), ('should'), ('happens'); -INSERT INTO injected (name) VALUES ('called'), ('without'), ('injecting'); -INSERT INTO injected (name) VALUES ('called'), ('without'), ('injecting'); -INSERT INTO convert_table (num) VALUES (1), (2), (3); \ No newline at end of file diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/resources/initialize-postgres-database.sql b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/resources/initialize-postgres-database.sql new file mode 100644 index 0000000000..42b6103d87 --- /dev/null +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/resources/initialize-postgres-database.sql @@ -0,0 +1,11 @@ +CREATE SCHEMA IF NOT EXISTS inventory; + +CREATE TABLE inventory.products(id INT NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL); +CREATE TABLE inventory.general_table(id INT NOT NULL PRIMARY KEY); +CREATE TABLE inventory.orders(id INT NOT NULL PRIMARY KEY, "key" INT NOT NULL, name VARCHAR(255) NOT NULL); +CREATE TABLE inventory.users(id INT NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(255) NOT NULL); + +INSERT INTO inventory.general_table(id) VALUES (1); +INSERT INTO inventory.orders (id, "key", name) VALUES (1,1, 'one'), (2,2,'two'); +INSERT INTO inventory.users (id, name, description) VALUES (1,'giovanni', 'developer'), (2,'mario', 'developer'); +INSERT INTO inventory.products (id, name) VALUES (1,'t-shirt'), (2,'thinkpad'); \ No newline at end of file diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/resources/quarkus-debezium-testsuite.properties b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/resources/quarkus-debezium-testsuite.properties new file mode 100644 index 0000000000..4efd6bf607 --- /dev/null +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/deployment/src/test/resources/quarkus-debezium-testsuite.properties @@ -0,0 +1,16 @@ +quarkus.debezium.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore +quarkus.debezium.name=test +quarkus.debezium.topic.prefix=topic +quarkus.debezium.database.names=postgres +quarkus.debezium.snapshot.mode=initial +quarkus.debezium.plugin.name=pgoutput +quarkus.debezium.capturing.orders.destination=topic.inventory.orders +quarkus.debezium.include.schema.changes=false +quarkus.debezium.heartbeat.interval.ms=5 + +quarkus.datasource.db-kind=postgresql +quarkus.datasource.username=${POSTGRES_USERNAME} +quarkus.datasource.password=${POSTGRES_PASSWORD} + +quarkus.datasource.jdbc.url=${POSTGRES_JDBC} +quarkus.datasource.jdbc.max-size=16 \ No newline at end of file diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/pom.xml b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/pom.xml index f9bc93eb67..c158ba1f6f 100644 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/pom.xml +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/pom.xml @@ -46,6 +46,21 @@ postgresql test + + io.debezium.quarkus + quarkus-debezium-testsuite-integration-tests + test-jar + test + + + io.debezium.quarkus + quarkus-debezium-testsuite-integration-tests + + + org.junit.platform + junit-platform-suite + test + @@ -125,7 +140,7 @@ - ${project.basedir}/src/main/resources/sql/default.sql:/docker-entrypoint-initdb.d/default.sql + ${project.basedir}/src/main/resources/sql/default/default.sql:/docker-entrypoint-initdb.d/default.sql @@ -144,7 +159,7 @@ - ${project.basedir}/src/main/resources/sql/alternative.sql:/docker-entrypoint-initdb.d/alternative.sql + ${project.basedir}/src/main/resources/sql/alternative/alternative.sql:/docker-entrypoint-initdb.d/alternative.sql diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/deserializer/OrderDeserializer.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/deserializer/OrderDeserializer.java deleted file mode 100644 index dcd3098c75..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/deserializer/OrderDeserializer.java +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.sample.app.deserializer; - -import io.quarkus.debezium.engine.deserializer.ObjectMapperDeserializer; -import io.quarkus.sample.app.dto.Order; - -public class OrderDeserializer extends ObjectMapperDeserializer { - public OrderDeserializer() { - super(Order.class); - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/deserializer/ProductDeserializer.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/deserializer/ProductDeserializer.java deleted file mode 100644 index 978004935c..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/deserializer/ProductDeserializer.java +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.sample.app.deserializer; - -import io.quarkus.debezium.engine.deserializer.ObjectMapperDeserializer; -import io.quarkus.sample.app.dto.Product; - -public class ProductDeserializer extends ObjectMapperDeserializer { - public ProductDeserializer() { - super(Product.class); - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/dto/EngineInformation.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/dto/EngineInformation.java deleted file mode 100644 index 9e78f9ecd8..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/dto/EngineInformation.java +++ /dev/null @@ -1,12 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.quarkus.sample.app.dto; - -import io.quarkus.runtime.annotations.RegisterForReflection; - -@RegisterForReflection -public record EngineInformation(String id, String connector) { -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/events/HeartbeatEventObserver.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/events/HeartbeatEventObserver.java deleted file mode 100644 index b75839dc91..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/events/HeartbeatEventObserver.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.sample.app.events; - -import java.util.concurrent.atomic.AtomicReference; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.event.Observes; - -import io.debezium.runtime.events.DebeziumHeartbeat; -import io.debezium.runtime.events.DefaultEngine; -import io.debezium.runtime.events.Engine; - -@ApplicationScoped -public class HeartbeatEventObserver { - - private final AtomicReference defaultHeartbeats = new AtomicReference<>(); - private final AtomicReference alternativeHeartbeats = new AtomicReference<>(); - - public void observeDefaultHeartbeat(@Observes @DefaultEngine DebeziumHeartbeat heartbeat) { - defaultHeartbeats.set(heartbeat); - } - - public void observeAlternativeHeartbeat(@Observes @Engine("alternative") DebeziumHeartbeat heartbeat) { - alternativeHeartbeats.set(heartbeat); - } - - public DebeziumHeartbeat get(String engine) { - if (engine.equals("default")) { - return defaultHeartbeats.get(); - } - - return alternativeHeartbeats.get(); - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/events/LifecycleEventObserver.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/events/LifecycleEventObserver.java deleted file mode 100644 index 5f838bcd82..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/events/LifecycleEventObserver.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.quarkus.sample.app.events; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.event.Observes; - -import io.debezium.runtime.events.AbstractDebeziumLifecycleEvent; -import io.debezium.runtime.events.ConnectorStartedEvent; -import io.debezium.runtime.events.ConnectorStoppedEvent; -import io.debezium.runtime.events.DefaultEngine; -import io.debezium.runtime.events.Engine; -import io.debezium.runtime.events.PollingStartedEvent; -import io.debezium.runtime.events.PollingStoppedEvent; -import io.debezium.runtime.events.TasksStartedEvent; -import io.debezium.runtime.events.TasksStoppedEvent; - -/** - * A simple observer for Debezium lifecycle events, where they get recorded. - * - * @author Chris Cranford - */ -@ApplicationScoped -public class LifecycleEventObserver { - - private final List defaultLifecycleEvents = new CopyOnWriteArrayList<>(); - private final List alternativeLifecycleEvents = new CopyOnWriteArrayList<>(); - - void defaultOnConnectorStarted(@Observes @Engine("default") ConnectorStartedEvent connectorStartedEvent) { - defaultLifecycleEvents.add(connectorStartedEvent); - } - - void defaultOnConnectorStopped(@Observes @Engine("default") ConnectorStoppedEvent connectorStoppedEvent) { - defaultLifecycleEvents.add(connectorStoppedEvent); - } - - void defaultOnTaskStarted(@Observes @DefaultEngine TasksStartedEvent tasksStartedEvent) { - defaultLifecycleEvents.add(tasksStartedEvent); - } - - void defaultOnTaskStopped(@Observes @Engine("default") TasksStoppedEvent tasksStoppedEvent) { - defaultLifecycleEvents.add(tasksStoppedEvent); - } - - void defaultOnPollingStarted(@Observes @Engine("default") PollingStartedEvent pollingStartedEvent) { - defaultLifecycleEvents.add(pollingStartedEvent); - } - - void defaultOnPollingStopped(@Observes @Engine("default") PollingStoppedEvent pollingStoppedEvent) { - defaultLifecycleEvents.add(pollingStoppedEvent); - } - - void alternativeOnConnectorStarted(@Observes @Engine("alternative") ConnectorStartedEvent connectorStartedEvent) { - alternativeLifecycleEvents.add(connectorStartedEvent); - } - - void alternativeOnConnectorStopped(@Observes @Engine("alternative") ConnectorStoppedEvent connectorStoppedEvent) { - alternativeLifecycleEvents.add(connectorStoppedEvent); - } - - void alternativeOnTaskStarted(@Observes @Engine("alternative") TasksStartedEvent tasksStartedEvent) { - alternativeLifecycleEvents.add(tasksStartedEvent); - } - - void alternativeOnTaskStopped(@Observes @Engine("alternative") TasksStoppedEvent tasksStoppedEvent) { - alternativeLifecycleEvents.add(tasksStoppedEvent); - } - - void alternativeOnPollingStarted(@Observes @Engine("alternative") PollingStartedEvent pollingStartedEvent) { - alternativeLifecycleEvents.add(pollingStartedEvent); - } - - void alternativeOnPollingStopped(@Observes @Engine("alternative") PollingStoppedEvent pollingStoppedEvent) { - alternativeLifecycleEvents.add(pollingStoppedEvent); - } - - public List getLifecycleEvents(String engine) { - if (engine.equals("default")) { - return defaultLifecycleEvents; - } - return alternativeLifecycleEvents; - } - -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/events/SnapshotEventObserver.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/events/SnapshotEventObserver.java deleted file mode 100644 index b0351338b6..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/events/SnapshotEventObserver.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.quarkus.sample.app.events; - -import java.util.ArrayList; -import java.util.List; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.event.Observes; - -import io.debezium.runtime.events.DefaultEngine; -import io.debezium.runtime.events.Engine; -import io.quarkus.debezium.notification.SnapshotEvent; - -@ApplicationScoped -public class SnapshotEventObserver { - - private final List defaultSnapshotEvents = new ArrayList<>(); - private final List alternativeSnapshotEvents = new ArrayList<>(); - - public void defaultSnapshot(@Observes @DefaultEngine SnapshotEvent snapshot) { - defaultSnapshotEvents.add(snapshot); - } - - public void alternativeSnapshot(@Observes @Engine("alternative") SnapshotEvent snapshot) { - alternativeSnapshotEvents.add(snapshot); - } - - public List getDefaultSnapshotEvents(String engine) { - if (engine.equals("default")) { - return defaultSnapshotEvents; - } - return alternativeSnapshotEvents; - } - -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/resources/CapturingResource.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/resources/CapturingResource.java deleted file mode 100644 index 79ea7240de..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/resources/CapturingResource.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.quarkus.sample.app.resources; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.ws.rs.GET; -import jakarta.ws.rs.Path; -import jakarta.ws.rs.core.Response; - -import io.quarkus.sample.app.services.OrderService; -import io.quarkus.sample.app.services.ProductService; - -@Path("/captured") -@ApplicationScoped -public class CapturingResource { - - private final ProductService productService; - private final OrderService orderService; - - public CapturingResource(ProductService productService, OrderService orderService) { - this.productService = productService; - this.orderService = orderService; - } - - @GET - @Path("all") - public Response captured() { - if (productService.isInvoked()) { - return Response.status(Response.Status.FOUND).build(); - } - - return Response.status(Response.Status.NOT_FOUND).build(); - } - - @GET - @Path("products") - public Response products() { - if (productService.products().isEmpty()) { - return Response.status(Response.Status.NOT_FOUND).build(); - } - return Response.ok(productService.products()).build(); - } - - @GET - @Path("orders") - public Response orders() { - if (orderService.orders().isEmpty()) { - return Response.status(Response.Status.NOT_FOUND).build(); - } - return Response.ok(orderService.orders()).build(); - } - -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/resources/EngineResource.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/resources/EngineResource.java deleted file mode 100644 index 3c60f96380..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/resources/EngineResource.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.quarkus.sample.app.resources; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.ws.rs.GET; -import jakarta.ws.rs.Path; -import jakarta.ws.rs.core.Response; - -import io.debezium.runtime.DebeziumConnectorRegistry; -import io.debezium.runtime.DebeziumStatus; -import io.quarkus.sample.app.dto.EngineInformation; - -@Path("engine") -@ApplicationScoped -public class EngineResource { - - private final DebeziumConnectorRegistry registry; - - public EngineResource(DebeziumConnectorRegistry registry) { - this.registry = registry; - } - - @GET - @Path("manifest") - public Response engines() { - return Response.ok(registry.engines() - .stream() - .map(engine -> new EngineInformation(engine.manifest().id(), engine.connector().name())) - .toList()).build(); - } - - @GET - @Path("status") - public DebeziumStatus getState() { - return registry.engines().getFirst().status(); - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/resources/HeartbeatEventResource.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/resources/HeartbeatEventResource.java deleted file mode 100644 index 3f140df4f8..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/resources/HeartbeatEventResource.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.sample.app.resources; - -import jakarta.inject.Inject; -import jakarta.ws.rs.DefaultValue; -import jakarta.ws.rs.GET; -import jakarta.ws.rs.Path; -import jakarta.ws.rs.Produces; -import jakarta.ws.rs.QueryParam; -import jakarta.ws.rs.core.MediaType; - -import io.debezium.runtime.events.DebeziumHeartbeat; -import io.quarkus.sample.app.events.HeartbeatEventObserver; - -@Path("/heartbeat") -public class HeartbeatEventResource { - private final HeartbeatEventObserver observer; - - @Inject - public HeartbeatEventResource(HeartbeatEventObserver observer) { - this.observer = observer; - } - - @GET - @Produces(MediaType.APPLICATION_JSON) - public DebeziumHeartbeat get(@QueryParam("engine") @DefaultValue("default") String engine) { - return observer.get(engine); - } - -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/resources/LifecycleEventResource.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/resources/LifecycleEventResource.java deleted file mode 100644 index 847090bdd7..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/resources/LifecycleEventResource.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.quarkus.sample.app.resources; - -import java.util.List; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; -import jakarta.ws.rs.DefaultValue; -import jakarta.ws.rs.GET; -import jakarta.ws.rs.Path; -import jakarta.ws.rs.Produces; -import jakarta.ws.rs.QueryParam; -import jakarta.ws.rs.core.MediaType; - -import io.debezium.runtime.DebeziumConnectorRegistry; -import io.quarkus.sample.app.events.LifecycleEventObserver; - -/** - * @author Chris Cranford - */ -@Path("/lifecycle-events") -@ApplicationScoped -public class LifecycleEventResource { - - @Inject - private DebeziumConnectorRegistry registry; - - @Inject - LifecycleEventObserver observer; - - @GET - @Produces(MediaType.APPLICATION_JSON) - public List getEvents(@QueryParam("engine") @DefaultValue("default") String engine) { - // Returns a list of all observed event names - return observer.getLifecycleEvents(engine) - .stream() - .map(o -> o.getClass().getName()) - .toList(); - } - -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/resources/SnapshotEventResource.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/resources/SnapshotEventResource.java deleted file mode 100644 index 5fd6927606..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/resources/SnapshotEventResource.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.quarkus.sample.app.resources; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.ws.rs.DefaultValue; -import jakarta.ws.rs.GET; -import jakarta.ws.rs.Path; -import jakarta.ws.rs.Produces; -import jakarta.ws.rs.QueryParam; -import jakarta.ws.rs.core.MediaType; -import jakarta.ws.rs.core.Response; - -import io.quarkus.sample.app.events.SnapshotEventObserver; - -@Path("/notifications") -@ApplicationScoped -public class SnapshotEventResource { - - private final SnapshotEventObserver snapshotEventObserver; - - public SnapshotEventResource(SnapshotEventObserver snapshotEventObserver) { - this.snapshotEventObserver = snapshotEventObserver; - } - - @GET - @Produces(MediaType.APPLICATION_JSON) - public Response notification(@QueryParam("engine") @DefaultValue("default") String engine) { - if (snapshotEventObserver.getDefaultSnapshotEvents("default").isEmpty()) { - return Response.status(Response.Status.NOT_FOUND).build(); - } - - return Response.ok(snapshotEventObserver - .getDefaultSnapshotEvents(engine) - .stream() - .map(a -> a.getClass().getName())) - .build(); - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/services/CaptureHandler.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/services/CaptureHandler.java deleted file mode 100644 index d7e0033ab3..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/services/CaptureHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.sample.app.services; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; - -import org.apache.kafka.connect.source.SourceRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.debezium.runtime.Capturing; -import io.debezium.runtime.CapturingEvent; -import io.quarkus.sample.app.dto.Order; -import io.quarkus.sample.app.dto.Product; - -@ApplicationScoped -public class CaptureHandler { - - private final ProductService productService; - private final OrderService orderService; - private final Logger logger = LoggerFactory.getLogger(CaptureHandler.class); - - @Inject - public CaptureHandler(ProductService productService, OrderService orderService) { - this.productService = productService; - this.orderService = orderService; - } - - @Capturing - public void capture(CapturingEvent event) { - productService.captured(); - } - - @Capturing(destination = "dbserver1.public.products") - public void products(CapturingEvent event) { - logger.info("getting a product event for destination {} from capturing id {}", event.destination(), event.engine()); - productService.add(event.record()); - } - - @Capturing(destination = "dbserver2.public.orders", engine = "alternative") - public void orders(CapturingEvent event) { - logger.info("getting a order event for destination {} from capturing id {}", event.destination(), event.engine()); - orderService.add(event.record()); - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/services/OrderService.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/services/OrderService.java deleted file mode 100644 index 52b8a43238..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/services/OrderService.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.sample.app.services; - -import java.util.ArrayList; -import java.util.List; - -import jakarta.enterprise.context.ApplicationScoped; - -import io.quarkus.sample.app.dto.Order; - -@ApplicationScoped -public class OrderService { - private final List orders = new ArrayList<>(); - - public void add(Order order) { - orders.add(order); - } - - public List orders() { - return orders; - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/services/ProductService.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/services/ProductService.java deleted file mode 100644 index f487435f1e..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/services/ProductService.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.sample.app.services; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import jakarta.enterprise.context.ApplicationScoped; - -import io.quarkus.sample.app.dto.Product; - -@ApplicationScoped -public class ProductService { - private final List products = new ArrayList<>(); - private final AtomicBoolean invoked = new AtomicBoolean(false); - - public void captured() { - invoked.set(true); - } - - public boolean isInvoked() { - return invoked.get(); - } - - public void add(Product product) { - products.add(product); - } - - public List products() { - return products; - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/application-multi.properties b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/application-multi.properties index 176c241eb9..36ce4d4495 100644 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/application-multi.properties +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/application-multi.properties @@ -1,28 +1,18 @@ -# Debezium CDC configuration for alternative - -quarkus.debezium.capturing.alternative.engine-id=alternative quarkus.debezium.capturing.alternative.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore +quarkus.debezium.capturing.alternative.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory +quarkus.debezium.capturing.alternative.heartbeat.interval.ms=5 +quarkus.debezium.capturing.alternative.engine-id=alternative quarkus.debezium.capturing.alternative.name=alternative -quarkus.debezium.capturing.alternative.topic.prefix=dbserver2 -quarkus.debezium.capturing.alternative.plugin.name=pgoutput +quarkus.debezium.capturing.alternative.topic.prefix=integration2 +quarkus.debezium.capturing.alternative.database.names=alternative quarkus.debezium.capturing.alternative.snapshot.mode=initial +quarkus.debezium.capturing.alternative.orders.destination=integration2.alternative.orders +quarkus.debezium.capturing.alternative.orders.deserializer=io.quarkus.sample.app.deserializer.OrderDeserializer -# Default Quarkus datasource configuration - -quarkus.datasource.devservices.enabled=false -quarkus.datasource.db-kind=postgresql -quarkus.datasource.username=native -quarkus.datasource.password=native -quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/native - -# Alternative Quarkus datasource configuration quarkus.datasource.alternative.db-kind=postgresql quarkus.datasource.alternative.username=alternative quarkus.datasource.alternative.password=alternative quarkus.datasource.alternative.jdbc.url=jdbc:postgresql://localhost:5433/alternative -quarkus.debezium.capturing.alternative.heartbeat.interval.ms=5 - -quarkus.debezium.capturing.alternative.orders.destination=dbserver2.public.orders -quarkus.debezium.capturing.alternative.orders.deserializer=io.quarkus.sample.app.deserializer.OrderDeserializer \ No newline at end of file +quarkus.datasource.alternative.jdbc.max-size=16 \ No newline at end of file diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/application.properties b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/application.properties index 56253505cd..f82b035edd 100644 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/application.properties +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/application.properties @@ -1,45 +1,40 @@ # Debezium CDC configuration quarkus.debezium.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore quarkus.debezium.name=native -quarkus.debezium.topic.prefix=dbserver1 -quarkus.debezium.plugin.name=pgoutput +quarkus.debezium.topic.prefix=integration +quarkus.debezium.database.names=native quarkus.debezium.snapshot.mode=initial - - +quarkus.debezium.plugin.name=pgoutput +quarkus.debezium.include.schema.changes=false +quarkus.debezium.heartbeat.interval.ms=5 + # Transformation quarkus.debezium.transforms.t0.add.fields=op,table quarkus.debezium.transforms.t0.add.headers=db,table quarkus.debezium.transforms.t0.negate=false -quarkus.debezium.transforms.t0.predicate=p2 quarkus.debezium.transforms.t0.type=io.debezium.transforms.ExtractNewRecordState quarkus.debezium.transforms=t0 -quarkus.debezium.predicates.p2.pattern=inventory.inventory.products -quarkus.debezium.predicates.p2.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches -quarkus.debezium.predicates=p2 - -# PostProcessor -quarkus.debezium.post.processors=reselector -quarkus.debezium.post.processors.reselector.type=io.debezium.processors.reselect.ReselectColumnsPostProcessor -quarkus.debezium.post.processors.reselector.reselect.unavailable.values=true -quarkus.debezium.post.processors.reselector.reselect.null.values=true -quarkus.debezium.post.processors.reselector.reselect.use.event.key=false -quarkus.debezium.post.processors.reselector.reselect.error.handling.mode=WARN -quarkus.debezium.heartbeat.interval.ms=5 - - ## deserializer -quarkus.debezium.capturing.default.destination=dbserver1.public.products +quarkus.debezium.capturing.default.destination=integration.native.products quarkus.debezium.capturing.default.deserializer=io.quarkus.sample.app.deserializer.ProductDeserializer -quarkus.datasource.devservices.enabled=false quarkus.datasource.db-kind=postgresql quarkus.datasource.username=native quarkus.datasource.password=native + quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/native +quarkus.datasource.jdbc.max-size=16 +quarkus.devservices.enabled=false +quarkus.datasource.devservices.enabled=false + +%test.quarkus.datasource.devservices.enabled=false +%test.quarkus.devservices.enabled=false quarkus.datasource.alternative.db-kind=postgresql quarkus.datasource.alternative.username=alternative quarkus.datasource.alternative.password=alternative + quarkus.datasource.alternative.jdbc.url=jdbc:postgresql://localhost:5433/alternative +quarkus.datasource.alternative.jdbc.max-size=16 \ No newline at end of file diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/sql/alternative.sql b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/sql/alternative.sql deleted file mode 100644 index 67628a6084..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/sql/alternative.sql +++ /dev/null @@ -1,12 +0,0 @@ -CREATE TABLE orders( - id SERIAL PRIMARY KEY, - name TEXT NOT NULL, - description TEXT NOT NULL -); - -CREATE TABLE users( - id SERIAL PRIMARY KEY, - name TEXT NOT NULL -); - -INSERT INTO orders (id, name, description) VALUES (1, 'pizza', 'pizza with peperoni'), (2, 'kebab', 'kebab with mayonnaise'); diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/sql/alternative/alternative.sql b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/sql/alternative/alternative.sql new file mode 100644 index 0000000000..4def69fab7 --- /dev/null +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/sql/alternative/alternative.sql @@ -0,0 +1,11 @@ +CREATE SCHEMA IF NOT EXISTS alternative; + +CREATE TABLE alternative.orders( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + description TEXT NOT NULL +); + +INSERT INTO alternative.orders(id, name, description) +VALUES (1, 'pizza', 'pizza with peperoni'), + (2, 'kebab', 'kebab with mayonnaise') \ No newline at end of file diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/sql/default.sql b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/sql/default.sql deleted file mode 100644 index b7315915ea..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/sql/default.sql +++ /dev/null @@ -1,13 +0,0 @@ -CREATE TABLE products( - id SERIAL PRIMARY KEY, - name TEXT NOT NULL, - description TEXT NOT NULL -); - -CREATE TABLE users( - id SERIAL PRIMARY KEY, - name TEXT NOT NULL -); - -INSERT INTO products (id, name, description) VALUES (1, 't-shirt', 'red hat t-shirt'), (2, 'sweatshirt', 'blue ibm sweatshirt'); -INSERT INTO users (name) VALUES ('alvar'), ('anisha'), ('chris'), ('indra'), ('jiri'), ('giovanni'), ('mario'), ('rené'), ('Vojtěch'); diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/sql/default/default.sql b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/sql/default/default.sql new file mode 100644 index 0000000000..20c03cd3ee --- /dev/null +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/resources/sql/default/default.sql @@ -0,0 +1,15 @@ +CREATE SCHEMA IF NOT EXISTS native; + +CREATE TABLE native.products( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + description TEXT NOT NULL +); + +CREATE TABLE native.users( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL +); + +INSERT INTO native.users (name) VALUES ('alvar'), ('anisha'), ('chris'), ('indra'), ('jiri'), ('giovanni'), ('mario'), ('rené'), ('Vojtech'); +INSERT INTO native.products (id, name, description) VALUES (1, 't-shirt','red hat t-shirt'), (2,'sweatshirt','blue ibm sweatshirt'); diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/CapturingIT.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/CapturingIT.java deleted file mode 100644 index e12ab7eef9..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/CapturingIT.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.quarkus.sample.app; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.testcontainers.shaded.org.awaitility.Awaitility.await; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -import io.quarkus.sample.app.test.DisableIfSingleEngine; -import io.quarkus.test.junit.QuarkusIntegrationTest; -import io.restassured.RestAssured; - -@QuarkusIntegrationTest -public class CapturingIT { - - @Test - @DisplayName("Debezium should capture events") - void shouldDebeziumSendCaptureEvents() { - await().untilAsserted(() -> RestAssured - .given() - .redirects().follow(false) - .when() - .get("/captured/all") - .then() - .statusCode(302)); - } - - @Test - @DisplayName("Debezium should capture deserialized events") - void shouldCaptureDeserializedEvents() { - await().untilAsserted(() -> RestAssured - .given() - .redirects().follow(false) - .when() - .get("/captured/products") - .then() - .statusCode(200) - .body("$", hasSize(2)) - .body("[0].id", equalTo(1)) - .body("[0].name", equalTo("t-shirt")) - .body("[0].description", equalTo("red hat t-shirt")) - .body("[1].id", equalTo(2)) - .body("[1].name", equalTo("sweatshirt")) - .body("[1].description", equalTo("blue ibm sweatshirt"))); - } - - @Test - @DisplayName("Debezium should capture deserialized events from another engine") - @DisableIfSingleEngine - void shouldCaptureDeserializedEventsFromAnotherEngine() { - await().untilAsserted(() -> RestAssured - .given() - .redirects().follow(false) - .when() - .get("/captured/orders") - .then() - .statusCode(200) - .body("$", hasSize(2)) - .body("[0].id", equalTo(1)) - .body("[0].name", equalTo("pizza")) - .body("[0].description", equalTo("pizza with peperoni")) - .body("[1].id", equalTo(2)) - .body("[1].name", equalTo("kebab")) - .body("[1].description", equalTo("kebab with mayonnaise"))); - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/EngineIT.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/EngineIT.java deleted file mode 100644 index b12ae9c855..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/EngineIT.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.quarkus.sample.app; - -import static io.restassured.RestAssured.get; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.testcontainers.shaded.org.awaitility.Awaitility.await; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -import io.debezium.runtime.DebeziumStatus; -import io.quarkus.sample.app.test.DisableIfMultiEngine; -import io.quarkus.sample.app.test.DisableIfSingleEngine; -import io.quarkus.test.junit.QuarkusIntegrationTest; -import io.restassured.RestAssured; - -@QuarkusIntegrationTest -public class EngineIT { - - @Test - @DisplayName("should get single engine") - @DisableIfMultiEngine - void shouldGetSingleEngine() { - await().untilAsserted(() -> RestAssured - .given() - .redirects().follow(false) - .when() - .get("/engine/manifest") - .then() - .statusCode(200) - .body("$", hasSize(1)) - .body("[0].id", equalTo("default")) - .body("[0].connector", equalTo("io.debezium.connector.postgresql.PostgresConnector"))); - } - - @Test - @DisplayName("should get multiple engines") - @DisableIfSingleEngine - void shouldGetMultipleEngines() { - await().untilAsserted(() -> RestAssured - .given() - .redirects().follow(false) - .when() - .get("/engine/manifest") - .then() - .statusCode(200) - .body("$", hasSize(2)) - .body("[0].id", equalTo("default")) - .body("[0].connector", equalTo("io.debezium.connector.postgresql.PostgresConnector")) - .body("[1].id", equalTo("alternative")) - .body("[1].connector", equalTo("io.debezium.connector.postgresql.PostgresConnector"))); - } - - @Test - @DisplayName("Debezium should start polling") - void shouldDebeziumStartPolling() { - await().untilAsserted(() -> assertThat( - get("/engine/status") - .then() - .statusCode(200) - .extract().body().as(DebeziumStatus.class)) - .isEqualTo(new DebeziumStatus(DebeziumStatus.State.POLLING))); - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/NotificationIT.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/NotificationIT.java deleted file mode 100644 index 7e6ec41e0e..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/NotificationIT.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.quarkus.sample.app; - -import static io.restassured.RestAssured.get; -import static org.testcontainers.shaded.org.awaitility.Awaitility.await; - -import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -import io.quarkus.debezium.notification.SnapshotCompleted; -import io.quarkus.debezium.notification.SnapshotInProgress; -import io.quarkus.debezium.notification.SnapshotStarted; -import io.quarkus.debezium.notification.SnapshotTableScanCompleted; -import io.quarkus.sample.app.test.DisableIfMultiEngine; -import io.quarkus.sample.app.test.DisableIfSingleEngine; -import io.quarkus.test.junit.QuarkusIntegrationTest; - -@QuarkusIntegrationTest -public class NotificationIT { - - @Test - @DisplayName("should get snapshot notifications") - @DisableIfMultiEngine - void shouldGetSnapshotNotifications() { - await().untilAsserted(() -> Assertions.assertThat( - get("/notifications?engine=default") - .then() - .statusCode(200) - .extract().body().jsonPath().getList(".", String.class)) - .containsExactlyInAnyOrder( - SnapshotStarted.class.getName(), - SnapshotInProgress.class.getName(), - SnapshotInProgress.class.getName(), - SnapshotTableScanCompleted.class.getName(), - SnapshotTableScanCompleted.class.getName(), - SnapshotCompleted.class.getName())); - } - - @Test - @DisplayName("should get snapshot notifications fromAnotherEngine") - @DisableIfSingleEngine - void shouldGetSnapshotNotificationsFromAnotherEngine() { - await().untilAsserted(() -> Assertions.assertThat( - get("/notifications?engine=alternative") - .then() - .statusCode(200) - .extract().body().jsonPath().getList(".", String.class)) - .containsExactlyInAnyOrder( - SnapshotStarted.class.getName(), - SnapshotInProgress.class.getName(), - SnapshotInProgress.class.getName(), - SnapshotTableScanCompleted.class.getName(), - SnapshotTableScanCompleted.class.getName(), - SnapshotCompleted.class.getName())); - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/dto/Product.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/PostgresDebeziumMultiEngineTestSuiteIT.java similarity index 54% rename from quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/dto/Product.java rename to quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/PostgresDebeziumMultiEngineTestSuiteIT.java index ef9dae8b30..e74663c7d3 100644 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/dto/Product.java +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/PostgresDebeziumMultiEngineTestSuiteIT.java @@ -3,8 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ +package io.quarkus.sample.app; -package io.quarkus.sample.app.dto; - -public record Product(int id, String name, String description) { +public class PostgresDebeziumMultiEngineTestSuiteIT implements QuarkusDebeziumMultiEngineTestSuite { } diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/dto/Order.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/PostgresDebeziumSingleEngineTestSuiteIT.java similarity index 54% rename from quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/dto/Order.java rename to quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/PostgresDebeziumSingleEngineTestSuiteIT.java index 13b9c624f2..05acf36ead 100644 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/main/java/io/quarkus/sample/app/dto/Order.java +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/PostgresDebeziumSingleEngineTestSuiteIT.java @@ -3,8 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ +package io.quarkus.sample.app; -package io.quarkus.sample.app.dto; - -public record Order(int id, String name, String description) { +public class PostgresDebeziumSingleEngineTestSuiteIT implements QuarkusDebeziumSingleEngineTestSuite { } diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/events/HeartbeatEventIT.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/events/HeartbeatEventIT.java deleted file mode 100644 index 93ba0d7236..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/events/HeartbeatEventIT.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.sample.app.events; - -import static io.restassured.RestAssured.get; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import static org.testcontainers.shaded.org.awaitility.Awaitility.await; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -import io.debezium.runtime.events.DebeziumHeartbeat; -import io.quarkus.sample.app.test.DisableIfSingleEngine; -import io.quarkus.test.junit.QuarkusIntegrationTest; - -@QuarkusIntegrationTest -class HeartbeatEventIT { - - @Test - @DisplayName("should get an heartbeat") - void shouldGetHeartbeat() { - await().untilAsserted(() -> assertThat( - get("/heartbeat?engine=default") - .then() - .statusCode(200) - .extract() - .body() - .as(DebeziumHeartbeat.class).connector().name()) - .isEqualTo("io.debezium.connector.postgresql.PostgresConnector")); - } - - @Test - @DisplayName("should get an heartbeat from another engine") - @DisableIfSingleEngine - void shouldGetHeartbeatFromAnotherEngine() { - await().untilAsserted(() -> assertThat( - get("/heartbeat?engine=alternative") - .then() - .statusCode(200) - .extract() - .body() - .as(DebeziumHeartbeat.class).connector().name()) - .isEqualTo("io.debezium.connector.postgresql.PostgresConnector")); - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/events/LifecycleEventIT.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/events/LifecycleEventIT.java deleted file mode 100644 index 2f8b0993fe..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/events/LifecycleEventIT.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.quarkus.sample.app.events; - -import static io.restassured.RestAssured.get; -import static org.assertj.core.api.Assertions.assertThat; -import static org.testcontainers.shaded.org.awaitility.Awaitility.await; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -import io.debezium.runtime.events.ConnectorStartedEvent; -import io.debezium.runtime.events.PollingStartedEvent; -import io.debezium.runtime.events.TasksStartedEvent; -import io.quarkus.sample.app.test.DisableIfSingleEngine; -import io.quarkus.test.junit.QuarkusIntegrationTest; - -/** - * @author Chris Cranford - */ -@QuarkusIntegrationTest -public class LifecycleEventIT { - - /** - * TODO: the following test show the heartbeat functionality should be expressed by engine - */ - @Test - @DisplayName("Test Lifecycle Events for SingleEngine") - public void testLifecycleEventsForSingleEngine() { - // Only concerned with up to polling started because we don't stop the connector - await().untilAsserted(() -> assertThat( - get("/lifecycle-events") - .then() - .statusCode(200) - .extract().body().jsonPath().getList(".", String.class)) - .containsExactly( - ConnectorStartedEvent.class.getName(), - TasksStartedEvent.class.getName(), - PollingStartedEvent.class.getName())); - } - - @Test - @DisplayName("Test Lifecycle Events for multiEngine") - @DisableIfSingleEngine - public void testLifecycleEventsForMultiEngine() { - // Only concerned with up to polling started because we don't stop the connector - await().untilAsserted(() -> assertThat( - get("/lifecycle-events?engine=alternative") - .then() - .statusCode(200) - .extract().body().jsonPath().getList(".", String.class)) - .containsExactlyInAnyOrder( - ConnectorStartedEvent.class.getName(), - TasksStartedEvent.class.getName(), - PollingStartedEvent.class.getName())); - } - -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/test/DisableIfMultiEngine.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/test/DisableIfMultiEngine.java deleted file mode 100644 index 5e2679b361..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/test/DisableIfMultiEngine.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.quarkus.sample.app.test; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -import org.junit.jupiter.api.extension.ExtendWith; - -@Target({ ElementType.TYPE, ElementType.METHOD }) -@Retention(RetentionPolicy.RUNTIME) -@ExtendWith(MultiEngineCondition.class) -public @interface DisableIfMultiEngine { -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/test/DisableIfSingleEngine.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/test/DisableIfSingleEngine.java deleted file mode 100644 index 597ad216d0..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/test/DisableIfSingleEngine.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.quarkus.sample.app.test; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -import org.junit.jupiter.api.extension.ExtendWith; - -@Target({ ElementType.TYPE, ElementType.METHOD }) -@Retention(RetentionPolicy.RUNTIME) -@ExtendWith(SingleEngineCondition.class) -public @interface DisableIfSingleEngine { -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/test/MultiEngineCondition.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/test/MultiEngineCondition.java deleted file mode 100644 index c557e9b2c9..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/test/MultiEngineCondition.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.quarkus.sample.app.test; - -import org.eclipse.microprofile.config.ConfigProvider; -import org.junit.jupiter.api.extension.ConditionEvaluationResult; -import org.junit.jupiter.api.extension.ExecutionCondition; -import org.junit.jupiter.api.extension.ExtensionContext; - -public class MultiEngineCondition implements ExecutionCondition { - private static final ConditionEvaluationResult ENABLED = ConditionEvaluationResult.enabled("MultiEngine enabled with profile --multi"); - private static final ConditionEvaluationResult DISABLED = ConditionEvaluationResult.disabled("MultiEngine disable with profile --prod"); - - @Override - public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { - if (ConfigProvider.getConfig() - .getConfigValue("quarkus.test.integration-test-profile") - .getValue() - .equals("multi")) { - return DISABLED; - } - - return ENABLED; - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/test/SingleEngineCondition.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/test/SingleEngineCondition.java deleted file mode 100644 index 8c98cfa6c4..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/integration-tests/src/test/java/io/quarkus/sample/app/test/SingleEngineCondition.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.quarkus.sample.app.test; - -import org.eclipse.microprofile.config.ConfigProvider; -import org.junit.jupiter.api.extension.ConditionEvaluationResult; -import org.junit.jupiter.api.extension.ExecutionCondition; -import org.junit.jupiter.api.extension.ExtensionContext; - -public class SingleEngineCondition implements ExecutionCondition { - private static final ConditionEvaluationResult ENABLED = ConditionEvaluationResult.enabled("SingleEngine enabled with profile --prod"); - private static final ConditionEvaluationResult DISABLED = ConditionEvaluationResult.disabled("SingleEngine disable with profile --multi"); - - @Override - public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { - if (ConfigProvider.getConfig() - .getConfigValue("quarkus.test.integration-test-profile") - .getValue() - .equals("prod")) { - return DISABLED; - } - - return ENABLED; - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/pom.xml b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/pom.xml index 29c7a6fc62..5f47ec3c06 100644 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/pom.xml +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/pom.xml @@ -155,6 +155,10 @@ + + io.debezium.quarkus + quarkus-debezium-agroal + io.quarkus diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/configuration/DatasourceParser.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/configuration/DatasourceParser.java deleted file mode 100644 index 828fcca0d3..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/configuration/DatasourceParser.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.debezium.configuration; - -import java.util.Optional; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.debezium.jdbc.JdbcConfiguration; - -public class DatasourceParser { - - private static final Logger logger = LoggerFactory.getLogger(DatasourceParser.class); - public static final String REGEX = "jdbc:[a-z]+://(?[^:/;?]+)(:(?\\d+))?([/;](?[^?;]+))?"; - private static final Pattern pattern = Pattern.compile(REGEX); - private final String value; - - public DatasourceParser(String value) { - this.value = value; - } - - public Optional asString() { - Matcher matcher = pattern.matcher(value); - - if (matcher.find()) { - logger.trace("Found datasource definition for {}", value); - - String host = matcher.group(JdbcConfiguration.HOSTNAME.name()); - String port = matcher.group(JdbcConfiguration.PORT.name()); - String database = matcher.group(JdbcConfiguration.DATABASE.name()); - - return Optional.of(new JdbcDatasource(host, port, database)); - } - - logger.warn("Unable to parse datasource: {}", value); - return Optional.empty(); - } - - public record JdbcDatasource(String host, String port, String database) { - - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/configuration/DatasourceRecorder.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/configuration/DatasourceRecorder.java deleted file mode 100644 index b8bcfcc296..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/configuration/DatasourceRecorder.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.debezium.configuration; - -import java.util.function.Supplier; - -import org.eclipse.microprofile.config.ConfigProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.debezium.runtime.configuration.QuarkusDatasourceConfiguration; -import io.quarkus.runtime.annotations.Recorder; - -@Recorder -public class DatasourceRecorder { - - private static final Logger LOGGER = LoggerFactory.getLogger(DatasourceRecorder.class); - public static final String PREFIX = "quarkus.datasource"; - public static final String JDBC_URL = ".jdbc.url"; - public static final String USERNAME = ".username"; - public static final String PASSWORD = ".password"; - public static final String DOT = "."; - - public Supplier convert(String name, boolean defaultConfiguration) { - if (defaultConfiguration) { - LOGGER.trace("Extracting default configuration for data source {}", name); - - String jdbcUrl = ConfigProvider.getConfig().getConfigValue(PREFIX + JDBC_URL).getValue(); - String username = ConfigProvider.getConfig().getConfigValue(PREFIX + USERNAME).getValue(); - String password = ConfigProvider.getConfig().getConfigValue(PREFIX + PASSWORD).getValue(); - - return createConfiguration(QuarkusDatasourceConfiguration.DEFAULT, jdbcUrl, username, password, true); - } - LOGGER.trace("Extracting datasource configuration for {}", name); - - String jdbcUrl = ConfigProvider.getConfig().getConfigValue(PREFIX + DOT + name + JDBC_URL).getValue(); - String username = ConfigProvider.getConfig().getConfigValue(PREFIX + DOT + name + USERNAME).getValue(); - String password = ConfigProvider.getConfig().getConfigValue(PREFIX + DOT + name + PASSWORD).getValue(); - - if (jdbcUrl == null) { - LOGGER.warn("JDBC URL is null"); - return null; - } - - return createConfiguration(name, jdbcUrl, username, password, false); - } - - private Supplier createConfiguration(String name, - String jdbcUrl, - String username, - String password, - boolean isDefault) { - return () -> new DatasourceParser(jdbcUrl) - .asString() - .map(datasource -> new PostgresDatasourceConfiguration( - datasource.host(), - username, - password, - datasource.database(), - datasource.port(), - isDefault, - name)) - .orElse(null); - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/configuration/PostgresDatasourceConfiguration.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/configuration/PostgresDatasourceConfiguration.java deleted file mode 100644 index c5ab46f0f9..0000000000 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/configuration/PostgresDatasourceConfiguration.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.quarkus.debezium.configuration; - -import static io.debezium.config.CommonConnectorConfig.DATABASE_CONFIG_PREFIX; - -import java.util.Map; - -import io.debezium.jdbc.JdbcConfiguration; -import io.debezium.runtime.configuration.QuarkusDatasourceConfiguration; - -public class PostgresDatasourceConfiguration implements QuarkusDatasourceConfiguration { - private final String host; - private final String username; - private final String password; - private final String database; - private final String port; - private final boolean isDefault; - private final String name; - - public PostgresDatasourceConfiguration(String host, - String username, - String password, - String database, - String port, - boolean isDefault, - String name) { - this.host = host; - this.username = username; - this.password = password; - this.database = database; - this.port = port; - this.isDefault = isDefault; - this.name = name; - } - - @Override - public Map asDebezium() { - return Map.of( - "name", name.replaceAll("[<>]", ""), - DATABASE_CONFIG_PREFIX + JdbcConfiguration.HOSTNAME.name(), host, - DATABASE_CONFIG_PREFIX + JdbcConfiguration.PORT.name(), port, - DATABASE_CONFIG_PREFIX + JdbcConfiguration.USER.name(), username, - DATABASE_CONFIG_PREFIX + JdbcConfiguration.PASSWORD.name(), password, - DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE.name(), database); - } - - @Override - public boolean isDefault() { - return isDefault; - } - - public String getName() { - return name; - } - - @Override - public String getSanitizedName() { - return name.replaceAll("[<>]", ""); - } -} diff --git a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/engine/PostgresEngineProducer.java b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/engine/PostgresEngineProducer.java index 8a3e926fb2..078ca44c9e 100644 --- a/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/engine/PostgresEngineProducer.java +++ b/quarkus-debezium-parent/quarkus-debezium-postgres-parent/runtime/src/main/java/io/quarkus/debezium/engine/PostgresEngineProducer.java @@ -6,97 +6,47 @@ package io.quarkus.debezium.engine; -import static io.debezium.config.CommonConnectorConfig.DATABASE_CONFIG_PREFIX; -import static io.debezium.config.CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS; -import static io.debezium.embedded.EmbeddedEngineConfig.CONNECTOR_CLASS; - -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.Function; import java.util.stream.Collectors; import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.Produces; import jakarta.inject.Inject; import jakarta.inject.Singleton; import io.debezium.connector.postgresql.PostgresConnector; -import io.debezium.jdbc.JdbcConfiguration; import io.debezium.runtime.Connector; import io.debezium.runtime.ConnectorProducer; import io.debezium.runtime.Debezium; import io.debezium.runtime.DebeziumConnectorRegistry; import io.debezium.runtime.EngineManifest; import io.debezium.runtime.configuration.DebeziumEngineConfiguration; -import io.debezium.runtime.configuration.QuarkusDatasourceConfiguration; -import io.quarkus.debezium.configuration.DebeziumConfigurationEngineParser; +import io.quarkus.datasource.common.runtime.DatabaseKind; +import io.quarkus.debezium.agroal.engine.AgroalParser; import io.quarkus.debezium.configuration.DebeziumConfigurationEngineParser.MultiEngineConfiguration; -import io.quarkus.debezium.engine.capture.consumer.SourceRecordConsumerHandler; -import io.quarkus.debezium.notification.QuarkusNotificationChannel; @ApplicationScoped public class PostgresEngineProducer implements ConnectorProducer { public static final Connector POSTGRES = new Connector(PostgresConnector.class.getName()); - public static final String DEBEZIUM_DATASOURCE_HOSTNAME = DATABASE_CONFIG_PREFIX + JdbcConfiguration.HOSTNAME.name(); - private final StateHandler stateHandler; - private final Map quarkusDatasourceConfigurations; - private DebeziumFactory debeziumFactory; - private final QuarkusNotificationChannel channel; - private final SourceRecordConsumerHandler sourceRecordConsumerHandler; - private final DebeziumConfigurationEngineParser engineParser = new DebeziumConfigurationEngineParser(); + private final AgroalParser agroalParser; + private final DebeziumFactory debeziumFactory; @Inject - public PostgresEngineProducer(StateHandler stateHandler, - Instance configurations, - QuarkusNotificationChannel channel, - SourceRecordConsumerHandler sourceRecordConsumerHandler, DebeziumFactory debeziumFactory) { - this.stateHandler = stateHandler; - this.channel = channel; - this.sourceRecordConsumerHandler = sourceRecordConsumerHandler; - this.quarkusDatasourceConfigurations = configurations - .stream() - .collect(Collectors.toMap(QuarkusDatasourceConfiguration::getSanitizedName, Function.identity())); + public PostgresEngineProducer(AgroalParser agroalParser, DebeziumFactory debeziumFactory) { + this.agroalParser = agroalParser; this.debeziumFactory = debeziumFactory; } - public PostgresEngineProducer(StateHandler stateHandler, - Map quarkusDatasourceConfigurations, - QuarkusNotificationChannel channel, - SourceRecordConsumerHandler sourceRecordConsumerHandler) { - this.stateHandler = stateHandler; - this.quarkusDatasourceConfigurations = quarkusDatasourceConfigurations; - this.channel = channel; - this.sourceRecordConsumerHandler = sourceRecordConsumerHandler; - } - @Produces @Singleton public DebeziumConnectorRegistry engine(DebeziumEngineConfiguration debeziumEngineConfiguration) { - - /* - * creates a debezium engine using the database coordinates taken from Debezium convention (legacy way) - * in the legacy way we do not support multi-engine - */ - if (debeziumEngineConfiguration.defaultConfiguration().get(DEBEZIUM_DATASOURCE_HOSTNAME) != null) { - return createRegistryFromLegacyConfiguration(debeziumEngineConfiguration.defaultConfiguration()); - } - - List multiEngineConfigurations = engineParser.parse(debeziumEngineConfiguration); - - /* - * enrich Quarkus-like debezium configuration with quarkus datasource configuration - */ - List enrichedMultiEngineConfigurations = multiEngineConfigurations - .stream() - .map(engine -> enrichConfiguration(engine, quarkusDatasourceConfigurations)) - .toList(); + List multiEngineConfigurations = agroalParser.parse(debeziumEngineConfiguration, DatabaseKind.POSTGRESQL, POSTGRES); return new DebeziumConnectorRegistry() { - private final Map engines = enrichedMultiEngineConfigurations + private final Map engines = multiEngineConfigurations .stream() .map(engine -> Map.entry(engine.engineId(), debeziumFactory.get(POSTGRES, engine))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); @@ -118,58 +68,4 @@ public List engines() { }; } - private MultiEngineConfiguration enrichConfiguration(MultiEngineConfiguration engine, Map collect) { - HashMap mutableMap = new HashMap<>(engine.configuration()); - - mutableMap.compute(NOTIFICATION_ENABLED_CHANNELS.name(), - (key, value) -> value == null ? channel.name() : value.concat("," + channel.name())); - - mutableMap.putAll(getQuarkusDatasourceConfigurationByEngineId(engine.engineId(), collect).asDebezium()); - mutableMap.put(CONNECTOR_CLASS.name(), POSTGRES.name()); - - return new MultiEngineConfiguration(engine.engineId(), mutableMap); - } - - private QuarkusDatasourceConfiguration getQuarkusDatasourceConfigurationByEngineId(String engineId, Map collect) { - QuarkusDatasourceConfiguration configuration = collect.get(engineId); - - if (configuration == null) { - throw new IllegalArgumentException("No datasource configuration found for engine " + engineId); - } - - return configuration; - } - - private DebeziumConnectorRegistry createRegistryFromLegacyConfiguration(Map configuration) { - configuration.compute(NOTIFICATION_ENABLED_CHANNELS.name(), - (key, value) -> value == null ? channel.name() : value.concat("," + channel.name())); - configuration.put(CONNECTOR_CLASS.name(), POSTGRES.name()); - - return new DebeziumConnectorRegistry() { - private final SourceRecordDebezium engine = new SourceRecordDebezium(configuration, - stateHandler, - POSTGRES, - sourceRecordConsumerHandler.get(EngineManifest.DEFAULT), EngineManifest.DEFAULT); - - @Override - public Connector connector() { - return POSTGRES; - } - - @Override - public Debezium get(EngineManifest manifest) { - if (manifest == null || manifest.id() == null || !manifest.id().equals("default")) { - return null; - } - - return engine; - } - - @Override - public List engines() { - return List.of(engine); - } - }; - } - } diff --git a/quarkus-debezium-parent/quarkus-debezium-testsuite-parent/integration-tests/src/main/resources/application.properties b/quarkus-debezium-parent/quarkus-debezium-testsuite-parent/integration-tests/src/main/resources/application.properties index c389645049..34d2d7e6d3 100644 --- a/quarkus-debezium-parent/quarkus-debezium-testsuite-parent/integration-tests/src/main/resources/application.properties +++ b/quarkus-debezium-parent/quarkus-debezium-testsuite-parent/integration-tests/src/main/resources/application.properties @@ -4,4 +4,8 @@ # Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 # -quarkus.debezium.devservices.*.enabled=false \ No newline at end of file +quarkus.debezium.devservices.*.enabled=false +%test.quarkus.datasource.devservices.enabled=false +%test.quarkus.devservices.enabled=false +quarkus.devservices.enabled=false +quarkus.datasource.devservices.enabled=false \ No newline at end of file