Skip to content

Commit 59890aa

Browse files
Chirag Bansaldkropachev
authored andcommitted
Adding option to push only post image events to kafka
1 parent 2cec699 commit 59890aa

File tree

5 files changed

+39
-11
lines changed

5 files changed

+39
-11
lines changed

README.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,25 @@ The connector has the following capabilities:
1515
- `UPDATE`
1616
- `DELETE` (single row deletes)
1717
- High scalability - able to split work accross multiple Kafka Connect workers
18-
- Fault tolerant - connector periodically saves its progress and can resume from previously saved offset (with at-least-once semantics)
18+
- Fault-tolerant - connector periodically saves its progress and can resume from previously saved offset (with at-least-once semantics)
1919
- Support for many standard Kafka Connect converters, such as JSON and Avro
2020
- Compatible with standard Kafka Connect transformations
2121
- Metadata about CDC events - each generated Kafka message contains information about source, such as timestamp and table name
2222
- Seamless handling of schema changes and topology changes (adding, removing nodes from Scylla cluster)
2323
- Preimage support ([optional](#advanced-configuration-parameters)) - messages generated for row-level changes can have their [`before`](#data-change-event-value) field filled with information from corresponding preimage row.
24+
- Post image only: You can configure the connector to produce only `POST_IMAGE` cdc events as `CREATE` events.
2425

2526
The connector has the following limitations:
2627
- Only Kafka 2.6.0+ is supported
2728
- Only row-level operations are produced (`INSERT`, `UPDATE`, `DELETE`):
2829
- Partition deletes - those changes are ignored
2930
- Row range deletes - those changes are ignored
3031
- No support for collection types (`LIST`, `SET`, `MAP`) and `UDT` - columns with those types are omitted from generated messages
32+
<<<<<<< HEAD
3133
- No support for postimage, preimage needs to be enabled - By default changes only contain those columns that were modified, not the entire row before/after change. More information [here](#cell-representation)
34+
=======
35+
- No support for preimage - changes only contain those columns that were modified, not the entire row before/after change. More information [here](#cell-representation)
36+
>>>>>>> 4c29e07 (Adding option to push only post image events to kafka)
3237
3338
## Connector installation
3439

@@ -684,6 +689,7 @@ The connector will generate the following data change event's value (with JSON s
684689

685690
In addition to the configuration parameters described in the ["Configuration"](#configuration) section, Scylla CDC Source Connector exposes the following (non-required) configuration parameters:
686691

692+
<<<<<<< HEAD
687693
| Property | Description |
688694
|----------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
689695
| `scylla.query.time.window.size` | The size of windows queried by the connector. Changes are queried using `SELECT` statements with time restriction with width defined by this parameter. Value expressed in milliseconds. |
@@ -692,6 +698,15 @@ In addition to the configuration parameters described in the ["Configuration"](#
692698
| `scylla.local.dc` | The name of Scylla local datacenter. This local datacenter name will be used to setup the connection to Scylla to prioritize sending requests to the nodes in the local datacenter. If not set, no particular datacenter will be prioritized. |
693699
| `experimental.preimages.enabled` | False by default. If enabled connector will use `PRE_IMAGE` CDC entries to populate 'before' field of the debezium Envelope of the next kafka message. This may change some expected behaviours (e.g. ROW_DELETE will use preimage instead of its own information). Relies on correct ordering of rows within same stream in CDC tables. |
694700

701+
=======
702+
| Property | Description |
703+
| --- | --- |
704+
| `scylla.query.time.window.size` | The size of windows queried by the connector. Changes are queried using `SELECT` statements with time restriction with width defined by this parameter. Value expressed in milliseconds. |
705+
| `scylla.confidence.window.size` | The size of the confidence window. It is necessary for the connector to avoid reading too fresh data from the CDC log due to the eventual consistency of Scylla. The problem could appear when a newer write reaches a replica before some older write. For a short period of time, when reading, it is possible for the replica to return only the newer write. The connector mitigates this problem by not reading a window of most recent changes (controlled by this parameter). Value expressed in milliseconds.|
706+
| `scylla.consistency.level` | The consistency level of CDC table read queries. This consistency level is used only for read queries to the CDC log table. By default, `QUORUM` level is used. |
707+
| `scylla.local.dc` | The name of Scylla local datacenter. This local datacenter name will be used to setup the connection to Scylla to prioritize sending requests to the nodes in the local datacenter. If not set, no particular datacenter will be prioritized. |
708+
| `post.image.only` | Push only the post image events from scylla cdc to kafka. The events are pushed as `CREATE` events. |
709+
>>>>>>> 4c29e07 (Adding option to push only post image events to kafka)
695710
696711
### Configuration for large Scylla clusters
697712
#### Offset (progress) storage

src/main/java/com/scylladb/cdc/debezium/connector/ScyllaChangeRecordEmitter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ protected Envelope.Operation getOperation() {
5151
case ROW_UPDATE:
5252
return Envelope.Operation.UPDATE;
5353
case ROW_INSERT:
54+
case POST_IMAGE:
5455
return Envelope.Operation.CREATE;
5556
case PARTITION_DELETE: // See comment in ScyllaChangesConsumer on the support of partition deletes.
5657
case ROW_DELETE:

src/main/java/com/scylladb/cdc/debezium/connector/ScyllaChangesConsumer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.scylladb.cdc.model.TaskId;
44
import com.scylladb.cdc.model.worker.ChangeSchema;
55
import com.scylladb.cdc.model.worker.RawChange;
6+
import com.scylladb.cdc.model.worker.RawChange.OperationType;
67
import com.scylladb.cdc.model.worker.Task;
78
import com.scylladb.cdc.model.worker.TaskAndRawChangeConsumer;
89
import io.debezium.pipeline.EventDispatcher;
@@ -23,8 +24,9 @@ public class ScyllaChangesConsumer implements TaskAndRawChangeConsumer {
2324
private final Clock clock;
2425
private final boolean usePreimages;
2526
private final Map<TaskId, RawChange> lastPreImage;
27+
private final boolean postImageOnly;
2628

27-
public ScyllaChangesConsumer(EventDispatcher<CollectionId> dispatcher, ScyllaOffsetContext offsetContext, ScyllaSchema schema, Clock clock, boolean usePreimages) {
29+
public ScyllaChangesConsumer(EventDispatcher<CollectionId> dispatcher, ScyllaOffsetContext offsetContext, ScyllaSchema schema, Clock clock, boolean usePreimages, boolean postImageOnly) {
2830
this.dispatcher = dispatcher;
2931
this.offsetContext = offsetContext;
3032
this.schema = schema;
@@ -35,6 +37,7 @@ public ScyllaChangesConsumer(EventDispatcher<CollectionId> dispatcher, ScyllaOff
3537
} else {
3638
this.lastPreImage = null;
3739
}
40+
this.postImageOnly = postImageOnly;
3841
}
3942

4043
@Override
@@ -66,10 +69,13 @@ public CompletableFuture<Void> consume(Task task, RawChange change) {
6669
if (hasClusteringColumn) {
6770
return CompletableFuture.completedFuture(null);
6871
}
69-
} else if (operationType != RawChange.OperationType.ROW_INSERT
72+
} else if (!this.postImageOnly
73+
&& operationType != RawChange.OperationType.ROW_INSERT
7074
&& operationType != RawChange.OperationType.ROW_UPDATE
7175
&& operationType != RawChange.OperationType.ROW_DELETE) {
7276
return CompletableFuture.completedFuture(null);
77+
} else if (this.postImageOnly && operationType != OperationType.POST_IMAGE) {
78+
return CompletableFuture.completedFuture(null);
7379
}
7480

7581
if (usePreimages && lastPreImage.containsKey(task.id)) {

src/main/java/com/scylladb/cdc/debezium/connector/ScyllaConnectorConfig.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,14 @@ public class ScyllaConnectorConfig extends CommonConnectorConfig {
167167
"driver's QueryOptions before session construction. Set this to an explicit value if " +
168168
"experiencing too high memory usage.");
169169

170+
public static final boolean POST_IMAGE_ONLY_DEFAULT = false;
171+
public static final Field POST_IMAGE_ONLY = Field.create("post.image.only")
172+
.withDisplayName("Post Image Only")
173+
.withType(ConfigDef.Type.BOOLEAN)
174+
.withWidth(ConfigDef.Width.SHORT)
175+
.withImportance(ConfigDef.Importance.MEDIUM)
176+
.withDescription("Whether the connector should publish only post image events. The cdc settings must have `'postimage': 'true'`. To get full image with all fields, set `'preimage': 'full'`.");
177+
170178
public static final Field LOCAL_DC_NAME = Field.create("scylla.local.dc")
171179
.withDisplayName("Local DC Name")
172180
.withType(ConfigDef.Type.STRING)
@@ -205,7 +213,7 @@ public class ScyllaConnectorConfig extends CommonConnectorConfig {
205213
.name("Scylla")
206214
.type(CLUSTER_IP_ADDRESSES, USER, PASSWORD, LOGICAL_NAME, CONSISTENCY_LEVEL, QUERY_OPTIONS_FETCH_SIZE, LOCAL_DC_NAME, SSL_ENABLED, SSL_PROVIDER, SSL_TRUSTSTORE_PATH, SSL_TRUSTSTORE_PASSWORD, SSL_KEYSTORE_PATH, SSL_KEYSTORE_PASSWORD,SSL_CIPHER_SUITES, SSL_OPENSLL_KEYCERTCHAIN, SSL_OPENSLL_PRIVATEKEY)
207215
.connector(QUERY_TIME_WINDOW_SIZE, CONFIDENCE_WINDOW_SIZE, PREIMAGES_ENABLED)
208-
.events(TABLE_NAMES)
216+
.events(TABLE_NAMES, POST_IMAGE_ONLY)
209217
.excluding(Heartbeat.HEARTBEAT_INTERVAL).events(CUSTOM_HEARTBEAT_INTERVAL)
210218
// Exclude some Debezium options, which are not applicable/not supported by
211219
// the Scylla CDC Source Connector.
@@ -305,6 +313,10 @@ public CQLConfiguration.ConsistencyLevel getConsistencyLevel() {
305313
}
306314
}
307315

316+
public boolean isPostImageOnly() {
317+
return config.getBoolean(ScyllaConnectorConfig.POST_IMAGE_ONLY, POST_IMAGE_ONLY_DEFAULT);
318+
}
319+
308320
public String getLocalDCName() {
309321
return config.getString(ScyllaConnectorConfig.LOCAL_DC_NAME);
310322
}

src/main/java/com/scylladb/cdc/debezium/connector/ScyllaStreamingChangeEventSource.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,14 @@
22

33
import com.scylladb.cdc.cql.driver3.Driver3Session;
44
import com.scylladb.cdc.cql.driver3.Driver3WorkerCQL;
5-
import com.scylladb.cdc.model.ExponentialRetryBackoffWithJitter;
6-
import com.scylladb.cdc.model.RetryBackoff;
75
import com.scylladb.cdc.model.worker.WorkerConfiguration;
86
import com.scylladb.cdc.model.worker.Worker;
97
import io.debezium.pipeline.EventDispatcher;
108
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
119
import io.debezium.util.Clock;
1210
import org.apache.commons.lang3.tuple.Pair;
1311

14-
import java.net.InetAddress;
15-
import java.net.InetSocketAddress;
16-
import java.sql.Driver;
1712
import java.time.Duration;
18-
import java.util.List;
1913
import java.util.concurrent.ExecutionException;
2014
import java.util.stream.Collectors;
2115

@@ -43,7 +37,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
4337
Driver3Session session = new ScyllaSessionBuilder(configuration).build();
4438
Driver3WorkerCQL cql = new Driver3WorkerCQL(session);
4539
ScyllaWorkerTransport workerTransport = new ScyllaWorkerTransport(context, offsetContext, dispatcher, configuration.getHeartbeatIntervalMs());
46-
ScyllaChangesConsumer changeConsumer = new ScyllaChangesConsumer(dispatcher, offsetContext, schema, clock, configuration.getPreimagesEnabled());
40+
ScyllaChangesConsumer changeConsumer = new ScyllaChangesConsumer(dispatcher, offsetContext, schema, clock, configuration.getPreimagesEnabled(), configuration.isPostImageOnly());
4741
WorkerConfiguration workerConfiguration = WorkerConfiguration.builder()
4842
.withTransport(workerTransport)
4943
.withCQL(cql)

0 commit comments

Comments
 (0)