Commit 99edf5c

Merge pull request #190 from conker84/issue_186
fixes #186: Kafka event sink with manual commit fails with multiple topic subscriptions
2 parents 515fd85 + f4928b1 commit 99edf5c
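
The failing path in issue #186 is the manual-commit consumer: when the sink subscribes to several topics, a single poll() can return records for only some of them, ConsumerRecords.records(topic) then yields an empty Iterable for the others, and calling last() on that empty Iterable throws NoSuchElementException. The sketch below is a standalone approximation of the guard this commit introduces, written against the plain Kafka client API; the helper name collectLastOffsets is illustrative and not part of the project.

import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// Illustrative helper (not project code): collect the last offset seen per topic-partition,
// skipping topics that received nothing in this poll so last() is never called on an empty Iterable.
fun collectLastOffsets(topics: List<String>,
                       records: ConsumerRecords<String, ByteArray>): Map<TopicPartition, OffsetAndMetadata> {
    val offsets = mutableMapOf<TopicPartition, OffsetAndMetadata>()
    topics.forEach { topic ->
        val topicRecords = records.records(topic)
        if (!topicRecords.iterator().hasNext()) {
            // No records for this topic in the current batch: skip it instead of failing.
            return@forEach
        }
        val lastRecord = topicRecords.last()
        offsets[TopicPartition(lastRecord.topic(), lastRecord.partition())] =
                OffsetAndMetadata(lastRecord.offset(), "")
    }
    return offsets
}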

File tree

2 files changed: +73 -9 lines changed


consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 16 additions & 9 deletions
@@ -95,7 +95,7 @@ class KafkaEventSink(private val config: Config,

    private fun createJob(): Job {
        log.info("Creating Sink daemon Job")
-        return GlobalScope.launch(Dispatchers.IO) {
+        return GlobalScope.launch(Dispatchers.IO) { // TODO improve exception management
            try {
                while (isActive) {
                    if (Neo4jUtils.isWriteableInstance(db)) {
@@ -177,7 +177,11 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
        if (!records.isEmpty) {
            try {
                this.topics.forEach { topic ->
-                    action(topic, records.records(topic).map { JSONUtils.readValue<Any>(it.value()) })
+                    val topicRecords = records.records(topic)
+                    if (!topicRecords.iterator().hasNext()) {
+                        return@forEach
+                    }
+                    action(topic, topicRecords.map { JSONUtils.readValue<Any>(it.value()) })
                }
            } catch (e: Exception) {
                // TODO add dead letter queue
@@ -210,10 +214,10 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
    fun toConsumerRecordsMap(topicPartitionsMap: Map<TopicPartition, Long>,
                             records: ConsumerRecords<String, ByteArray>)
            : Map<TopicPartition, List<ConsumerRecord<String, ByteArray>>> = topicPartitionsMap
-            .mapValues {
-                records.records(it.key)
-            }
-            .filterValues { it.isNotEmpty() }
+        .mapValues {
+            records.records(it.key)
+        }
+        .filterValues { it.isNotEmpty() }

    fun setSeek(topicPartitionsMap: Map<TopicPartition, Long>) {
        if (isSeekSet) {
@@ -250,6 +254,9 @@ class KafkaManualCommitEventConsumer(private val config: KafkaSinkConfiguration,
        if (!records.isEmpty) {
            this.topics.forEach { topic ->
                val topicRecords = records.records(topic)
+                if (!topicRecords.iterator().hasNext()) {
+                    return@forEach
+                }
                val lastRecord = topicRecords.last()
                val offsetAndMetadata = OffsetAndMetadata(lastRecord.offset(), "")
                val topicPartition = TopicPartition(lastRecord.topic(), lastRecord.partition())
@@ -315,7 +322,7 @@ class StreamsConsumerRebalanceListener(private val topicPartitionOffsetMap: Map<
                .map {
                    val offset = consumer.position(it)
                    if (log.isDebugEnabled) {
-                        log.debug("for topic ${it.topic()} partition ${it.partition()}, the last saved offset is: $offset")
+                        log.debug("onPartitionsRevoked: for topic ${it.topic()} partition ${it.partition()}, the last saved offset is: $offset")
                    }
                    it to OffsetAndMetadata(offset, "")
                }
@@ -325,9 +332,9 @@ class StreamsConsumerRebalanceListener(private val topicPartitionOffsetMap: Map<

    override fun onPartitionsAssigned(partitions: Collection<TopicPartition>) {
        for (partition in partitions) {
-            val offset = topicPartitionOffsetMap[partition]?.offset()
+            val offset = (topicPartitionOffsetMap[partition] ?: consumer.committed(partition))?.offset()
            if (log.isDebugEnabled) {
-                log.debug("for ${partition.topic()} partition ${partition.partition()}, the retrieved offset is: $offset")
+                log.debug("onPartitionsAssigned: for ${partition.topic()} partition ${partition.partition()}, the retrieved offset is: $offset")
            }
            if (offset == null) {
                when (autoOffsetReset) {
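
The other behavioral change in this file is in onPartitionsAssigned: when the in-memory topicPartitionOffsetMap has no entry for a newly assigned partition, the listener now falls back to the offset committed on the broker before resorting to auto.offset.reset. A minimal standalone sketch of that resolution order follows; resolveStartOffset is an illustrative name, not project code, and Consumer is the plain Kafka client interface.

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// Illustrative helper (not project code): decide where a freshly assigned partition resumes.
// Prefer the offset tracked locally by the sink; otherwise ask the broker for the last
// committed offset; a null result means the caller applies auto.offset.reset.
fun resolveStartOffset(partition: TopicPartition,
                       localOffsets: Map<TopicPartition, OffsetAndMetadata>,
                       consumer: Consumer<String, ByteArray>): Long? =
        (localOffsets[partition] ?: consumer.committed(partition))?.offset()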

consumer/src/test/kotlin/integrations/KafkaEventSinkIT.kt

Lines changed: 57 additions & 0 deletions
@@ -401,6 +401,63 @@ class KafkaEventSinkIT {
        }, equalTo(true), 30, TimeUnit.SECONDS)
    }

+    @Test
+    fun `should fix issue 186 with auto commit false`() {
+        val product = "product" to "MERGE (p:Product {id: event.id}) ON CREATE SET p.name = event.name"
+        val customer = "customer" to "MERGE (c:Customer {id: event.id}) ON CREATE SET c.name = event.name"
+        val bought = "bought" to """
+            MERGE (c:Customer {id: event.id})
+            MERGE (p:Product {id: event.id})
+            MERGE (c)-[:BOUGHT]->(p)
+        """.trimIndent()
+        graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${product.first}", product.second)
+        graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${customer.first}", customer.second)
+        graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${bought.first}", bought.second)
+        graphDatabaseBuilder.setConfig("kafka.${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG}", "false")
+        db = graphDatabaseBuilder.newGraphDatabase() as GraphDatabaseAPI
+
+        val props = mapOf("id" to 1, "name" to "My Awesome Product")
+        var producerRecord = ProducerRecord(product.first, UUID.randomUUID().toString(),
+                JSONUtils.writeValueAsBytes(props))
+        kafkaProducer.send(producerRecord).get()
+        assertEventually(ThrowingSupplier<Boolean, Exception> {
+            val query = """
+                MATCH (p:Product)
+                WHERE properties(p) = {props}
+                RETURN count(p) AS count
+            """.trimIndent()
+            val result = db.execute(query, mapOf("props" to props)).columnAs<Long>("count")
+            result.hasNext() && result.next() == 1L && !result.hasNext()
+        }, equalTo(true), 30, TimeUnit.SECONDS)
+    }

+    @Test
+    fun `should fix issue 186 with auto commit true`() {
+        val product = "product" to "MERGE (p:Product {id: event.id}) ON CREATE SET p.name = event.name"
+        val customer = "customer" to "MERGE (c:Customer {id: event.id}) ON CREATE SET c.name = event.name"
+        val bought = "bought" to """
+            MERGE (c:Customer {id: event.id})
+            MERGE (p:Product {id: event.id})
+            MERGE (c)-[:BOUGHT]->(p)
+        """.trimIndent()
+        graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${product.first}", product.second)
+        graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${customer.first}", customer.second)
+        graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.${bought.first}", bought.second)
+        db = graphDatabaseBuilder.newGraphDatabase() as GraphDatabaseAPI
+
+        val props = mapOf("id" to 1, "name" to "My Awesome Product")
+        var producerRecord = ProducerRecord(product.first, UUID.randomUUID().toString(),
+                JSONUtils.writeValueAsBytes(props))
+        kafkaProducer.send(producerRecord).get()
+        assertEventually(ThrowingSupplier<Boolean, Exception> {
+            val query = """
+                MATCH (p:Product)
+                WHERE properties(p) = {props}
+                RETURN count(p) AS count
+            """.trimIndent()
+            val result = db.execute(query, mapOf("props" to props)).columnAs<Long>("count")
+            result.hasNext() && result.next() == 1L && !result.hasNext()
+        }, equalTo(true), 30, TimeUnit.SECONDS)
+    }

}
