diff --git a/src/main/scala/Main.scala b/src/main/scala/Main.scala
index cf5784a..9f5dbd0 100644
--- a/src/main/scala/Main.scala
+++ b/src/main/scala/Main.scala
@@ -27,7 +27,7 @@ object Main {
     KafkaSink.writeStream(streamDS)
 
     //Finally read it from kafka, in case checkpointing is not available we read last offsets saved from Cassandra
-    val (startingOption, partitionsAndOffsets) = CassandraDriver.getKafaMetadata()
+    val (startingOption, partitionsAndOffsets) = CassandraDriver.getKafkaMetadata()
     val kafkaInputDS = KafkaSource.read(startingOption, partitionsAndOffsets)
 
     //Just debugging Kafka source into our console
diff --git a/src/main/scala/cassandra/CassandraDriver.scala b/src/main/scala/cassandra/CassandraDriver.scala
index 90dfa80..95abf81 100644
--- a/src/main/scala/cassandra/CassandraDriver.scala
+++ b/src/main/scala/cassandra/CassandraDriver.scala
@@ -73,7 +73,7 @@ object CassandraDriver extends LazyLogger {
    * assign json string {"topicA":[0,1],"topicB":[2,4]}
    * Specific TopicPartitions to consume. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source.
    */
-  def getKafaMetadata() = {
+  def getKafkaMetadata() = {
     try {
       val kafkaMetadataRDD = spark.sparkContext.cassandraTable(namespace, kafkaMetadata)
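
For context, here is a minimal sketch of the kind of `(startingOption, partitionsAndOffsets)` pair a `getKafkaMetadata`-style helper could hand to `KafkaSource.read`, assuming the Cassandra metadata table stores one row per (topic, partition, offset) triple. The `KafkaOffset` row shape, the `"earliest"` fallback, and the choice of the `startingOffsets` option name are assumptions for illustration, not the repository's actual implementation; only the JSON layout (`{"topic":{"partition":offset}}`) is the format Spark's Kafka source documents.

```scala
// Hypothetical sketch: build the (optionName, optionValue) pair that a
// getKafkaMetadata-style helper could return. The KafkaOffset row shape
// and the "earliest" fallback are assumptions, not the repo's actual code.
object KafkaMetadataSketch {
  // Assumed shape of one row in the Cassandra kafka-metadata table.
  final case class KafkaOffset(topic: String, partition: Int, offset: Long)

  // When offsets were previously saved, return ("startingOffsets", json)
  // using the JSON layout Spark's Kafka source expects, e.g.
  //   {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}
  // Otherwise fall back to reading the topic from the earliest offset.
  def startingOffsets(saved: Seq[KafkaOffset]): (String, String) =
    if (saved.isEmpty) {
      ("startingOffsets", "earliest")
    } else {
      val json = saved
        .groupBy(_.topic)
        .map { case (topic, rows) =>
          val partitions = rows
            .sortBy(_.partition)
            .map(r => s""""${r.partition}":${r.offset}""")
            .mkString(",")
          s""""$topic":{$partitions}"""
        }
        .mkString("{", ",", "}")
      ("startingOffsets", json)
    }

  def main(args: Array[String]): Unit = {
    val saved = Seq(KafkaOffset("test", 0, 42L), KafkaOffset("test", 1, 17L))
    val (startingOption, partitionsAndOffsets) = startingOffsets(saved)
    // Prints: startingOffsets -> {"test":{"0":42,"1":17}}
    println(s"$startingOption -> $partitionsAndOffsets")
  }
}
```

If `KafkaSource.read` simply forwards the pair as `.option(startingOption, partitionsAndOffsets)` on the Kafka `readStream` builder, this fallback lets the stream resume from the offsets last persisted in Cassandra when no Spark checkpoint is available.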