-
Notifications
You must be signed in to change notification settings - Fork 78
BlockRDD[14] at command122480944 while reading the data in Spark Scala [Databricks] #135
Description
Hello, I'm trying to read data from RabbitMQ in Databricks. Below is the code.
/** Spawns the "Socket Receiver" thread, whose only job is to invoke receive(). */
def onStart(): Unit = {
  // Hand receive() to a dedicated thread; this method only creates and starts it.
  val receiverThread = new Thread("Socket Receiver") {
    override def run(): Unit = receive()
  }
  receiverThread.start()
}
/**
 * Intentionally a no-op: no cleanup is performed here.
 *
 * The thread that calls receive() is expected to wind down by itself based on
 * isStopped(). NOTE(review): that check is not visible in this snippet — confirm
 * that receive() actually polls isStopped().
 */
def onStop(): Unit = {}
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
var socket: Socket = null
var userInput: String = null
try {
var batchInterval = Seconds(5)
var ssc = new StreamingContext(sc, batchInterval)
val host = ""
val port = ""
val queueName = ""
val vHost = ""
val userName = ""
val password = ""
val receiverStream = RabbitMQUtils.createStream(ssc, Map(
"host" -> "host",
"port" -> "port",
"queueName" -> "queueName",
"vHost" -> "vHost",
"userName" -> "userName",
"password" -> "password"
))
receiverStream .start()
val lines = ssc.receiverStream(new CustomReceiver(host, port.toInt))
lines .foreachRDD(rdd => {
if (!rdd.isEmpty()) {
val count = rdd.count()
// Do something with this message
println(s"EVENTS COUNT : \t $count")
totalEvents.add(count)
//rdd.collect().sortBy(event => event.toInt).foreach(event => print(s"$event, "))
} else println("RDD is empty")
println(s"TOTAL EVENTS : \t $totalEvents")
})
lines.print()
ssc.start()
ssc.awaitTermination()
I get either empty output or the BlockRDD[] issue. Has anyone faced a similar issue? Could someone please assist?
Thank you