Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.quarkus.debezium.deployment.engine;

import java.util.Optional;
import java.util.UUID;

import org.jboss.jandex.AnnotationValue;
import org.jboss.jandex.MethodInfo;
import org.jboss.jandex.ParameterizedType;
import org.jboss.jandex.Type;

import io.debezium.runtime.BatchEvent;
import io.debezium.runtime.Capturing;
import io.debezium.runtime.CapturingEvents;
import io.quarkus.arc.processor.BeanInfo;
import io.quarkus.arc.processor.DotNames;
import io.quarkus.debezium.deployment.dotnames.DebeziumDotNames;
import io.quarkus.debezium.engine.capture.CapturingEventsInvoker;
import io.quarkus.gizmo.ClassCreator;
import io.quarkus.gizmo.ClassOutput;
import io.quarkus.gizmo.FieldDescriptor;
import io.quarkus.gizmo.MethodCreator;
import io.quarkus.runtime.util.HashUtil;

public class CapturingEventsGenerator implements GizmoBasedCapturingInvokerGenerator {
private final ClassOutput output;

public CapturingEventsGenerator(ClassOutput output) {
this.output = output;
}

@Override
public boolean isCompatible(Type type) {
return ParameterizedType.create(CapturingEvents.class, Type.create(BatchEvent.class)).equals(type);
}

/**
* it generates concrete classes based on the {@link io.quarkus.debezium.engine.capture.CapturingEventInvoker} interface using gizmo:
* <p>
* public class GeneratedCapturingInvoker {
* private final Object beanInstance;
* <p>
* void capture(CapturingEvents<BatchEvent> event) {
* beanInstance.method(event);
* }
* <p>
* }
*
* @param methodInfo
* @param beanInfo
* @return
*/
@Override
public GeneratedClassMetaData generate(MethodInfo methodInfo, BeanInfo beanInfo) {
String name = generateClassName(beanInfo, methodInfo);

try (ClassCreator invoker = ClassCreator.builder()
.classOutput(this.output)
.className(name)
.interfaces(CapturingEventsInvoker.class)
.build()) {

FieldDescriptor beanInstanceField = constructorWithObjectField(methodInfo, invoker, "beanInstance");
createCaptureMethod(methodInfo, invoker, beanInstanceField, CapturingEvents.class);

try (MethodCreator destination = invoker.getMethodCreator("destination", String.class)) {
Optional.ofNullable(methodInfo
.annotation(DebeziumDotNames.CAPTURING)
.value("destination"))
.map(AnnotationValue::asString)
.ifPresentOrElse(s -> {
if (s.isEmpty()) {
throw new IllegalArgumentException("empty destination are not allowed for @Capturing annotation " + methodInfo.declaringClass());
}
destination.returnValue(destination.load(s));
},
() -> destination.returnValue(destination.load(Capturing.ALL)));
}

createEngineMethod(methodInfo, invoker);

return new GeneratedClassMetaData(UUID.randomUUID(), name.replace('/', '.'), beanInfo, CapturingEventsInvoker.class);
}
}

private String generateClassName(BeanInfo bean, MethodInfo methodInfo) {
return DotNames.internalPackageNameWithTrailingSlash(bean.getImplClazz().name())
+ DotNames.simpleName(bean.getImplClazz().name())
+ "_DebeziumBatchInvoker" + "_"
+ methodInfo.name() + "_"
+ HashUtil.sha1(methodInfo.name() + "_" + methodInfo.returnType().name().toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@
import io.quarkus.debezium.engine.DebeziumRecorder;
import io.quarkus.debezium.engine.DefaultStateHandler;
import io.quarkus.debezium.engine.capture.CapturingEventInvokerRegistryProducer;
import io.quarkus.debezium.engine.capture.CapturingEventsInvokerRegistryProducer;
import io.quarkus.debezium.engine.capture.CapturingInvoker;
import io.quarkus.debezium.engine.capture.CapturingObjectInvokerRegistryProducer;
import io.quarkus.debezium.engine.capture.DynamicCapturingInvokerSupplier;
import io.quarkus.debezium.engine.capture.consumer.GeneralChangeConsumerProducer;
import io.quarkus.debezium.engine.capture.consumer.SourceRecordEventProducer;
import io.quarkus.debezium.engine.converter.custom.DynamicCustomConverterSupplier;
import io.quarkus.debezium.engine.deserializer.CapturingEventDeserializerRegistryProducer;
Expand Down Expand Up @@ -155,15 +157,17 @@ void engine(BuildProducer<AdditionalBeanBuildItem> additionalBeanProducer,
.builder()
.addBeanClasses(
CapturingEventInvokerRegistryProducer.class,
CapturingObjectInvokerRegistryProducer.class)
CapturingObjectInvokerRegistryProducer.class,
CapturingEventsInvokerRegistryProducer.class)
.setDefaultScope(DotNames.APPLICATION_SCOPED)
.setUnremovable()
.build());

additionalBeanProducer.produce(AdditionalBeanBuildItem
.builder()
.addBeanClasses(
SourceRecordEventProducer.class)
SourceRecordEventProducer.class,
GeneralChangeConsumerProducer.class)
.setDefaultScope(DotNames.APPLICATION_SCOPED)
.setUnremovable()
.build());
Expand Down Expand Up @@ -381,7 +385,8 @@ public void generateInvokers(List<DebeziumMediatorBuildItem> mediatorBuildItems,

CapturingInvokerGenerator capturingInvokerGenerator = new MultipleCapturingInvokerGenerators(
new CapturingEventGenerator(classOutput),
new CapturingObjectInvokerGenerator(classOutput));
new CapturingObjectInvokerGenerator(classOutput),
new CapturingEventsGenerator(classOutput));

PostProcessorGenerator postProcessorGenerator = new PostProcessorGenerator(classOutput);
CustomConverterGenerator customConverterGenerator = new CustomConverterGenerator(classOutput);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,26 @@
import io.debezium.runtime.DebeziumSerialization;
import io.debezium.runtime.EngineManifest;
import io.quarkus.debezium.configuration.DebeziumConfigurationEngineParser.MultiEngineConfiguration;
import io.quarkus.debezium.engine.capture.consumer.ChangeConsumerHandler;
import io.quarkus.debezium.engine.capture.consumer.SourceRecordConsumerHandler;

public class DebeziumFactory {

private final Instance<DebeziumSerialization> serialization;
private final StateHandler stateHandler;
private final SourceRecordConsumerHandler sourceRecordConsumerHandler;
private final ChangeConsumerHandler changeConsumerHandler;

@Inject
public DebeziumFactory(
Instance<DebeziumSerialization> serialization,
StateHandler stateHandler,
SourceRecordConsumerHandler sourceRecordConsumerHandler) {
SourceRecordConsumerHandler sourceRecordConsumerHandler,
ChangeConsumerHandler changeConsumerHandler) {
this.serialization = serialization;
this.stateHandler = stateHandler;
this.sourceRecordConsumerHandler = sourceRecordConsumerHandler;
this.changeConsumerHandler = changeConsumerHandler;
}

public Debezium get(Connector connector, MultiEngineConfiguration engine) {
Expand All @@ -39,6 +43,15 @@ public Debezium get(Connector connector, MultiEngineConfiguration engine) {

EngineManifest engineManifest = new EngineManifest(engine.engineId());

if (changeConsumerHandler != null) {
return new SourceRecordDebezium(
engine.configuration(),
changeConsumerHandler.get(engineManifest),
connector,
stateHandler,
engineManifest);
}

return new SourceRecordDebezium(
engine.configuration(),
stateHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.ChangeConsumer;
import io.debezium.engine.DebeziumEngine.Signaler;
import io.debezium.runtime.Connector;
import io.debezium.runtime.DebeziumStatus;
Expand All @@ -42,7 +44,7 @@ class SourceRecordDebezium extends RunnableDebezium {
LOGGER.trace("Creating SourceRecordDebezium for engine {}", engineManifest);
this.configuration = configuration;
this.stateHandler = stateHandler;
this.engine = DebeziumEngine.create(Connect.class, Connect.class)
this.engine = DebeziumEngine.create(Connect.class, Connect.class, Connect.class, ConvertingAsyncEngineBuilderFactory.class.getName())
.using(Configuration.empty()
.withSystemProperties(Function.identity())
.edit()
Expand All @@ -56,6 +58,27 @@ class SourceRecordDebezium extends RunnableDebezium {
this.engineManifest = engineManifest;
}

SourceRecordDebezium(Map<String, String> configuration,
ChangeConsumer batchConsumer,
Connector connector,
StateHandler stateHandler,
EngineManifest engineManifest) {
this.configuration = configuration;
this.connector = connector;
this.stateHandler = stateHandler;
this.engineManifest = engineManifest;
this.engine = DebeziumEngine.create(Connect.class, Connect.class, Connect.class, ConvertingAsyncEngineBuilderFactory.class.getName())
.using(Configuration.empty()
.withSystemProperties(Function.identity())
.edit()
.with(Configuration.from(configuration))
.build().asProperties())
.using(this.stateHandler.connectorCallback(engineManifest, this))
.using(this.stateHandler.completionCallback(engineManifest, this))
.notifying(batchConsumer)
.build();
}

@Override
public Signaler signaler() {
return engine.getSignaler();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.quarkus.debezium.engine.capture;

import io.debezium.runtime.CapturingEvents;

public interface CapturingEventsInvoker extends CapturingInvoker<CapturingEvents<Object>> {
@Override
void capture(CapturingEvents<Object> event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.quarkus.debezium.engine.capture;

public interface CapturingEventsInvokerRegistry<T> {
CapturingInvoker<T> get(T identifier);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.quarkus.debezium.engine.capture;

import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;

import io.debezium.runtime.CapturingEvents;

public class CapturingEventsInvokerRegistryProducer {

@Inject
Instance<CapturingEventsInvoker> invokers;

@Produces
@Dependent
public CapturingEventsInvokerRegistry<CapturingEvents<Object>> produce() {
Map<String, CapturingEventsInvoker> handlers = this.invokers
.stream()
.collect(Collectors.toMap(CapturingInvoker::generateKey, Function.identity()));

if (handlers.isEmpty()) {
return null;
}

return event -> handlers.get(event.engine() + "_" + event.destination());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.quarkus.debezium.engine.capture.consumer;

import io.debezium.runtime.EngineManifest;

public interface ChangeConsumerHandler {
GeneralChangeConsumer get(EngineManifest engineManifest);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.quarkus.debezium.engine.capture.consumer;

import java.util.List;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine.ChangeConsumer;
import io.debezium.engine.DebeziumEngine.RecordCommitter;

public interface GeneralChangeConsumer extends ChangeConsumer<ChangeEvent<Object, Object>> {

@Override
void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException;
}
Loading