Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,16 @@
import io.quarkus.datasource.common.runtime.DatabaseKind;
import io.quarkus.debezium.agroal.engine.AgroalParser;
import io.quarkus.debezium.configuration.DebeziumConfigurationEngineParser;
import io.quarkus.debezium.engine.capture.consumer.SourceRecordConsumerHandler;

public class Db2EngineProducer implements ConnectorProducer {
public static final Connector DB2 = new Connector(Db2Connector.class.getName());

private final StateHandler stateHandler;
private final SourceRecordConsumerHandler sourceRecordConsumerHandler;
private final AgroalParser agroalParser;
private final DebeziumFactory debeziumFactory;

public Db2EngineProducer(StateHandler stateHandler, SourceRecordConsumerHandler sourceRecordConsumerHandler, AgroalParser agroalParser) {
this.stateHandler = stateHandler;
this.sourceRecordConsumerHandler = sourceRecordConsumerHandler;
public Db2EngineProducer(AgroalParser agroalParser, DebeziumFactory debeziumFactory) {
this.agroalParser = agroalParser;
this.debeziumFactory = debeziumFactory;
}

@Override
Expand All @@ -46,18 +43,11 @@ public DebeziumConnectorRegistry engine(DebeziumEngineConfiguration debeziumEngi
private final Map<String, Debezium> engines = multiEngineConfigurations
.stream()
.map(engine -> {
EngineManifest engineManifest = new EngineManifest(engine.engineId());

Map<String, String> debeziumConfiguration = engine.configuration();

// remove unnecessary configuration for sqlserver
debeziumConfiguration.remove(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE.name());
engine.configuration()
.remove(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE.name());

return Map.entry(engine.engineId(), new SourceRecordDebezium(
engine.configuration(),
stateHandler,
DB2,
sourceRecordConsumerHandler.get(engineManifest), engineManifest));
return Map.entry(engine.engineId(), debeziumFactory.get(DB2, engine));
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import io.quarkus.debezium.deployment.items.DebeziumGeneratedInvokerBuildItem;
import io.quarkus.debezium.deployment.items.DebeziumGeneratedPostProcessorBuildItem;
import io.quarkus.debezium.deployment.items.DebeziumMediatorBuildItem;
import io.quarkus.debezium.engine.DebeziumFactory;
import io.quarkus.debezium.engine.DebeziumRecorder;
import io.quarkus.debezium.engine.DefaultStateHandler;
import io.quarkus.debezium.engine.capture.CapturingEventInvokerRegistryProducer;
Expand Down Expand Up @@ -173,6 +174,12 @@ void engine(BuildProducer<AdditionalBeanBuildItem> additionalBeanProducer,
.addBeanClasses(DefaultStateHandler.class)
.build());

additionalBeanProducer.produce(AdditionalBeanBuildItem
.builder()
.setUnremovable()
.addBeanClasses(DebeziumFactory.class)
.build());

additionalBeanProducer.produce(AdditionalBeanBuildItem.builder()
.addBeanClasses(
DefaultNotificationHandler.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.quarkus.debezium.engine;

import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

import io.debezium.DebeziumException;
import io.debezium.runtime.Connector;
import io.debezium.runtime.Debezium;
import io.debezium.runtime.DebeziumSerialization;
import io.debezium.runtime.EngineManifest;
import io.quarkus.debezium.configuration.DebeziumConfigurationEngineParser.MultiEngineConfiguration;
import io.quarkus.debezium.engine.capture.consumer.SourceRecordConsumerHandler;

public class DebeziumFactory {

private final Instance<DebeziumSerialization> serialization;
private final StateHandler stateHandler;
private final SourceRecordConsumerHandler sourceRecordConsumerHandler;

@Inject
public DebeziumFactory(
Instance<DebeziumSerialization> serialization,
StateHandler stateHandler,
SourceRecordConsumerHandler sourceRecordConsumerHandler) {
this.serialization = serialization;
this.stateHandler = stateHandler;
this.sourceRecordConsumerHandler = sourceRecordConsumerHandler;
}

public Debezium get(Connector connector, MultiEngineConfiguration engine) {
if (serialization.isResolvable()) {
throw new DebeziumException("not implemented yet engine with configurable serialization");
}

EngineManifest engineManifest = new EngineManifest(engine.engineId());

return new SourceRecordDebezium(
engine.configuration(),
stateHandler,
connector,
sourceRecordConsumerHandler.get(engineManifest),
engineManifest);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.runtime;

import io.debezium.common.annotation.Incubating;
import io.debezium.engine.format.SerializationFormat;

/**
* serialization information associated to a running {@link Debezium} engine
*/
@Incubating
public interface DebeziumSerialization {

/**
*
* @return the key {@link SerializationFormat}
*/
Class<? extends SerializationFormat<?>> getKeyFormat();

/**
*
* @return the value {@link SerializationFormat}
*/
Class<? extends SerializationFormat<?>> getValueFormat();

/**
*
* @return the header {@link SerializationFormat}
*/
Class<? extends SerializationFormat<?>> getHeaderFormat();

/**
*
* @return the id for the {@link Debezium} engine
*/
String getEngineId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,18 @@
import io.quarkus.datasource.common.runtime.DatabaseKind;
import io.quarkus.debezium.agroal.engine.AgroalParser;
import io.quarkus.debezium.configuration.DebeziumConfigurationEngineParser.MultiEngineConfiguration;
import io.quarkus.debezium.engine.capture.consumer.SourceRecordConsumerHandler;

public class MariaDbEngineProducer implements ConnectorProducer {
public static final Connector MARIADB = new Connector(MariaDbConnector.class.getName());

private final StateHandler stateHandler;
private final SourceRecordConsumerHandler sourceRecordConsumerHandler;
private final AgroalParser agroalParser;
private final DebeziumFactory debeziumFactory;

@Inject
public MariaDbEngineProducer(StateHandler stateHandler,
SourceRecordConsumerHandler sourceRecordConsumerHandler,
AgroalParser agroalParser) {
this.stateHandler = stateHandler;
this.sourceRecordConsumerHandler = sourceRecordConsumerHandler;
public MariaDbEngineProducer(AgroalParser agroalParser,
DebeziumFactory debeziumFactory) {
this.agroalParser = agroalParser;
this.debeziumFactory = debeziumFactory;
}

@Produces
Expand All @@ -55,18 +51,11 @@ public DebeziumConnectorRegistry engine(DebeziumEngineConfiguration debeziumEngi
private final Map<String, Debezium> engines = multiEngineConfigurations
.stream()
.map(engine -> {
EngineManifest engineManifest = new EngineManifest(engine.engineId());

Map<String, String> debeziumConfiguration = engine.configuration();

// remove unnecessary configuration for sqlserver
debeziumConfiguration.remove(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE.name());

return Map.entry(engine.engineId(), new SourceRecordDebezium(
engine.configuration(),
stateHandler,
MARIADB,
sourceRecordConsumerHandler.get(engineManifest), engineManifest));
return Map.entry(engine.engineId(), debeziumFactory.get(MARIADB, engine));
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,23 @@
import io.quarkus.debezium.configuration.DebeziumConfigurationEngineParser.MultiEngineConfiguration;
import io.quarkus.debezium.configuration.MongoDbDatasourceConfiguration;
import io.quarkus.debezium.configuration.MultiEngineMongoDbDatasourceConfiguration;
import io.quarkus.debezium.engine.capture.consumer.SourceRecordConsumerHandler;
import io.quarkus.debezium.notification.QuarkusNotificationChannel;

public class MongoDbEngineProducer implements ConnectorProducer {

public static final Connector MONGODB = new Connector(MongoDbConnector.class.getName());
private final StateHandler stateHandler;
private final Map<String, MongoDbDatasourceConfiguration> quarkusDatasourceConfigurations;
private DebeziumFactory debeziumFactory;
private final QuarkusNotificationChannel channel;
private final SourceRecordConsumerHandler sourceRecordConsumerHandler;
private final DebeziumConfigurationEngineParser engineParser = new DebeziumConfigurationEngineParser();

@Inject
public MongoDbEngineProducer(StateHandler stateHandler,
MultiEngineMongoDbDatasourceConfiguration multiEngineMongoDbDatasourceConfiguration,
public MongoDbEngineProducer(MultiEngineMongoDbDatasourceConfiguration multiEngineMongoDbDatasourceConfiguration,
QuarkusNotificationChannel channel,
SourceRecordConsumerHandler sourceRecordConsumerHandler) {
this.stateHandler = stateHandler;
DebeziumFactory debeziumFactory) {
this.channel = channel;
this.sourceRecordConsumerHandler = sourceRecordConsumerHandler;
this.quarkusDatasourceConfigurations = multiEngineMongoDbDatasourceConfiguration.get();
}

public MongoDbEngineProducer(StateHandler stateHandler,
Map<String, MongoDbDatasourceConfiguration> quarkusDatasourceConfigurations,
QuarkusNotificationChannel channel,
SourceRecordConsumerHandler sourceRecordConsumerHandler) {
this.stateHandler = stateHandler;
this.quarkusDatasourceConfigurations = quarkusDatasourceConfigurations;
this.channel = channel;
this.sourceRecordConsumerHandler = sourceRecordConsumerHandler;
this.debeziumFactory = debeziumFactory;
}

@Produces
Expand All @@ -79,15 +65,7 @@ public DebeziumConnectorRegistry engine(DebeziumEngineConfiguration debeziumEngi
return new DebeziumConnectorRegistry() {
private final Map<String, Debezium> engines = enrichedMultiEngineConfigurations
.stream()
.map(engine -> {
EngineManifest engineManifest = new EngineManifest(engine.engineId());

return Map.entry(engine.engineId(), new SourceRecordDebezium(
engine.configuration(),
stateHandler,
MONGODB,
sourceRecordConsumerHandler.get(engineManifest), engineManifest));
})
.map(engine -> Map.entry(engine.engineId(), debeziumFactory.get(MONGODB, engine)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,18 @@
import io.quarkus.datasource.common.runtime.DatabaseKind;
import io.quarkus.debezium.agroal.engine.AgroalParser;
import io.quarkus.debezium.configuration.DebeziumConfigurationEngineParser.MultiEngineConfiguration;
import io.quarkus.debezium.engine.capture.consumer.SourceRecordConsumerHandler;

public class MySqlEngineProducer implements ConnectorProducer {

public static final Connector MYSQL = new Connector(MySqlConnector.class.getName());

private final StateHandler stateHandler;
private final SourceRecordConsumerHandler sourceRecordConsumerHandler;
private final AgroalParser agroalParser;
private final DebeziumFactory debeziumFactory;

@Inject
public MySqlEngineProducer(StateHandler stateHandler,
SourceRecordConsumerHandler sourceRecordConsumerHandler,
AgroalParser agroalParser) {
this.stateHandler = stateHandler;
this.sourceRecordConsumerHandler = sourceRecordConsumerHandler;
public MySqlEngineProducer(AgroalParser agroalParser, DebeziumFactory debeziumFactory) {
this.agroalParser = agroalParser;
this.debeziumFactory = debeziumFactory;
}

@Produces
Expand All @@ -56,18 +51,11 @@ public DebeziumConnectorRegistry engine(DebeziumEngineConfiguration debeziumEngi
private final Map<String, Debezium> engines = multiEngineConfigurations
.stream()
.map(engine -> {
EngineManifest engineManifest = new EngineManifest(engine.engineId());

Map<String, String> debeziumConfiguration = engine.configuration();

// remove unnecessary configuration
debeziumConfiguration.remove(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE.name());
engine.configuration()
.remove(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE.name());

return Map.entry(engine.engineId(), new SourceRecordDebezium(
engine.configuration(),
stateHandler,
MYSQL,
sourceRecordConsumerHandler.get(engineManifest), engineManifest));
return Map.entry(engine.engineId(), debeziumFactory.get(MYSQL, engine));
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class PostgresEngineProducer implements ConnectorProducer {

private final StateHandler stateHandler;
private final Map<String, QuarkusDatasourceConfiguration> quarkusDatasourceConfigurations;
private DebeziumFactory debeziumFactory;
private final QuarkusNotificationChannel channel;
private final SourceRecordConsumerHandler sourceRecordConsumerHandler;
private final DebeziumConfigurationEngineParser engineParser = new DebeziumConfigurationEngineParser();
Expand All @@ -52,13 +53,14 @@ public class PostgresEngineProducer implements ConnectorProducer {
public PostgresEngineProducer(StateHandler stateHandler,
Instance<QuarkusDatasourceConfiguration> configurations,
QuarkusNotificationChannel channel,
SourceRecordConsumerHandler sourceRecordConsumerHandler) {
SourceRecordConsumerHandler sourceRecordConsumerHandler, DebeziumFactory debeziumFactory) {
this.stateHandler = stateHandler;
this.channel = channel;
this.sourceRecordConsumerHandler = sourceRecordConsumerHandler;
this.quarkusDatasourceConfigurations = configurations
.stream()
.collect(Collectors.toMap(QuarkusDatasourceConfiguration::getSanitizedName, Function.identity()));
this.debeziumFactory = debeziumFactory;
}

public PostgresEngineProducer(StateHandler stateHandler,
Expand Down Expand Up @@ -96,15 +98,7 @@ public DebeziumConnectorRegistry engine(DebeziumEngineConfiguration debeziumEngi
return new DebeziumConnectorRegistry() {
private final Map<String, Debezium> engines = enrichedMultiEngineConfigurations
.stream()
.map(engine -> {
EngineManifest engineManifest = new EngineManifest(engine.engineId());

return Map.entry(engine.engineId(), new SourceRecordDebezium(
engine.configuration(),
stateHandler,
POSTGRES,
sourceRecordConsumerHandler.get(engineManifest), engineManifest));
})
.map(engine -> Map.entry(engine.engineId(), debeziumFactory.get(POSTGRES, engine)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

@Override
Expand Down
Loading