src/main/java/com/scylladb/cdc/debezium/connector/ScyllaChangeRecordEmitter.java
@@ -2,9 +2,8 @@

import com.scylladb.cdc.model.worker.RawChange;
import com.scylladb.cdc.model.worker.ChangeSchema;
-import com.scylladb.cdc.model.worker.cql.Cell;
import com.scylladb.cdc.model.worker.cql.CqlDate;
-import com.scylladb.cdc.model.worker.cql.CqlDuration;
+import com.scylladb.cdc.model.worker.cql.Field;
import io.debezium.data.Envelope;
import io.debezium.pipeline.AbstractChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
@@ -13,7 +12,11 @@

import java.util.Calendar;
import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
import java.util.TimeZone;
+import java.util.stream.Collectors;

public class ScyllaChangeRecordEmitter extends AbstractChangeRecordEmitter<ScyllaCollectionSchema> {

@@ -96,9 +99,9 @@ protected void emitDeleteRecord(Receiver receiver, ScyllaCollectionSchema scylla

    private void fillStructWithChange(ScyllaCollectionSchema schema, Struct keyStruct, Struct valueStruct, RawChange change) {
        for (ChangeSchema.ColumnDefinition cdef : change.getSchema().getNonCdcColumnDefinitions()) {
-            if (!ScyllaSchema.isSupportedColumnSchema(cdef)) continue;
+            if (!ScyllaSchema.isSupportedColumnSchema(change.getSchema(), cdef)) continue;

-            Object value = translateCellToKafka(change.getCell(cdef.getColumnName()));
+            Object value = translateFieldToKafka(change.getCell(cdef.getColumnName()));

            if (cdef.getBaseTableColumnType() == ChangeSchema.ColumnType.PARTITION_KEY || cdef.getBaseTableColumnType() == ChangeSchema.ColumnType.CLUSTERING_KEY) {
                valueStruct.put(cdef.getColumnName(), value);
@@ -114,35 +117,35 @@ private void fillStructWithChange(ScyllaCollectionSchema schema, Struct keyStruc
        }
    }

-    private Object translateCellToKafka(Cell cell) {
-        ChangeSchema.DataType dataType = cell.getColumnDefinition().getCdcLogDataType();
+    private Object translateFieldToKafka(Field field) {
+        ChangeSchema.DataType dataType = field.getDataType();

-        if (cell.getAsObject() == null) {
+        if (field.getAsObject() == null) {
            return null;
        }

        if (dataType.getCqlType() == ChangeSchema.CqlType.DECIMAL) {
-            return cell.getDecimal().toString();
+            return field.getDecimal().toString();
        }

        if (dataType.getCqlType() == ChangeSchema.CqlType.UUID) {
-            return cell.getUUID().toString();
+            return field.getUUID().toString();
        }

        if (dataType.getCqlType() == ChangeSchema.CqlType.TIMEUUID) {
-            return cell.getUUID().toString();
+            return field.getUUID().toString();
        }

        if (dataType.getCqlType() == ChangeSchema.CqlType.VARINT) {
-            return cell.getVarint().toString();
+            return field.getVarint().toString();
        }

        if (dataType.getCqlType() == ChangeSchema.CqlType.INET) {
-            return cell.getInet().getHostAddress();
+            return field.getInet().getHostAddress();
        }

        if (dataType.getCqlType() == ChangeSchema.CqlType.DATE) {
-            CqlDate cqlDate = cell.getDate();
+            CqlDate cqlDate = field.getDate();
            Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
            calendar.clear();
            // Months start from 0 in Calendar:
@@ -151,9 +154,46 @@ private Object translateCellToKafka(Cell cell) {
        }

        if (dataType.getCqlType() == ChangeSchema.CqlType.DURATION) {
-            return cell.getDuration().toString();
+            return field.getDuration().toString();
        }

-        return cell.getAsObject();
+        if (dataType.getCqlType() == ChangeSchema.CqlType.LIST) {
+            return field.getList().stream().map(this::translateFieldToKafka).collect(Collectors.toList());
+        }
+
+        if (dataType.getCqlType() == ChangeSchema.CqlType.SET) {
+            return field.getSet().stream().map(this::translateFieldToKafka).collect(Collectors.toList());
+        }
+
+        if (dataType.getCqlType() == ChangeSchema.CqlType.MAP) {
+            Map<Field, Field> map = field.getMap();
+            Map<Object, Object> kafkaMap = new LinkedHashMap<>();
+            map.forEach((key, value) -> {
+                Object kafkaKey = translateFieldToKafka(key);
+                Object kafkaValue = translateFieldToKafka(value);
+                kafkaMap.put(kafkaKey, kafkaValue);
+            });
+            return kafkaMap;
+        }
+
+        if (dataType.getCqlType() == ChangeSchema.CqlType.TUPLE) {
+            Struct tupleStruct = new Struct(ScyllaSchema.computeColumnSchema(dataType));
+            List<Field> tuple = field.getTuple();
+            for (int i = 0; i < tuple.size(); i++) {
+                tupleStruct.put("tuple_member_" + i, translateFieldToKafka(tuple.get(i)));
+            }
+            return tupleStruct;
+        }
+
+        if (dataType.getCqlType() == ChangeSchema.CqlType.UDT) {
+            Struct udtStruct = new Struct(ScyllaSchema.computeColumnSchema(dataType));
+            Map<String, Field> udt = field.getUDT();
+            udt.forEach((name, value) -> {
+                udtStruct.put(name, translateFieldToKafka(value));
+            });
+            return udtStruct;
+        }
+
+        return field.getAsObject();
    }
}
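Editor's note on the tuple handling above: translateFieldToKafka names each tuple member "tuple_member_" + i, so the schema side (computeColumnSchema in ScyllaSchema.java below) must build struct fields under exactly those names for Struct.put to succeed. The following is a minimal, self-contained sketch of that contract using only the Kafka Connect data API; the tuple<int, text> column and its values are hypothetical illustrations, not code from this PR:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class TupleShapeSketch {
    public static void main(String[] args) {
        // Roughly the schema computeColumnSchema would build for a CQL
        // tuple<int, text>: one struct field per member, named "tuple_member_<i>".
        Schema tupleSchema = SchemaBuilder.struct()
                .field("tuple_member_0", Schema.OPTIONAL_INT32_SCHEMA)
                .field("tuple_member_1", Schema.OPTIONAL_STRING_SCHEMA)
                .optional()
                .build();

        // Roughly what translateFieldToKafka produces for a tuple value (42, "foo").
        Struct tupleStruct = new Struct(tupleSchema)
                .put("tuple_member_0", 42)
                .put("tuple_member_1", "foo");

        System.out.println(tupleStruct); // Struct{tuple_member_0=42,tuple_member_1=foo}
    }
}

Struct.put throws a DataException for a field name the schema does not declare, which is why the member-naming convention has to agree between the emitter and the schema computation.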
src/main/java/com/scylladb/cdc/debezium/connector/ScyllaSchema.java (64 changes: 50 additions & 14 deletions)
@@ -14,7 +14,9 @@
import org.slf4j.LoggerFactory;

import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;

public class ScyllaSchema implements DatabaseSchema<CollectionId> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ScyllaSchema.class);
@@ -72,7 +74,7 @@ private Map<String, Schema> computeCellSchemas(ChangeSchema changeSchema, Collec
        for (ChangeSchema.ColumnDefinition cdef : changeSchema.getNonCdcColumnDefinitions()) {
            if (cdef.getBaseTableColumnType() == ChangeSchema.ColumnType.PARTITION_KEY
                    || cdef.getBaseTableColumnType() == ChangeSchema.ColumnType.CLUSTERING_KEY) continue;
-            if (!isSupportedColumnSchema(cdef)) continue;
+            if (!isSupportedColumnSchema(changeSchema, cdef)) continue;

            Schema columnSchema = computeColumnSchema(cdef);
            Schema cellSchema = SchemaBuilder.struct()
@@ -89,7 +91,7 @@ private Schema computeKeySchema(ChangeSchema changeSchema, CollectionId collecti
        for (ChangeSchema.ColumnDefinition cdef : changeSchema.getNonCdcColumnDefinitions()) {
            if (cdef.getBaseTableColumnType() != ChangeSchema.ColumnType.PARTITION_KEY
                    && cdef.getBaseTableColumnType() != ChangeSchema.ColumnType.CLUSTERING_KEY) continue;
-            if (!isSupportedColumnSchema(cdef)) continue;
+            if (!isSupportedColumnSchema(changeSchema, cdef)) continue;

            Schema columnSchema = computeColumnSchema(cdef);
            keySchemaBuilder = keySchemaBuilder.field(cdef.getColumnName(), columnSchema);
@@ -102,7 +104,7 @@ private Schema computeAfterSchema(ChangeSchema changeSchema, Map<String, Schema>
        SchemaBuilder afterSchemaBuilder = SchemaBuilder.struct()
                .name(adjuster.adjust(configuration.getLogicalName() + "." + collectionId.getTableName().keyspace + "." + collectionId.getTableName().name + ".After"));
        for (ChangeSchema.ColumnDefinition cdef : changeSchema.getNonCdcColumnDefinitions()) {
-            if (!isSupportedColumnSchema(cdef)) continue;
+            if (!isSupportedColumnSchema(changeSchema, cdef)) continue;

            if (cdef.getBaseTableColumnType() != ChangeSchema.ColumnType.PARTITION_KEY && cdef.getBaseTableColumnType() != ChangeSchema.ColumnType.CLUSTERING_KEY) {
                afterSchemaBuilder = afterSchemaBuilder.field(cdef.getColumnName(), cellSchemas.get(cdef.getColumnName()));
@@ -118,7 +120,7 @@ private Schema computeBeforeSchema(ChangeSchema changeSchema, Map<String, Schema
        SchemaBuilder beforeSchemaBuilder = SchemaBuilder.struct()
                .name(adjuster.adjust(configuration.getLogicalName() + "." + collectionId.getTableName().keyspace + "." + collectionId.getTableName().name + ".Before"));
        for (ChangeSchema.ColumnDefinition cdef : changeSchema.getNonCdcColumnDefinitions()) {
-            if (!isSupportedColumnSchema(cdef)) continue;
+            if (!isSupportedColumnSchema(changeSchema, cdef)) continue;

            if (cdef.getBaseTableColumnType() != ChangeSchema.ColumnType.PARTITION_KEY && cdef.getBaseTableColumnType() != ChangeSchema.ColumnType.CLUSTERING_KEY) {
                beforeSchemaBuilder = beforeSchemaBuilder.field(cdef.getColumnName(), cellSchemas.get(cdef.getColumnName()));
@@ -130,8 +132,12 @@ private Schema computeBeforeSchema(ChangeSchema changeSchema, Map<String, Schema
        return beforeSchemaBuilder.optional().build();
    }

-    private Schema computeColumnSchema(ChangeSchema.ColumnDefinition cdef) {
-        switch (cdef.getCdcLogDataType().getCqlType()) {
+    protected static Schema computeColumnSchema(ChangeSchema.ColumnDefinition cdef) {
+        return computeColumnSchema(cdef.getCdcLogDataType());
+    }
+
+    protected static Schema computeColumnSchema(ChangeSchema.DataType type) {
+        switch (type.getCqlType()) {
            case ASCII:
                return Schema.OPTIONAL_STRING_SCHEMA;
            case BIGINT:
@@ -179,21 +185,51 @@ private Schema computeColumnSchema(ChangeSchema.ColumnDefinition cdef) {
                return Schema.OPTIONAL_INT8_SCHEMA;
            case DURATION:
                return Schema.OPTIONAL_STRING_SCHEMA;
-            case LIST:
-            case MAP:
-            case SET:
-            case UDT:
-            case TUPLE:
+            case SET:
+            case LIST: {
+                Schema innerSchema = computeColumnSchema(type.getTypeArguments().get(0));
+                return SchemaBuilder.array(innerSchema);

    [Review comment] Suggested change:
    -                return SchemaBuilder.array(innerSchema);
    +                return SchemaBuilder.array(innerSchema).optional().build();

+            }
+            case MAP: {
+                Schema keySchema = computeColumnSchema(type.getTypeArguments().get(0));
+                Schema valueSchema = computeColumnSchema(type.getTypeArguments().get(1));
+                return SchemaBuilder.map(keySchema, valueSchema);

    [Review comment] Suggested change:
    -                return SchemaBuilder.map(keySchema, valueSchema);
    +                return SchemaBuilder.map(keySchema, valueSchema).optional().build();

+            }
+            case TUPLE: {
+                List<Schema> innerSchemas = type.getTypeArguments().stream()
+                        .map(ScyllaSchema::computeColumnSchema).collect(Collectors.toList());
+                SchemaBuilder tupleSchema = SchemaBuilder.struct();
+                for (int i = 0; i < innerSchemas.size(); i++) {
+                    tupleSchema = tupleSchema.field("tuple_member_" + i, innerSchemas.get(i));
+                }
+                return tupleSchema.optional().build();
+            }
+            case UDT: {
+                SchemaBuilder udtSchema = SchemaBuilder.struct();

    [Review comment] Suggested change:
    -                SchemaBuilder udtSchema = SchemaBuilder.struct();
    +                SchemaBuilder udtSchema = SchemaBuilder.struct().name(type.getUdtType().getName());

+                for (Map.Entry<String, ChangeSchema.DataType> field : type.getUdtType().getFields().entrySet()) {
+                    udtSchema = udtSchema.field(field.getKey(), computeColumnSchema(field.getValue()));
+                }
+                return udtSchema.optional().build();
+            }
            default:
                throw new UnsupportedOperationException();
        }
    }

-    protected static boolean isSupportedColumnSchema(ChangeSchema.ColumnDefinition cdef) {
+    protected static boolean isSupportedColumnSchema(ChangeSchema changeSchema, ChangeSchema.ColumnDefinition cdef) {
        ChangeSchema.CqlType type = cdef.getCdcLogDataType().getCqlType();
-        return type != ChangeSchema.CqlType.LIST && type != ChangeSchema.CqlType.MAP &&
-                type != ChangeSchema.CqlType.SET && type != ChangeSchema.CqlType.UDT &&
-                type != ChangeSchema.CqlType.TUPLE;
+        if (type == ChangeSchema.CqlType.LIST || type == ChangeSchema.CqlType.SET
+                || type == ChangeSchema.CqlType.MAP || type == ChangeSchema.CqlType.UDT) {
+            // We only support frozen lists, sets, maps and UDTs
+            // (non-frozen ones can be identified by a cdc$deleted_elements_ column).
+
+            // FIXME: When isFrozen is fixed in scylla-cdc-java (PR #60),
+            // replace with just a call to isFrozen.
+            String deletedElementsColumnName = "cdc$deleted_elements_" + cdef.getColumnName();
+            return changeSchema.getAllColumnDefinitions().stream()
+                    .noneMatch(c -> c.getColumnName().equals(deletedElementsColumnName));
+        }
+        return true;
+    }

    public ScyllaCollectionSchema updateChangeSchema(CollectionId collectionId, ChangeSchema changeSchema) {
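
Editor's note on the cdc$deleted_elements_ heuristic above: per the diff's own comment, only frozen collections and UDTs are supported, and they are recognized by the absence of a cdc$deleted_elements_<column> companion column, which Scylla's CDC log adds only for non-frozen collection columns (those that support per-element deletion). A minimal, self-contained sketch of the same absence test over plain column names; the table layout and column names are hypothetical:

import java.util.Arrays;
import java.util.List;

public class FrozenCheckSketch {

    // Mirrors the noneMatch test in isSupportedColumnSchema: a collection column
    // counts as frozen (and thus supported) iff the CDC log carries no
    // "cdc$deleted_elements_<column>" companion for it.
    static boolean looksFrozen(List<String> cdcLogColumns, String columnName) {
        String deletedElementsColumnName = "cdc$deleted_elements_" + columnName;
        return cdcLogColumns.stream()
                .noneMatch(c -> c.equals(deletedElementsColumnName));
    }

    public static void main(String[] args) {
        // Hypothetical CDC log columns for a base table with a non-frozen
        // list "tags" and a frozen list "labels".
        List<String> cdcLogColumns = Arrays.asList(
                "pk", "tags", "cdc$deleted_elements_tags", "labels");

        System.out.println(looksFrozen(cdcLogColumns, "tags"));   // false -> column skipped
        System.out.println(looksFrozen(cdcLogColumns, "labels")); // true  -> column supported
    }
}

Once isFrozen is fixed in scylla-cdc-java (the PR #60 referenced in the FIXME), this lookup should collapse to a single metadata call.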