Skip to content

Commit

Permalink
Remove rediscala library and replace it by a lettuce implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
GhaziTriki committed Dec 5, 2018
1 parent 7e736d4 commit fd959f9
Show file tree
Hide file tree
Showing 40 changed files with 388 additions and 513 deletions.
6 changes: 0 additions & 6 deletions akka-bbb-apps/project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ object Dependencies {
val lang = "3.8.1"
val codec = "1.11"

// Redis
val redisScala = "1.8.0"

// BigBlueButton
val bbbCommons = "0.0.20-SNAPSHOT"

Expand All @@ -52,8 +49,6 @@ object Dependencies {

val apacheLang = "org.apache.commons" % "commons-lang3" % Versions.lang

val redisScala = "com.github.etaty" % "rediscala_2.12" % Versions.redisScala

val bbbCommons = "org.bigbluebutton" % "bbb-common-message_2.12" % Versions.bbbCommons
}

Expand Down Expand Up @@ -84,6 +79,5 @@ object Dependencies {
Compile.commonsCodec,
Compile.sprayJson,
Compile.apacheLang,
Compile.redisScala,
Compile.bbbCommons) ++ testing
}
4 changes: 2 additions & 2 deletions akka-bbb-apps/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"

rediscala-publish-worker-dispatcher {
redis-publish-worker-dispatcher {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 512
}

rediscala-subscriber-worker-dispatcher {
redis-subscriber-worker-dispatcher {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
Expand Down
6 changes: 3 additions & 3 deletions akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import org.bigbluebutton.endpoint.redis.AppsRedisSubscriberActor
import org.bigbluebutton.endpoint.redis.RedisRecorderActor

import akka.actor.ActorSystem
import akka.event.Logging
import org.bigbluebutton.common2.redis.MessageSender
import akka.event.Logging
import org.bigbluebutton.common2.redis.MessageSender
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus

object Boot extends App with SystemConfiguration {
Expand Down Expand Up @@ -52,5 +52,5 @@ object Boot extends App with SystemConfiguration {
val redisMessageHandlerActor = system.actorOf(ReceivedJsonMsgHandlerActor.props(bbbMsgBus, incomingJsonMessageBus))
incomingJsonMessageBus.subscribe(redisMessageHandlerActor, toAkkaAppsJsonChannel)

val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(incomingJsonMessageBus), "redis-subscriber")
val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, incomingJsonMessageBus), "redis-subscriber")
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.text.SimpleDateFormat

import scala.collection.mutable.HashMap
import org.bigbluebutton.core.api.TimestampGenerator
import scala.collection._

trait RecordEvent {
import RecordEvent._
Expand Down Expand Up @@ -70,13 +71,18 @@ trait RecordEvent {
eventMap.put(EVENT, event)
}

// @fixme : not used anymore
/**
* Convert the event into a Map to be recorded.
* @return
*/
final def toMap(): Map[String, String] = {
eventMap.toMap
}

final def toJavaMap(): java.util.Map[java.lang.String, java.lang.String] = {
eventMap.asInstanceOf[java.util.Map[java.lang.String, java.lang.String]]
}
}

object RecordEvent extends RecordEvent {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,65 +1,32 @@
package org.bigbluebutton.endpoint.redis

import java.net.InetSocketAddress

import org.bigbluebutton.SystemConfiguration
import org.bigbluebutton.common2.bus.IncomingJsonMessage
import org.bigbluebutton.common2.bus.IncomingJsonMessageBus
import org.bigbluebutton.common2.bus.ReceivedJsonMessage
import org.bigbluebutton.common2.redis.RedisAppSubscriberActor
import org.bigbluebutton.common2.redis.RedisConfiguration
import org.bigbluebutton.common2.redis.RedisSubscriber
import org.bigbluebutton.common2.redis.{ RedisSubscriber, RedisSubscriberProvider }

import akka.actor.ActorSystem
import akka.actor.Props
import redis.actors.RedisSubscriberActor
import redis.api.pubsub.Message
import redis.api.servers.ClientSetname
import java.io.StringWriter
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.Resume
import scala.concurrent.duration.DurationInt
import java.io.PrintWriter

object AppsRedisSubscriberActor extends RedisSubscriber with RedisConfiguration {
object AppsRedisSubscriberActor extends RedisSubscriber {

val channels = Seq(toAkkaAppsRedisChannel, fromVoiceConfRedisChannel)
val patterns = Seq("bigbluebutton:to-bbb-apps:*", "bigbluebutton:from-voice-conf:*", "bigbluebutton:from-bbb-transcode:*")

def props(jsonMsgBus: IncomingJsonMessageBus): Props =
Props(classOf[AppsRedisSubscriberActor], jsonMsgBus,
def props(system: ActorSystem, jsonMsgBus: IncomingJsonMessageBus): Props =
Props(
classOf[AppsRedisSubscriberActor],
system, jsonMsgBus,
redisHost, redisPort,
channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher")
channels, patterns).withDispatcher("akka.redis-subscriber-worker-dispatcher")
}

class AppsRedisSubscriberActor(jsonMsgBus: IncomingJsonMessageBus, redisHost: String,
redisPort: Int,
channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
extends RedisSubscriberActor(
new InetSocketAddress(redisHost, redisPort),
channels, patterns, onConnectStatus = connected => { println(s"connected: $connected") })
with SystemConfiguration
with RedisAppSubscriberActor {

override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case e: Exception => {
val sw: StringWriter = new StringWriter()
sw.write("An exception has been thrown on AppsRedisSubscriberActor, exception message [" + e.getMessage() + "] (full stacktrace below)\n")
e.printStackTrace(new PrintWriter(sw))
log.error(sw.toString())
Resume
}
}

// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
write(ClientSetname("BbbAppsAkkaSub").encodedRequest)

def onMessage(message: Message) {
if (message.channel == toAkkaAppsRedisChannel || message.channel == fromVoiceConfRedisChannel) {
val receivedJsonMessage = new ReceivedJsonMessage(message.channel, message.data.utf8String)
//log.debug(s"RECEIVED:\n [${receivedJsonMessage.channel}] \n ${receivedJsonMessage.data} \n")
jsonMsgBus.publish(IncomingJsonMessage(toAkkaAppsJsonChannel, receivedJsonMessage))
}
}
class AppsRedisSubscriberActor(
system: ActorSystem,
jsonMsgBus: IncomingJsonMessageBus,
redisHost: String, redisPort: Int,
channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
extends RedisSubscriberProvider(system, "BbbAppsAkkaSub", channels, patterns, jsonMsgBus) with SystemConfiguration {

addListener(toAkkaAppsJsonChannel)
subscribe()
}
Loading

0 comments on commit fd959f9

Please sign in to comment.