Overview
When Kafka messages are sent in quick succession, some of them may carry the same timestamp (millisecond precision). If multiple messages regarding the same row arrive within a single millisecond, the Connector incorrectly applies only the first message (in that millisecond) to the database, because it relies on the timestamp to determine order.
The environment I was testing the Connector in involved only a single partition. Kafka guarantees that the order of messages within a partition is preserved (partition offset). The same scenario can also happen on a multi-partition topic, because most Kafka producers send messages with the same key to the same partition, so messages regarding the same row end up in the same topic partition.
As shown in "Further investigation", the Connector already receives the messages in the correct order (in my testing), but is unable to apply them correctly. The partition offset is also available to determine the correct order.
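To make the collision concrete, below is a minimal, hypothetical illustration (not the Connector's code; the topic name, key/value, offsets and timestamp are made up). Two operations on the same row produced within one millisecond carry identical Kafka timestamps, so ordering by timestamp alone cannot distinguish them, while the per-partition offset still can:

import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.sink.SinkRecord;

public class TimestampCollision {
  public static void main(String[] args) {
    long sameMillisecond = 1650000000123L; // both records fall into this millisecond

    // DELETE arrives first (offset 41), the INSERT right after it (offset 42)
    SinkRecord deleteOp = new SinkRecord("t", 0, null, "key", null, null, 41L, sameMillisecond, TimestampType.CREATE_TIME);
    SinkRecord insertOp = new SinkRecord("t", 0, null, "key", null, "value", 42L, sameMillisecond, TimestampType.CREATE_TIME);

    System.out.println(deleteOp.timestamp().equals(insertOp.timestamp())); // true -> timestamps tie
    System.out.println(deleteOp.kafkaOffset() < insertOp.kafkaOffset());   // true -> order still recoverable
  }
}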
Reproduction
- Start up Confluent and the ScyllaDB Sink Connector. Set up the Connector with topic t.
- Download the input file: input. It consists of 10 operation triplets, each of which adds a row with v = 0, deletes the row and adds it again with v = 1. Therefore, the final table should contain 10 rows with v = 1:
{"pk":{"int":1},"ck":{"int":1}}${"ks.t.value_schema":{"pk":{"int":1},"ck":{"int":1},"v":{"int":0}}}
{"pk":{"int":1},"ck":{"int":1}}$null
{"pk":{"int":1},"ck":{"int":1}}${"ks.t.value_schema":{"pk":{"int":1},"ck":{"int":1},"v":{"int":1}}}
{"pk":{"int":2},"ck":{"int":2}}${"ks.t.value_schema":{"pk":{"int":2},"ck":{"int":2},"v":{"int":0}}}
{"pk":{"int":2},"ck":{"int":2}}$null
{"pk":{"int":2},"ck":{"int":2}}${"ks.t.value_schema":{"pk":{"int":2},"ck":{"int":2},"v":{"int":1}}}
[... MORE ...]
- Use the kafka-avro-console-producer provided by Confluent to write the messages from input:
bin/kafka-avro-console-producer --broker-list localhost:9092 --topic t --property parse.key=true \
--property key.schema='{"fields":[{"name":"pk","type":["null","int"]},{"name":"ck","type":["null","int"]}],"name":"key_schema","namespace":"ks.t","type":"record"}' \
--property "key.separator=$" --property value.schema='["null",{"fields":[{"name":"pk","type":["null","int"]},{"name":"ck","type":["null","int"]},{"name":"v","type":["null","int"]}],"name":"value_schema","namespace":"ks.t","type":"record"}]' \
--timeout 100 --request-required-acks 0 < input
- Select rows in the destination table:
SELECT * FROM test.t;
Got:
Expected: 10 rows with v = 1 and pk, ck from 1 to 10.
Further investigation
Using kafka-avro-console-consumer, I verified that the messages were sent in the correct order (only the value part is shown):
After adding additional log statements to the Connector, it (surprisingly?) also turned out to receive the messages in the correct order:
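For reference, this is the kind of logging I mean, shown as a hypothetical sketch rather than the Connector's actual code (class name and message format are mine): print each record's partition, offset and timestamp as the sink task receives it in put().

import java.util.Collection;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class LoggingSinkTask extends SinkTask {
  private static final Logger log = LoggerFactory.getLogger(LoggingSinkTask.class);

  @Override
  public void put(Collection<SinkRecord> records) {
    // Kafka Connect delivers the records of one partition in offset order,
    // so this prints the order in which the task will process them.
    for (SinkRecord record : records) {
      log.info("topic={} partition={} offset={} timestamp={}",
          record.topic(), record.kafkaPartition(), record.kafkaOffset(), record.timestamp());
    }
  }
}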
Root cause
kafka-connect-scylladb/src/main/java/io/connect/scylladb/ScyllaDbSinkTaskHelper.java
Lines 94 to 98 in aa89618
  boundStatement.setConsistencyLevel(topicConfigs.getConsistencyLevel());
  boundStatement.setDefaultTimestamp(topicConfigs.getTimeStamp());
} else {
  boundStatement.setConsistencyLevel(this.scyllaDbSinkConnectorConfig.consistencyLevel);
  boundStatement.setDefaultTimestamp(record.timestamp());
The Connector calls setDefaultTimestamp() with the timestamp of the Kafka message. This value is translated into CQL USING TIMESTAMP when the query is executed, so later writes to the same cells with a lesser or equal timestamp may not take effect.
In the reproduction example, the row with pk = 1, ck = 1 is missing. This is caused by the DELETE and the following INSERT having the same USING TIMESTAMP; on a timestamp tie the tombstone wins, so the INSERT is ignored:
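The same effect can be demonstrated outside the Connector. Below is a minimal, hypothetical sketch using the 3.x-style DataStax/Scylla Java driver (the same API the Connector's code above uses); it assumes a local Scylla instance with keyspace test and a table t(pk int, ck int, v int, PRIMARY KEY (pk, ck)). Both statements get the same explicit timestamp, and the INSERT loses the tie against the tombstone:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;

public class SameTimestampDemo {
  public static void main(String[] args) {
    try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
         Session session = cluster.connect("test")) {
      long sameTimestampMicros = 1650000000123000L; // one value used for both statements

      SimpleStatement delete = new SimpleStatement("DELETE FROM t WHERE pk = 1 AND ck = 1");
      delete.setDefaultTimestamp(sameTimestampMicros);
      session.execute(delete);

      SimpleStatement insert = new SimpleStatement("INSERT INTO t (pk, ck, v) VALUES (1, 1, 1)");
      insert.setDefaultTimestamp(sameTimestampMicros); // same timestamp -> the tombstone wins
      session.execute(insert);

      // SELECT * FROM test.t WHERE pk = 1 AND ck = 1 now returns no row,
      // matching the missing row in the reproduction above.
    }
  }
}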
There is another, smaller issue in those lines of ScyllaDbSinkTaskHelper.java: setDefaultTimestamp() expects an epoch timestamp in microseconds, but the millisecond timestamp from Kafka is assigned, so the WRITETIME in the database is off by a factor of 1000:
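One possible direction for a fix, sketched below under my own assumptions (this is not the Connector's code, and the tie-breaking scheme is only illustrative): convert the Kafka timestamp to microseconds, as setDefaultTimestamp() expects, and use the partition offset to break ties within a millisecond, since Kafka already guarantees per-partition ordering.

import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical helper, not part of the Connector.
final class WriteTimestamps {
  private WriteTimestamps() {}

  // Returns a microsecond timestamp that preserves the per-partition order
  // within a millisecond. Not bulletproof: ordering still breaks if offsets
  // within one millisecond cross a multiple of 1000.
  static long microsecondTimestamp(SinkRecord record) {
    long micros = record.timestamp() * 1000L;       // ms (Kafka) -> µs (CQL USING TIMESTAMP)
    long tieBreaker = record.kafkaOffset() % 1000L;  // tie-breaker within one millisecond
    return micros + tieBreaker;
  }
}

// Usage at the call site shown above (sketch):
//   boundStatement.setDefaultTimestamp(WriteTimestamps.microsecondTimestamp(record));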




