ScyllaChangeRecordEmitter.java
@@ -51,6 +51,9 @@ protected Envelope.Operation getOperation() {
             case ROW_UPDATE:
                 return Envelope.Operation.UPDATE;
             case ROW_INSERT:
+            // TODO: Postimages always generate some delta as part of an operation.
+            // Use the right envelope for its corresponding delta.
+            case POST_IMAGE:
                 return Envelope.Operation.CREATE;
             case PARTITION_DELETE: // See comment in ScyllaChangesConsumer on the support of partition deletes.
             case ROW_DELETE:
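
In other words, every postimage is currently emitted as an upsert-style CREATE; the TODO above tracks distinguishing postimages that originate from an UPDATE.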
@@ -112,15 +115,23 @@ protected void emitDeleteRecord(Receiver receiver, ScyllaCollectionSchema scylla
         scyllaCollectionSchema = this.schema.updateChangeSchema(scyllaCollectionSchema.id(), change.getSchema());

         Struct keyStruct = new Struct(scyllaCollectionSchema.keySchema());
-        Struct beforeStruct = new Struct(scyllaCollectionSchema.beforeSchema());
+        Struct afterStruct = new Struct(scyllaCollectionSchema.afterSchema());
+        fillStructWithChange(scyllaCollectionSchema, keyStruct, afterStruct, change);

+        Struct envelope;
         if (preImage != null) {
+            Struct beforeStruct = new Struct(scyllaCollectionSchema.beforeSchema());
             fillStructWithChange(scyllaCollectionSchema, keyStruct, beforeStruct, preImage);
+            envelope = generalizedEnvelope(scyllaCollectionSchema.getEnvelopeSchema().schema(),
+                    beforeStruct, afterStruct, getOffset().getSourceInfo(),
+                    getClock().currentTimeAsInstant(), Envelope.Operation.DELETE);
         } else {
-            fillStructWithChange(scyllaCollectionSchema, keyStruct, beforeStruct, change);
+            // before is null on a row/partition delete operation. We represent the affected key in the after field, where the remaining cells are null.
+            envelope = generalizedEnvelope(scyllaCollectionSchema.getEnvelopeSchema().schema(),
+                    null, afterStruct, getOffset().getSourceInfo(),
+                    getClock().currentTimeAsInstant(), Envelope.Operation.DELETE);
         }

-        Struct envelope = scyllaCollectionSchema.getEnvelopeSchema().delete(beforeStruct, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
-
         receiver.changeRecord(scyllaCollectionSchema, getOperation(), keyStruct, envelope, getOffset(), null);
}
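
For illustration (the table layout is invented for this example, not part of the PR): with preimages off, deleting the row pk = 1 from a table (pk int PRIMARY KEY, v text) now yields an envelope with before = null and after = {pk: 1, v: null}, instead of the previous delete(beforeStruct) shape; consumers read the affected key from the after field.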

ScyllaChangesConsumer.java
@@ -22,9 +22,11 @@ public class ScyllaChangesConsumer implements TaskAndRawChangeConsumer {
     private final ScyllaSchema schema;
     private final Clock clock;
     private final boolean usePreimages;
+    private final boolean usePostimages;
     private final Map<TaskId, RawChange> lastPreImage;

-    public ScyllaChangesConsumer(EventDispatcher<CollectionId> dispatcher, ScyllaOffsetContext offsetContext, ScyllaSchema schema, Clock clock, boolean usePreimages) {
+    public ScyllaChangesConsumer(EventDispatcher<CollectionId> dispatcher, ScyllaOffsetContext offsetContext,
+                                 ScyllaSchema schema, Clock clock, boolean usePreimages, boolean usePostimages) {
         this.dispatcher = dispatcher;
         this.offsetContext = offsetContext;
         this.schema = schema;
@@ -35,8 +37,15 @@ public ScyllaChangesConsumer(EventDispatcher<CollectionId> dispatcher, ScyllaOff
         } else {
             this.lastPreImage = null;
         }
+
+        this.usePostimages = usePostimages;
     }

+    // TL;DR:
+    // 1. Check for preimages first.
+    // 2. Then check whether the change is a PARTITION_DELETE.
+    // 3. If postimages are disabled, accept only ROW_INSERT/ROW_UPDATE/ROW_DELETE events.
+    // 4. If postimages are enabled, accept only POST_IMAGE events.
     @Override
     public CompletableFuture<Void> consume(Task task, RawChange change) {
         try {
@@ -66,10 +75,13 @@ public CompletableFuture<Void> consume(Task task, RawChange change) {
             if (hasClusteringColumn) {
                 return CompletableFuture.completedFuture(null);
             }
-        } else if (operationType != RawChange.OperationType.ROW_INSERT
+        } else if (!this.usePostimages
+                && operationType != RawChange.OperationType.ROW_INSERT
                 && operationType != RawChange.OperationType.ROW_UPDATE
                 && operationType != RawChange.OperationType.ROW_DELETE) {
             return CompletableFuture.completedFuture(null);
+        } else if (this.usePostimages && operationType != RawChange.OperationType.POST_IMAGE) {
+            return CompletableFuture.completedFuture(null);
         }

         if (usePreimages && lastPreImage.containsKey(task.id)) {
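Condensed, the accept/skip decision for non-partition-delete changes now reads as follows (a sketch with a hypothetical helper name, not the PR's literal code; preimage bookkeeping omitted):

    // Hypothetical helper equivalent to the filter above: emit the change
    // downstream only for these operation types.
    static boolean shouldEmit(boolean usePostimages, RawChange.OperationType op) {
        if (usePostimages) {
            // Deltas are skipped; only full post-image rows are consumed.
            return op == RawChange.OperationType.POST_IMAGE;
        }
        return op == RawChange.OperationType.ROW_INSERT
                || op == RawChange.OperationType.ROW_UPDATE
                || op == RawChange.OperationType.ROW_DELETE;
    }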
ScyllaConnectorConfig.java
@@ -176,16 +176,25 @@ public class ScyllaConnectorConfig extends CommonConnectorConfig {
"the connection to Scylla to prioritize sending requests to " +
"the nodes in the local datacenter. If not set, no particular datacenter will be prioritized.");

public static final Field PREIMAGES_ENABLED = Field.create("experimental.preimages.enabled")
.withDisplayName("Enable experimental preimages support")
public static final Field PREIMAGES_ENABLED = Field.create("preimages.enabled")
.withDisplayName("Enable preimages support")
.withType(ConfigDef.Type.BOOLEAN)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(false)
.withDescription("If enabled connector will use PRE_IMAGE CDC entries to populate 'before' field of the " +
"debezium Envelope of the next kafka message. This may change some expected behaviours (e.g. ROW_DELETE " +
"will use preimage instead of its own information). See Scylla docs for more information about CDC " +
"preimages limitations. ");
"debezium Envelope of the next kafka message.");

public static final Field POSTIMAGES_ENABLED = Field.create("postimages.enabled")
.withDisplayName("Enable postimages support")
.withType(ConfigDef.Type.BOOLEAN)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDefault(false)
.withDescription("Whether the connector should ignore insert and update deltas and consume PostImage " +
"events instead. PostImages only get generated for rows modified as part of an UPDATE or INSERT " +
"statement. Only relevant when `'postimage': 'true'`, otherwise this option simply disables the " +
"consumption of deltas.");

/*
* Scylla CDC Source Connector relies on heartbeats to move the offset,
@@ -204,7 +213,7 @@ public class ScyllaConnectorConfig extends CommonConnectorConfig {
             CommonConnectorConfig.CONFIG_DEFINITION.edit()
                     .name("Scylla")
                     .type(CLUSTER_IP_ADDRESSES, USER, PASSWORD, LOGICAL_NAME, CONSISTENCY_LEVEL, QUERY_OPTIONS_FETCH_SIZE, LOCAL_DC_NAME, SSL_ENABLED, SSL_PROVIDER, SSL_TRUSTSTORE_PATH, SSL_TRUSTSTORE_PASSWORD, SSL_KEYSTORE_PATH, SSL_KEYSTORE_PASSWORD, SSL_CIPHER_SUITES, SSL_OPENSLL_KEYCERTCHAIN, SSL_OPENSLL_PRIVATEKEY)
-                    .connector(QUERY_TIME_WINDOW_SIZE, CONFIDENCE_WINDOW_SIZE, PREIMAGES_ENABLED)
+                    .connector(QUERY_TIME_WINDOW_SIZE, CONFIDENCE_WINDOW_SIZE, PREIMAGES_ENABLED, POSTIMAGES_ENABLED)
                     .events(TABLE_NAMES)
                     .excluding(Heartbeat.HEARTBEAT_INTERVAL).events(CUSTOM_HEARTBEAT_INTERVAL)
// Exclude some Debezium options, which are not applicable/not supported by
@@ -313,6 +322,10 @@ public boolean getPreimagesEnabled() {
         return config.getBoolean(ScyllaConnectorConfig.PREIMAGES_ENABLED);
     }

+    public boolean getPostimagesEnabled() {
+        return config.getBoolean(ScyllaConnectorConfig.POSTIMAGES_ENABLED);
+    }

public int getQueryOptionsFetchSize() {
return config.getInteger(ScyllaConnectorConfig.QUERY_OPTIONS_FETCH_SIZE);
}
ScyllaStreamingChangeEventSource.java
@@ -43,7 +43,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
         Driver3Session session = new ScyllaSessionBuilder(configuration).build();
         Driver3WorkerCQL cql = new Driver3WorkerCQL(session);
         ScyllaWorkerTransport workerTransport = new ScyllaWorkerTransport(context, offsetContext, dispatcher, configuration.getHeartbeatIntervalMs());
-        ScyllaChangesConsumer changeConsumer = new ScyllaChangesConsumer(dispatcher, offsetContext, schema, clock, configuration.getPreimagesEnabled());
+        ScyllaChangesConsumer changeConsumer = new ScyllaChangesConsumer(dispatcher, offsetContext, schema, clock, configuration.getPreimagesEnabled(), configuration.getPostimagesEnabled());
         WorkerConfiguration workerConfiguration = WorkerConfiguration.builder()
                 .withTransport(workerTransport)
                 .withCQL(cql)
ScyllaExtractNewRecordState.java
@@ -1,39 +1,72 @@
 package com.scylladb.cdc.debezium.connector.transforms;

 import java.util.Map;

+import io.debezium.config.Configuration;
+import io.debezium.data.Envelope;
 import io.debezium.transforms.ExtractNewRecordState;
+import io.debezium.transforms.ExtractNewRecordStateConfigDefinition;
+import io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DeleteHandling;

 import org.apache.kafka.common.cache.Cache;
 import org.apache.kafka.common.cache.LRUCache;
 import org.apache.kafka.common.cache.SynchronizedCache;
 import org.apache.kafka.common.config.ConfigDef;

 import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.transforms.util.SchemaUtil;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
-import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.Struct;

 public class ScyllaExtractNewRecordState<R extends ConnectRecord<R>> extends ExtractNewRecordState<R> {
-    private Cache<Schema, Schema> schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16));
+    private Cache<Integer, Schema> schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16));

[Review comment — @Bouncheck (Collaborator), Mar 6, 2025]
I believe the cache does not help much when so many different possible keys are introduced. Previously it was (generally) 1 schema for 1 table. Now, depending on N non-key columns, there are 2^N possible custom schemas. Of course not all of them will be generated, but even a handful is a significant increase from 1. It could easily exceed 16.

+    private DeleteHandling handleDeletes;
+
+    @Override
+    public void configure(final Map<String, ?> configs) {
+        super.configure(configs);
+        // Parse delete.handling.mode so the REWRITE branch in apply() can act
+        // on it; the parent transform keeps its own copy of this setting private.
+        final Configuration config = Configuration.from(configs);
+        handleDeletes = DeleteHandling.parse(config.getString(ExtractNewRecordStateConfigDefinition.HANDLE_DELETES));
+    }

     @Override
     public R apply(final R record) {
+        // Debezium by default does not emit a rewrite (delete.handling.mode)
+        // when an after field is present. However, in ScyllaDB we represent
+        // the affected keys in the after field.
+        boolean isDelete = false;
+        boolean hasRewrite = false;

[Review comment — @Bouncheck (Collaborator), Mar 6, 2025]
I don't see hasRewrite used.

+        if (record != null && (record.value() instanceof Struct) && record instanceof SourceRecord) {
+            final SourceRecord sr = (SourceRecord) record;
+            if (Envelope.operationFor(sr) == Envelope.Operation.DELETE) {
+                isDelete = true;
+            }
+        }
+
         final R ret = super.apply(record);
         if (ret == null || !(ret.value() instanceof Struct)) {
             return ret;
         }

         final Struct value = (Struct) ret.value();

-        Schema updatedSchema = schemaUpdateCache.get(value.schema());
+        int schemaKey = getSchemaKey(value);
+
+        Schema updatedSchema = schemaUpdateCache.get(schemaKey);
         if (updatedSchema == null) {
-            updatedSchema = makeUpdatedSchema(value.schema());
-            schemaUpdateCache.put(value.schema(), updatedSchema);
+            updatedSchema = makeUpdatedSchema(value.schema(), value);
+            schemaUpdateCache.put(schemaKey, updatedSchema);
         }

         final Struct updatedValue = new Struct(updatedSchema);

         for (Field field : value.schema().fields()) {
+            if (isDroppableField(field, value)) {
+                continue;
+            }
+
             if (isSimplifiableField(field)) {
                 Struct fieldValue = (Struct) value.get(field);
                 updatedValue.put(field.name(), fieldValue == null ? null : fieldValue.get("value"));
@@ -42,6 +75,12 @@ public R apply(final R record) {
             }
         }

+        if (isDelete) {
+            if (handleDeletes == DeleteHandling.REWRITE) {
+                updatedValue.put(ExtractNewRecordStateConfigDefinition.DELETED_FIELD, "true");
+            }
+        }
+
         return ret.newRecord(ret.topic(), ret.kafkaPartition(), ret.keySchema(), ret.key(), updatedSchema, updatedValue, ret.timestamp());
     }
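
With delete.handling.mode=rewrite, the flattened delete record therefore keeps its key columns in place and gains Debezium's "__deleted": "true" marker (ExtractNewRecordStateConfigDefinition.DELETED_FIELD); under the other modes the record passes through unchanged.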

@@ -51,6 +90,43 @@ public void close() {
schemaUpdateCache = null;
}

+    // Computes a cache key from the schema identity and the null/non-null
+    // shape of the payload, so that differently-pruned variants of the same
+    // table schema land in different entries of the LRU cache.
+    private int getSchemaKey(Struct value) {
+        Schema schema = value.schema();
+        final int prime = 31;
+        int hash = 1;
+
+        hash = prime * hash + (schema.name() == null ? 0 : schema.name().hashCode());
+
+        for (Field field : schema.fields()) {
+            hash = prime * hash + field.name().hashCode();
+
+            Object fieldValue = value.get(field);
+            boolean isNull = fieldValue == null;
+            hash = prime * hash + (isNull ? 0 : 1);
+
+            if (!isNull && field.schema().type() == Type.STRUCT) {
+                hash = prime * hash + field.schema().type().hashCode();
+            }
+        }
+
+        return hash;
+    }
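
The reviewer's 2^N concern above can be seen directly: a hypothetical table with two optional columns already yields a distinct key per write shape (a sketch with invented names, not part of the PR):

    // Two updates to the same table that touch different columns produce
    // different null/non-null shapes, hence different cache keys and pruned
    // schemas. Column names here are invented for illustration.
    static void demoSchemaKeySpread() {
        Schema cell = SchemaBuilder.struct().optional()
                .field("value", Schema.OPTIONAL_INT32_SCHEMA).build();
        Schema row = SchemaBuilder.struct().name("ks.t.Value")
                .field("a", cell).field("b", cell).build();
        Struct onlyA = new Struct(row).put("a", new Struct(cell).put("value", 1)); // b stays null
        Struct onlyB = new Struct(row).put("b", new Struct(cell).put("value", 2)); // a stays null
        // getSchemaKey(onlyA) != getSchemaKey(onlyB), so each shape occupies
        // its own slot in the 16-entry LRU cache.
    }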

+    // isDroppableField reports whether a field holds a null Struct value and
+    // can be dropped. It does NOT drop empty Structs, which represent an
+    // actual delta mutation; this allows consumers to upsert deltas and
+    // converge to a stable result. For example:
+    //   False (don't drop) -- Type: STRUCT  Value: Struct{}
+    //   True  (drop)       -- Type: STRUCT  Value: null
+    private boolean isDroppableField(Field field, Struct value) {
+        if (field.schema().type() == Type.STRUCT
+                && (value.get(field) == null)) {
+            return field.schema().isOptional();
+        }
+
+        return false;
+    }

private boolean isSimplifiableField(Field field) {
if (field.schema().type() != Type.STRUCT) {
return false;
@@ -64,10 +140,14 @@ private boolean isSimplifiableField(Field field) {
return true;
}

-    private Schema makeUpdatedSchema(Schema schema) {
+    private Schema makeUpdatedSchema(Schema schema, Struct value) {
         final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());

         for (Field field : schema.fields()) {
+            if (isDroppableField(field, value)) {
+                continue;
+            }
+
             if (isSimplifiableField(field)) {
                 builder.field(field.name(), field.schema().field("value").schema());
             } else {