From 5eb353cb08304faf63529569635afe53a2346f94 Mon Sep 17 00:00:00 2001
From: Rania ZYANE
Date: Tue, 7 Feb 2023 15:55:11 +0100
Subject: [PATCH 1/2] Update CassandraDriver.scala

---
 src/main/scala/cassandra/CassandraDriver.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/scala/cassandra/CassandraDriver.scala b/src/main/scala/cassandra/CassandraDriver.scala
index 90dfa80..95abf81 100644
--- a/src/main/scala/cassandra/CassandraDriver.scala
+++ b/src/main/scala/cassandra/CassandraDriver.scala
@@ -73,7 +73,7 @@ object CassandraDriver extends LazyLogger {
    * assign json string {"topicA":[0,1],"topicB":[2,4]}
    * Specific TopicPartitions to consume. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source.
    */
-  def getKafaMetadata() = {
+  def getKafkaMetadata() = {
     try {
       val kafkaMetadataRDD = spark.sparkContext.cassandraTable(namespace, kafkaMetadata)

From a77d2eb01ec0c28d0c773bd287ed22dfac7d175a Mon Sep 17 00:00:00 2001
From: Rania ZYANE
Date: Tue, 7 Feb 2023 16:06:19 +0100
Subject: [PATCH 2/2] rename kafkaMetadata getter

---
 src/main/scala/Main.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/scala/Main.scala b/src/main/scala/Main.scala
index cf5784a..9f5dbd0 100644
--- a/src/main/scala/Main.scala
+++ b/src/main/scala/Main.scala
@@ -27,7 +27,7 @@ object Main {
     KafkaSink.writeStream(streamDS)

     //Finally read it from kafka, in case checkpointing is not available we read last offsets saved from Cassandra
-    val (startingOption, partitionsAndOffsets) = CassandraDriver.getKafaMetadata()
+    val (startingOption, partitionsAndOffsets) = CassandraDriver.getKafkaMetadata()
     val kafkaInputDS = KafkaSource.read(startingOption, partitionsAndOffsets)

     //Just debugging Kafka source into our console
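
For context on what the renamed getter does: below is a minimal sketch of how a getKafkaMetadata-style helper could rebuild Kafka's "startingOffsets" option from offsets previously saved in Cassandra. The keyspace, table, and column names (partition, kafka_offset), the topic parameter, and the fallback to "earliest" are illustrative assumptions, not the repository's actual code.

    // Hypothetical sketch only; names below are assumptions for illustration.
    import com.datastax.spark.connector._
    import org.apache.spark.sql.SparkSession

    object KafkaMetadataSketch {
      // Returns a (optionKey, optionValue) pair for the Spark Kafka source.
      def getKafkaMetadata(spark: SparkSession, keyspace: String, table: String, topic: String): (String, String) = {
        // One row per Kafka partition, holding the last offset saved in Cassandra.
        val rows = spark.sparkContext.cassandraTable(keyspace, table).collect()

        if (rows.isEmpty) {
          // Nothing saved yet: let the Kafka source start from the earliest offsets.
          ("startingOffsets", "earliest")
        } else {
          // Kafka expects per-topic JSON such as {"topicA":{"0":23,"1":42}}.
          val perPartition = rows.map { r =>
            val partition = r.getInt("partition")
            val offset = r.getLong("kafka_offset")
            s""""$partition":$offset"""
          }.mkString(",")
          ("startingOffsets", s"""{"$topic":{$perPartition}}""")
        }
      }
    }

A caller would then pass the returned pair to the Kafka reader, e.g. .option(startingOption, partitionsAndOffsets) on a spark.readStream.format("kafka") builder, which matches how Main.scala hands the tuple to KafkaSource.read.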