From fd959f9e14fe42015bcf8d111a0d361063d9cee8 Mon Sep 17 00:00:00 2001 From: Ghazi Triki Date: Wed, 5 Dec 2018 20:21:06 +0100 Subject: [PATCH] Remove rediscala library and replace it by a lettuce implementation. --- akka-bbb-apps/project/Dependencies.scala | 6 - .../src/main/resources/application.conf | 4 +- .../main/scala/org/bigbluebutton/Boot.scala | 6 +- .../core/record/events/RecordEvent.scala | 6 + .../redis/AkkaAppsRedisSubscriberActor.scala | 65 +++------- .../endpoint/redis/RedisRecorderActor.scala | 111 ++++++++---------- akka-bbb-fsesl/build.sbt | 1 - akka-bbb-fsesl/project/Dependencies.scala | 8 +- .../src/main/resources/application.conf | 74 ++++++------ .../main/scala/org/bigbluebutton/Boot.scala | 8 +- .../redis/FSESLRedisSubscriberActor.scala | 65 +++------- .../freeswitch/RxJsonMsgHdlrActor.scala | 14 ++- .../freeswitch/bus/InJsonMsgBus.scala | 31 ----- bbb-apps-common/project/Dependencies.scala | 8 +- .../client/ClientGWApplication.scala | 22 ++-- .../redis/Red5AppsRedisSubscriberActor.scala | 85 ++++++-------- bbb-common-message/project/Dependencies.scala | 5 +- .../common2/redis/RedisAwareCommunicator.java | 5 + .../common2/redis/RedisStorageService.java | 18 +-- .../redis/RedisAppSubscriberActor.scala | 18 --- .../common2/redis/RedisClientProvider.scala | 15 +++ .../common2/redis/RedisConfiguration.scala | 1 + .../common2/redis/RedisPublisher.scala | 19 +-- .../common2/redis/RedisStorageProvider.scala | 13 ++ .../common2/redis/RedisSubscriber.scala | 2 +- .../redis/RedisSubscriberProvider.scala | 63 ++++++++++ bbb-common-web/project/Dependencies.scala | 9 +- .../bigbluebutton/api2/BbbWebApiGWApp.scala | 7 +- .../redis/RedisDataStorageActor.scala | 28 ++--- .../redis/WebRedisSubscriberActor.scala | 91 ++++++-------- bbb-fsesl-client/build.sbt | 2 +- bbb-screenshare/app/build.sbt | 1 - bbb-screenshare/app/deploy.sh | 3 +- .../app/project/Dependencies.scala | 3 - .../screenshare/ScreenShareApplication.scala | 6 +- .../ScreenshareRedisSubscriberActor.scala | 62 +++------- .../webapp/WEB-INF/classes/application.conf | 4 +- .../WEB-INF/classes/screenshare-app.conf | 4 +- .../webapp/WEB-INF/classes/application.conf | 4 +- .../grails-app/conf/application.conf | 4 +- 40 files changed, 388 insertions(+), 513 deletions(-) delete mode 100755 akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/bus/InJsonMsgBus.scala delete mode 100644 bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisAppSubscriberActor.scala create mode 100644 bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisClientProvider.scala create mode 100644 bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisStorageProvider.scala create mode 100644 bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriberProvider.scala diff --git a/akka-bbb-apps/project/Dependencies.scala b/akka-bbb-apps/project/Dependencies.scala index c56eb9713d0e..005bd86a3736 100644 --- a/akka-bbb-apps/project/Dependencies.scala +++ b/akka-bbb-apps/project/Dependencies.scala @@ -24,9 +24,6 @@ object Dependencies { val lang = "3.8.1" val codec = "1.11" - // Redis - val redisScala = "1.8.0" - // BigBlueButton val bbbCommons = "0.0.20-SNAPSHOT" @@ -52,8 +49,6 @@ object Dependencies { val apacheLang = "org.apache.commons" % "commons-lang3" % Versions.lang - val redisScala = "com.github.etaty" % "rediscala_2.12" % Versions.redisScala - val bbbCommons = "org.bigbluebutton" % "bbb-common-message_2.12" % Versions.bbbCommons } @@ -84,6 +79,5 @@ object Dependencies { Compile.commonsCodec, Compile.sprayJson, Compile.apacheLang, - Compile.redisScala, Compile.bbbCommons) ++ testing } \ No newline at end of file diff --git a/akka-bbb-apps/src/main/resources/application.conf b/akka-bbb-apps/src/main/resources/application.conf index e6904f7e9d99..591f22a88f5e 100755 --- a/akka-bbb-apps/src/main/resources/application.conf +++ b/akka-bbb-apps/src/main/resources/application.conf @@ -10,7 +10,7 @@ akka { loggers = ["akka.event.slf4j.Slf4jLogger"] loglevel = "DEBUG" - rediscala-publish-worker-dispatcher { + redis-publish-worker-dispatcher { mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. @@ -18,7 +18,7 @@ akka { throughput = 512 } - rediscala-subscriber-worker-dispatcher { + redis-subscriber-worker-dispatcher { mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala index 8df346563773..82b544784216 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala @@ -10,8 +10,8 @@ import org.bigbluebutton.endpoint.redis.AppsRedisSubscriberActor import org.bigbluebutton.endpoint.redis.RedisRecorderActor import akka.actor.ActorSystem -import akka.event.Logging -import org.bigbluebutton.common2.redis.MessageSender +import akka.event.Logging +import org.bigbluebutton.common2.redis.MessageSender import org.bigbluebutton.common2.bus.IncomingJsonMessageBus object Boot extends App with SystemConfiguration { @@ -52,5 +52,5 @@ object Boot extends App with SystemConfiguration { val redisMessageHandlerActor = system.actorOf(ReceivedJsonMsgHandlerActor.props(bbbMsgBus, incomingJsonMessageBus)) incomingJsonMessageBus.subscribe(redisMessageHandlerActor, toAkkaAppsJsonChannel) - val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(incomingJsonMessageBus), "redis-subscriber") + val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, incomingJsonMessageBus), "redis-subscriber") } diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/record/events/RecordEvent.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/record/events/RecordEvent.scala index 3dfb958767ff..37bb30d7eb6e 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/record/events/RecordEvent.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/record/events/RecordEvent.scala @@ -23,6 +23,7 @@ import java.text.SimpleDateFormat import scala.collection.mutable.HashMap import org.bigbluebutton.core.api.TimestampGenerator +import scala.collection._ trait RecordEvent { import RecordEvent._ @@ -70,6 +71,7 @@ trait RecordEvent { eventMap.put(EVENT, event) } + // @fixme : not used anymore /** * Convert the event into a Map to be recorded. * @return @@ -77,6 +79,10 @@ trait RecordEvent { final def toMap(): Map[String, String] = { eventMap.toMap } + + final def toJavaMap(): java.util.Map[java.lang.String, java.lang.String] = { + eventMap.asInstanceOf[java.util.Map[java.lang.String, java.lang.String]] + } } object RecordEvent extends RecordEvent { diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/AkkaAppsRedisSubscriberActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/AkkaAppsRedisSubscriberActor.scala index c0ff9efed4b8..8e8ba2ebc6cd 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/AkkaAppsRedisSubscriberActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/AkkaAppsRedisSubscriberActor.scala @@ -1,65 +1,32 @@ package org.bigbluebutton.endpoint.redis -import java.net.InetSocketAddress - import org.bigbluebutton.SystemConfiguration -import org.bigbluebutton.common2.bus.IncomingJsonMessage import org.bigbluebutton.common2.bus.IncomingJsonMessageBus -import org.bigbluebutton.common2.bus.ReceivedJsonMessage -import org.bigbluebutton.common2.redis.RedisAppSubscriberActor -import org.bigbluebutton.common2.redis.RedisConfiguration -import org.bigbluebutton.common2.redis.RedisSubscriber +import org.bigbluebutton.common2.redis.{ RedisSubscriber, RedisSubscriberProvider } +import akka.actor.ActorSystem import akka.actor.Props -import redis.actors.RedisSubscriberActor -import redis.api.pubsub.Message -import redis.api.servers.ClientSetname -import java.io.StringWriter -import akka.actor.OneForOneStrategy -import akka.actor.SupervisorStrategy.Resume -import scala.concurrent.duration.DurationInt -import java.io.PrintWriter -object AppsRedisSubscriberActor extends RedisSubscriber with RedisConfiguration { +object AppsRedisSubscriberActor extends RedisSubscriber { val channels = Seq(toAkkaAppsRedisChannel, fromVoiceConfRedisChannel) val patterns = Seq("bigbluebutton:to-bbb-apps:*", "bigbluebutton:from-voice-conf:*", "bigbluebutton:from-bbb-transcode:*") - def props(jsonMsgBus: IncomingJsonMessageBus): Props = - Props(classOf[AppsRedisSubscriberActor], jsonMsgBus, + def props(system: ActorSystem, jsonMsgBus: IncomingJsonMessageBus): Props = + Props( + classOf[AppsRedisSubscriberActor], + system, jsonMsgBus, redisHost, redisPort, - channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher") + channels, patterns).withDispatcher("akka.redis-subscriber-worker-dispatcher") } -class AppsRedisSubscriberActor(jsonMsgBus: IncomingJsonMessageBus, redisHost: String, - redisPort: Int, - channels: Seq[String] = Nil, patterns: Seq[String] = Nil) - extends RedisSubscriberActor( - new InetSocketAddress(redisHost, redisPort), - channels, patterns, onConnectStatus = connected => { println(s"connected: $connected") }) - with SystemConfiguration - with RedisAppSubscriberActor { - - override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { - case e: Exception => { - val sw: StringWriter = new StringWriter() - sw.write("An exception has been thrown on AppsRedisSubscriberActor, exception message [" + e.getMessage() + "] (full stacktrace below)\n") - e.printStackTrace(new PrintWriter(sw)) - log.error(sw.toString()) - Resume - } - } - - // Set the name of this client to be able to distinguish when doing - // CLIENT LIST on redis-cli - write(ClientSetname("BbbAppsAkkaSub").encodedRequest) - - def onMessage(message: Message) { - if (message.channel == toAkkaAppsRedisChannel || message.channel == fromVoiceConfRedisChannel) { - val receivedJsonMessage = new ReceivedJsonMessage(message.channel, message.data.utf8String) - //log.debug(s"RECEIVED:\n [${receivedJsonMessage.channel}] \n ${receivedJsonMessage.data} \n") - jsonMsgBus.publish(IncomingJsonMessage(toAkkaAppsJsonChannel, receivedJsonMessage)) - } - } +class AppsRedisSubscriberActor( + system: ActorSystem, + jsonMsgBus: IncomingJsonMessageBus, + redisHost: String, redisPort: Int, + channels: Seq[String] = Nil, patterns: Seq[String] = Nil) + extends RedisSubscriberProvider(system, "BbbAppsAkkaSub", channels, patterns, jsonMsgBus) with SystemConfiguration { + addListener(toAkkaAppsJsonChannel) + subscribe() } diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/RedisRecorderActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/RedisRecorderActor.scala index 01f6b594523b..7e2b41d352fa 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/RedisRecorderActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/RedisRecorderActor.scala @@ -1,40 +1,29 @@ package org.bigbluebutton.endpoint.redis -import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } -import org.bigbluebutton.SystemConfiguration -import redis.RedisClient -import scala.concurrent.ExecutionContext.Implicits.global import scala.collection.immutable.StringOps + +import org.bigbluebutton.SystemConfiguration import org.bigbluebutton.common2.msgs._ -import org.bigbluebutton.core.record.events._ +import org.bigbluebutton.common2.redis.RedisStorageProvider import org.bigbluebutton.core.apps.groupchats.GroupChatApp +import org.bigbluebutton.core.record.events._ + +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorSystem +import akka.actor.Props object RedisRecorderActor { def props(system: ActorSystem): Props = Props(classOf[RedisRecorderActor], system) } -class RedisRecorderActor(val system: ActorSystem) - extends SystemConfiguration +class RedisRecorderActor(system: ActorSystem) + extends RedisStorageProvider(system, "BbbAppsAkkaRecorder") + with SystemConfiguration with Actor with ActorLogging { - val redis = RedisClient(host = redisHost, password = Some(redisPassword), port= redisPort)(system) - - // Set the name of this client to be able to distinguish when doing - // CLIENT LIST on redis-cli - redis.clientSetname("BbbAppsAkkaRecorder") - - val COLON = ":" - - private def record(session: String, message: collection.immutable.Map[String, String]): Unit = { - for { - msgid <- redis.incr("global:nextRecordedMsgId") - key = "recording" + COLON + session + COLON + msgid - _ <- redis.hmset(key.mkString, message) - _ <- redis.expire(key.mkString, keysExpiresInSec) - key2 = "meeting" + COLON + session + COLON + "recordings" - _ <- redis.rpush(key2.mkString, msgid.toString) - result <- redis.expire(key2.mkString, keysExpiresInSec) - } yield result + private def record(session: String, message: java.util.Map[java.lang.String, java.lang.String]): Unit = { + redis.recordAndExpire(session, message) } def receive = { @@ -121,7 +110,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setMessage(msg.body.msg.message) ev.setColor(msg.body.msg.color) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } } @@ -129,7 +118,7 @@ class RedisRecorderActor(val system: ActorSystem) val ev = new ClearPublicChatRecordEvent() ev.setMeetingId(msg.header.meetingId) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handlePresentationConversionCompletedEvtMsg(msg: PresentationConversionCompletedEvtMsg) { @@ -139,7 +128,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setPresentationName(msg.body.presentation.id) ev.setOriginalFilename(msg.body.presentation.name) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) if (msg.body.presentation.current) { recordSharePresentationEvent(msg.header.meetingId, msg.body.podId, msg.body.presentation.id) @@ -154,7 +143,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setSlide(getPageNum(msg.body.pageId)) ev.setId(msg.body.pageId) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleResizeAndMovePageEvtMsg(msg: ResizeAndMovePageEvtMsg) { @@ -168,7 +157,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setWidthRatio(msg.body.widthRatio) ev.setHeightRatio(msg.body.heightRatio) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleRemovePresentationEvtMsg(msg: RemovePresentationEvtMsg) { @@ -177,7 +166,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setPodId(msg.body.podId) ev.setPresentationName(msg.body.presentationId) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleSetPresentationDownloadableEvtMsg(msg: SetPresentationDownloadableEvtMsg) { @@ -187,7 +176,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setPresentationName(msg.body.presentationId) ev.setDownloadable(msg.body.downloadable) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleSetCurrentPresentationEvtMsg(msg: SetCurrentPresentationEvtMsg) { @@ -200,7 +189,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setPodId(msg.body.podId) ev.setCurrentPresenter(msg.body.currentPresenterId) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleRemovePresentationPodEvtMsg(msg: RemovePresentationPodEvtMsg) { @@ -208,7 +197,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setMeetingId(msg.header.meetingId) ev.setPodId(msg.body.podId) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleSetPresenterInPodRespMsg(msg: SetPresenterInPodRespMsg) { @@ -217,7 +206,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setPodId(msg.body.podId) ev.setNextPresenterId(msg.body.nextPresenterId) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def recordSharePresentationEvent(meetingId: String, podId: String, presentationId: String) { @@ -227,7 +216,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setPresentationName(presentationId) ev.setShare(true) - record(meetingId, ev.toMap) + record(meetingId, ev.toJavaMap) } private def getPageNum(id: String): Integer = { @@ -266,7 +255,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setPosition(annotation.position) ev.addAnnotation(annotation.annotationInfo) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleSendCursorPositionEvtMsg(msg: SendCursorPositionEvtMsg) { @@ -279,7 +268,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setXPercent(msg.body.xPercent) ev.setYPercent(msg.body.yPercent) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleClearWhiteboardEvtMsg(msg: ClearWhiteboardEvtMsg) { @@ -291,7 +280,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setUserId(msg.body.userId) ev.setFullClear(msg.body.fullClear) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleUndoWhiteboardEvtMsg(msg: UndoWhiteboardEvtMsg) { @@ -302,7 +291,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setWhiteboardId(msg.body.whiteboardId) ev.setUserId(msg.body.userId) ev.setShapeId(msg.body.annotationId) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleUserJoinedMeetingEvtMsg(msg: UserJoinedMeetingEvtMsg): Unit = { @@ -313,7 +302,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setName(msg.body.name) ev.setRole(msg.body.role) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleUserLeftMeetingEvtMsg(msg: UserLeftMeetingEvtMsg): Unit = { @@ -321,7 +310,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setMeetingId(msg.header.meetingId) ev.setUserId(msg.body.intId) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handlePresenterAssignedEvtMsg(msg: PresenterAssignedEvtMsg): Unit = { @@ -331,7 +320,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setName(msg.body.presenterName) ev.setAssignedBy(msg.body.assignedBy) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleUserEmojiChangedEvtMsg(msg: UserEmojiChangedEvtMsg) { handleUserStatusChange(msg.header.meetingId, msg.body.userId, "emojiStatus", msg.body.emoji) @@ -352,7 +341,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setStatus(statusName) ev.setValue(statusValue) - record(meetingId, ev.toMap) + record(meetingId, ev.toJavaMap) } private def handleUserJoinedVoiceConfToClientEvtMsg(msg: UserJoinedVoiceConfToClientEvtMsg) { @@ -365,7 +354,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setMuted(msg.body.muted) ev.setTalking(msg.body.talking) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleUserLeftVoiceConfToClientEvtMsg(msg: UserLeftVoiceConfToClientEvtMsg) { @@ -374,7 +363,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setBridge(msg.body.voiceConf) ev.setParticipant(msg.body.intId) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleUserMutedVoiceEvtMsg(msg: UserMutedVoiceEvtMsg) { @@ -384,7 +373,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setParticipant(msg.body.intId) ev.setMuted(msg.body.muted) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleUserTalkingVoiceEvtMsg(msg: UserTalkingVoiceEvtMsg) { @@ -394,7 +383,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setParticipant(msg.body.intId) ev.setTalking(msg.body.talking) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleVoiceRecordingStartedEvtMsg(msg: VoiceRecordingStartedEvtMsg) { @@ -404,7 +393,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setRecordingTimestamp(msg.body.timestamp) ev.setFilename(msg.body.stream) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleVoiceRecordingStoppedEvtMsg(msg: VoiceRecordingStoppedEvtMsg) { @@ -414,7 +403,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setRecordingTimestamp(msg.body.timestamp) ev.setFilename(msg.body.stream) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleEditCaptionHistoryEvtMsg(msg: EditCaptionHistoryEvtMsg) { @@ -426,7 +415,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setLocaleCode(msg.body.localeCode) ev.setText(msg.body.text) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleScreenshareRtmpBroadcastStartedEvtMsg(msg: ScreenshareRtmpBroadcastStartedEvtMsg) { @@ -434,7 +423,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setMeetingId(msg.header.meetingId) ev.setStreamPath(msg.body.stream) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleScreenshareRtmpBroadcastStoppedEvtMsg(msg: ScreenshareRtmpBroadcastStoppedEvtMsg) { @@ -442,7 +431,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setMeetingId(msg.header.meetingId) ev.setStreamPath(msg.body.stream) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } /* @@ -462,7 +451,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setUserId(msg.body.setBy) ev.setRecordingStatus(msg.body.recording) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleRecordStatusResetSysMsg(msg: RecordStatusResetSysMsg) { @@ -471,7 +460,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setUserId(msg.body.setBy) ev.setRecordingStatus(msg.body.recording) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleWebcamsOnlyForModeratorChangedEvtMsg(msg: WebcamsOnlyForModeratorChangedEvtMsg) { @@ -480,14 +469,14 @@ class RedisRecorderActor(val system: ActorSystem) ev.setUserId(msg.body.setBy) ev.setWebcamsOnlyForModerator(msg.body.webcamsOnlyForModerator) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleEndAndKickAllSysMsg(msg: EndAndKickAllSysMsg): Unit = { val ev = new EndAndKickAllRecordEvent() ev.setMeetingId(msg.header.meetingId) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleRecordingChapterBreakSysMsg(msg: RecordingChapterBreakSysMsg): Unit = { @@ -495,7 +484,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setMeetingId(msg.header.meetingId) ev.setChapterBreakTimestamp(msg.body.timestamp) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handlePollStartedEvtMsg(msg: PollStartedEvtMsg): Unit = { @@ -503,7 +492,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setPollId(msg.body.pollId) ev.setAnswers(msg.body.poll.answers) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handleUserRespondedToPollRecordMsg(msg: UserRespondedToPollRecordMsg): Unit = { @@ -512,7 +501,7 @@ class RedisRecorderActor(val system: ActorSystem) ev.setUserId(msg.header.userId) ev.setAnswerId(msg.body.answerId) - record(msg.header.meetingId, ev.toMap) + record(msg.header.meetingId, ev.toJavaMap) } private def handlePollStoppedEvtMsg(msg: PollStoppedEvtMsg): Unit = { @@ -527,6 +516,6 @@ class RedisRecorderActor(val system: ActorSystem) val ev = new PollStoppedRecordEvent() ev.setPollId(pollId) - record(meetingId, ev.toMap) + record(meetingId, ev.toJavaMap) } } diff --git a/akka-bbb-fsesl/build.sbt b/akka-bbb-fsesl/build.sbt index 2a1d6b363af5..020426a651d3 100755 --- a/akka-bbb-fsesl/build.sbt +++ b/akka-bbb-fsesl/build.sbt @@ -30,7 +30,6 @@ val compileSettings = Seq( resolvers ++= Seq( "spray repo" at "http://repo.spray.io/", - "rediscala" at "http://dl.bintray.com/etaty/maven", "blindside-repos" at "http://blindside.googlecode.com/svn/repository/" ) diff --git a/akka-bbb-fsesl/project/Dependencies.scala b/akka-bbb-fsesl/project/Dependencies.scala index 0a4d0f8c6eaa..6eab6d49d1ae 100644 --- a/akka-bbb-fsesl/project/Dependencies.scala +++ b/akka-bbb-fsesl/project/Dependencies.scala @@ -19,12 +19,9 @@ object Dependencies { val lang = "3.8.1" val codec = "1.11" - // Redis - val redisScala = "1.8.0" - // BigBlueButton val bbbCommons = "0.0.20-SNAPSHOT" - val bbbFsesl = "0.0.7" + val bbbFsesl = "0.0.7-SNAPSHOT" // Test val scalaTest = "3.0.5" @@ -44,8 +41,6 @@ object Dependencies { val apacheLang = "org.apache.commons" % "commons-lang3" % Versions.lang - val redisScala = "com.github.etaty" % "rediscala_2.12" % Versions.redisScala - val bbbCommons = "org.bigbluebutton" % "bbb-common-message_2.12" % Versions.bbbCommons val bbbFseslClient = "org.bigbluebutton" % "bbb-fsesl-client" % Versions.bbbFsesl } @@ -71,7 +66,6 @@ object Dependencies { Compile.logback, Compile.commonsCodec, Compile.apacheLang, - Compile.redisScala, Compile.bbbCommons, Compile.bbbFseslClient) ++ testing } diff --git a/akka-bbb-fsesl/src/main/resources/application.conf b/akka-bbb-fsesl/src/main/resources/application.conf index 37ac46cfee4c..ac1acc3b350d 100755 --- a/akka-bbb-fsesl/src/main/resources/application.conf +++ b/akka-bbb-fsesl/src/main/resources/application.conf @@ -1,37 +1,37 @@ -akka { - actor { - debug { - receive = on - } - } - loggers = ["akka.event.slf4j.Slf4jLogger"] - loglevel = "DEBUG" - stdout-loglevel = "DEBUG" - - rediscala-subscriber-worker-dispatcher { - mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" - # Throughput defines the maximum number of messages to be - # processed per actor before the thread jumps to the next actor. - # Set to 1 for as fair as possible. - throughput = 512 - } -} - - -freeswitch { - esl { - host="127.0.0.1" - port=8021 - password="ClueCon" - } - conf { - profile="cdquality" - } -} - -redis { - host="127.0.0.1" - port=6379 - password="" -} - +akka { + actor { + debug { + receive = on + } + } + loggers = ["akka.event.slf4j.Slf4jLogger"] + loglevel = "DEBUG" + stdout-loglevel = "DEBUG" + + redis-subscriber-worker-dispatcher { + mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 512 + } +} + + +freeswitch { + esl { + host="127.0.0.1" + port=8021 + password="ClueCon" + } + conf { + profile="cdquality" + } +} + +redis { + host="127.0.0.1" + port=6379 + password="" +} + diff --git a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/Boot.scala b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/Boot.scala index 1ae4cbed2e28..aad91e4ddd34 100755 --- a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/Boot.scala +++ b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/Boot.scala @@ -1,15 +1,15 @@ package org.bigbluebutton -import akka.actor.{ ActorSystem } - +import org.bigbluebutton.common2.bus.IncomingJsonMessageBus import org.bigbluebutton.common2.redis.RedisPublisher import org.bigbluebutton.endpoint.redis.FSESLRedisSubscriberActor import org.bigbluebutton.freeswitch.{ RxJsonMsgHdlrActor, VoiceConferenceService } -import org.bigbluebutton.freeswitch.bus.InsonMsgBus import org.bigbluebutton.freeswitch.voice.FreeswitchConferenceEventListener import org.bigbluebutton.freeswitch.voice.freeswitch.{ ConnectionManager, ESLEventListener, FreeswitchApplication } import org.freeswitch.esl.client.manager.DefaultManagerConnection +import akka.actor.ActorSystem + object Boot extends App with SystemConfiguration { implicit val system = ActorSystem("bigbluebutton-fsesl-system") @@ -31,7 +31,7 @@ object Boot extends App with SystemConfiguration { val fsApplication = new FreeswitchApplication(connManager, fsProfile) fsApplication.start() - val inJsonMsgBus = new InsonMsgBus + val inJsonMsgBus = new IncomingJsonMessageBus val redisMessageHandlerActor = system.actorOf(RxJsonMsgHdlrActor.props(fsApplication)) inJsonMsgBus.subscribe(redisMessageHandlerActor, toFsAppsJsonChannel) diff --git a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/endpoint/redis/FSESLRedisSubscriberActor.scala b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/endpoint/redis/FSESLRedisSubscriberActor.scala index c021f739b7e3..016bc9890b42 100755 --- a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/endpoint/redis/FSESLRedisSubscriberActor.scala +++ b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/endpoint/redis/FSESLRedisSubscriberActor.scala @@ -1,66 +1,38 @@ package org.bigbluebutton.endpoint.redis -import java.io.PrintWriter -import java.io.StringWriter -import java.net.InetSocketAddress - import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.DurationInt import org.bigbluebutton.SystemConfiguration -import org.bigbluebutton.common2.redis.RedisAppSubscriberActor -import org.bigbluebutton.common2.redis.RedisConfiguration -import org.bigbluebutton.common2.redis.RedisSubscriber -import org.bigbluebutton.freeswitch.bus.InJsonMsg -import org.bigbluebutton.freeswitch.bus.InsonMsgBus -import org.bigbluebutton.freeswitch.bus.ReceivedJsonMsg +import org.bigbluebutton.common2.bus.IncomingJsonMessageBus +import org.bigbluebutton.common2.redis.{ RedisSubscriber, RedisSubscriberProvider } import akka.actor.ActorSystem -import akka.actor.OneForOneStrategy import akka.actor.Props -import akka.actor.SupervisorStrategy.Resume -import redis.actors.RedisSubscriberActor -import redis.api.pubsub.Message -import redis.api.servers.ClientSetname -object FSESLRedisSubscriberActor extends RedisSubscriber with RedisConfiguration { +object FSESLRedisSubscriberActor extends RedisSubscriber { val channels = Seq(toVoiceConfRedisChannel) val patterns = Seq("bigbluebutton:to-voice-conf:*", "bigbluebutton:from-bbb-apps:*") - def props(system: ActorSystem, inJsonMgBus: InsonMsgBus): Props = - Props(classOf[FSESLRedisSubscriberActor], system, inJsonMgBus, + def props(system: ActorSystem, inJsonMgBus: IncomingJsonMessageBus): Props = + Props( + classOf[FSESLRedisSubscriberActor], + system, inJsonMgBus, redisHost, redisPort, - channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher") + channels, patterns).withDispatcher("akka.redis-subscriber-worker-dispatcher") } class FSESLRedisSubscriberActor( - val system: ActorSystem, - inJsonMgBus: InsonMsgBus, redisHost: String, - redisPort: Int, - channels: Seq[String] = Nil, patterns: Seq[String] = Nil) - extends RedisSubscriberActor( - new InetSocketAddress(redisHost, redisPort), - channels, patterns, onConnectStatus = connected => { println(s"connected: $connected") }) - with SystemConfiguration - with RedisAppSubscriberActor { + system: ActorSystem, + inJsonMgBus: IncomingJsonMessageBus, + redisHost: String, redisPort: Int, + channels: Seq[String] = Nil, patterns: Seq[String] = Nil) + extends RedisSubscriberProvider(system, "BbbFsEslAkkaSub", channels, patterns, inJsonMgBus) with SystemConfiguration { - override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { - case e: Exception => { - val sw: StringWriter = new StringWriter() - sw.write("An exception has been thrown on FSESlRedisSubscriberActor, exception message [" + e.getMessage() + "] (full stacktrace below)\n") - e.printStackTrace(new PrintWriter(sw)) - log.error(sw.toString()) - Resume - } - } var lastPongReceivedOn = 0L system.scheduler.schedule(10 seconds, 10 seconds)(checkPongMessage()) - // Set the name of this client to be able to distinguish when doing - // CLIENT LIST on redis-cli - write(ClientSetname("BbbFsEslAkkaSub").encodedRequest) - def checkPongMessage() { val now = System.currentTimeMillis() @@ -69,11 +41,6 @@ class FSESLRedisSubscriberActor( } } - def onMessage(message: Message) { - if (message.channel == toVoiceConfRedisChannel) { - val receivedJsonMessage = new ReceivedJsonMsg(message.channel, message.data.utf8String) - log.debug(s"RECEIVED:\n [${receivedJsonMessage.channel}] \n ${receivedJsonMessage.data} \n") - inJsonMgBus.publish(InJsonMsg(toFsAppsJsonChannel, receivedJsonMessage)) - } - } -} + addListener(toFsAppsJsonChannel) + subscribe() +} \ No newline at end of file diff --git a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/RxJsonMsgHdlrActor.scala b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/RxJsonMsgHdlrActor.scala index 378210c50ca7..dc839a9f3162 100755 --- a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/RxJsonMsgHdlrActor.scala +++ b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/RxJsonMsgHdlrActor.scala @@ -1,12 +1,16 @@ package org.bigbluebutton.freeswitch -import akka.actor.{ Actor, ActorLogging, Props } -import com.fasterxml.jackson.databind.JsonNode import org.bigbluebutton.SystemConfiguration +import org.bigbluebutton.common2.bus.ReceivedJsonMessage import org.bigbluebutton.common2.msgs._ -import org.bigbluebutton.freeswitch.bus.ReceivedJsonMsg import org.bigbluebutton.freeswitch.voice.freeswitch.FreeswitchApplication +import com.fasterxml.jackson.databind.JsonNode + +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.Props + object RxJsonMsgHdlrActor { def props(fsApp: FreeswitchApplication): Props = Props(classOf[RxJsonMsgHdlrActor], fsApp) @@ -15,13 +19,13 @@ object RxJsonMsgHdlrActor { class RxJsonMsgHdlrActor(val fsApp: FreeswitchApplication) extends Actor with ActorLogging with SystemConfiguration with RxJsonMsgDeserializer { def receive = { - case msg: ReceivedJsonMsg => + case msg: ReceivedJsonMessage => log.debug("handling {} - {}", msg.channel, msg.data) handleReceivedJsonMessage(msg) case _ => // do nothing } - def handleReceivedJsonMessage(msg: ReceivedJsonMsg): Unit = { + def handleReceivedJsonMessage(msg: ReceivedJsonMessage): Unit = { for { envJsonNode <- JsonDeserializer.toBbbCommonEnvJsNodeMsg(msg.data) } yield handle(envJsonNode.envelope, envJsonNode.core) diff --git a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/bus/InJsonMsgBus.scala b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/bus/InJsonMsgBus.scala deleted file mode 100755 index 6f7bb865ea8c..000000000000 --- a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/bus/InJsonMsgBus.scala +++ /dev/null @@ -1,31 +0,0 @@ -package org.bigbluebutton.freeswitch.bus - -import akka.actor.ActorRef -import akka.event.{ EventBus, LookupClassification } - -case class ReceivedJsonMsg(channel: String, data: String) -case class InJsonMsg(val topic: String, val payload: ReceivedJsonMsg) - -class InsonMsgBus extends EventBus with LookupClassification { - type Event = InJsonMsg - type Classifier = String - type Subscriber = ActorRef - - // is used for extracting the classifier from the incoming events - override protected def classify(event: Event): Classifier = event.topic - - // will be invoked for each event for all subscribers which registered themselves - // for the event’s classifier - override protected def publish(event: Event, subscriber: Subscriber): Unit = { - subscriber ! event.payload - } - - // must define a full order over the subscribers, expressed as expected from - // `java.lang.Comparable.compare` - override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = - a.compareTo(b) - - // determines the initial size of the index data structure - // used internally (i.e. the expected number of different classifiers) - override protected def mapSize: Int = 128 -} diff --git a/bbb-apps-common/project/Dependencies.scala b/bbb-apps-common/project/Dependencies.scala index 0d40045ef8ad..86f51a25ef7c 100644 --- a/bbb-apps-common/project/Dependencies.scala +++ b/bbb-apps-common/project/Dependencies.scala @@ -20,9 +20,6 @@ object Dependencies { val io = "2.6" val pool = "2.6.0" - // Redis - val redisScala = "1.8.0" - // BigBlueButton val bbbCommons = "0.0.20-SNAPSHOT" } @@ -42,8 +39,6 @@ object Dependencies { val apacheIo = "commons-io" % "commons-io" % Versions.io val apachePool2 = "org.apache.commons" % "commons-pool2" % Versions.pool - val redisScala = "com.github.etaty" % "rediscala_2.12" % Versions.redisScala - val bbbCommons = "org.bigbluebutton" % "bbb-common-message_2.12" % Versions.bbbCommons } @@ -58,6 +53,5 @@ object Dependencies { Compile.apacheLang, Compile.apacheIo, Compile.apachePool2, - Compile.bbbCommons, - Compile.redisScala) + Compile.bbbCommons) } diff --git a/bbb-apps-common/src/main/scala/org/bigbluebutton/client/ClientGWApplication.scala b/bbb-apps-common/src/main/scala/org/bigbluebutton/client/ClientGWApplication.scala index 1efa0124fdb8..bad84e615018 100644 --- a/bbb-apps-common/src/main/scala/org/bigbluebutton/client/ClientGWApplication.scala +++ b/bbb-apps-common/src/main/scala/org/bigbluebutton/client/ClientGWApplication.scala @@ -3,16 +3,16 @@ package org.bigbluebutton.client import akka.actor.ActorSystem import akka.event.Logging import org.bigbluebutton.client.bus._ -import org.bigbluebutton.client.endpoint.redis.AppsRedisSubscriberActor +import org.bigbluebutton.client.endpoint.redis.Red5AppsRedisSubscriberActor import org.bigbluebutton.client.meeting.MeetingManagerActor import org.bigbluebutton.common2.redis.RedisPublisher - -import scala.concurrent.duration._ -import org.bigbluebutton.common2.redis.MessageSender -import org.bigbluebutton.api2.bus.MsgFromAkkaAppsEventBus + +import scala.concurrent.duration._ +import org.bigbluebutton.common2.redis.MessageSender +import org.bigbluebutton.api2.bus.MsgFromAkkaAppsEventBus import org.bigbluebutton.common2.bus.JsonMsgFromAkkaAppsBus -class ClientGWApplication(val msgToClientGW: MsgToClientGW) extends SystemConfiguration{ +class ClientGWApplication(val msgToClientGW: MsgToClientGW) extends SystemConfiguration { implicit val system = ActorSystem("bbb-apps-common") implicit val timeout = akka.util.Timeout(3 seconds) @@ -45,19 +45,17 @@ class ClientGWApplication(val msgToClientGW: MsgToClientGW) extends SystemConfig msgToClientEventBus.subscribe(msgToClientJsonActor, toClientChannel) - private val appsRedisSubscriberActor = system.actorOf( - AppsRedisSubscriberActor.props(receivedJsonMsgBus), "appsRedisSubscriberActor") + private val appsRedisSubscriberActor = system.actorOf(Red5AppsRedisSubscriberActor.props(system, receivedJsonMsgBus), "appsRedisSubscriberActor") private val receivedJsonMsgHdlrActor = system.actorOf( ReceivedJsonMsgHdlrActor.props(msgFromAkkaAppsEventBus), "receivedJsonMsgHdlrActor") receivedJsonMsgBus.subscribe(receivedJsonMsgHdlrActor, fromAkkaAppsJsonChannel) - /** - * - * External Interface for Gateway - */ + * + * External Interface for Gateway + */ def connect(connInfo: ConnInfo): Unit = { //log.debug("**** ClientGWApplication connect " + connInfo) diff --git a/bbb-apps-common/src/main/scala/org/bigbluebutton/client/endpoint/redis/Red5AppsRedisSubscriberActor.scala b/bbb-apps-common/src/main/scala/org/bigbluebutton/client/endpoint/redis/Red5AppsRedisSubscriberActor.scala index e32fcc75d45c..e23985f145a1 100644 --- a/bbb-apps-common/src/main/scala/org/bigbluebutton/client/endpoint/redis/Red5AppsRedisSubscriberActor.scala +++ b/bbb-apps-common/src/main/scala/org/bigbluebutton/client/endpoint/redis/Red5AppsRedisSubscriberActor.scala @@ -1,59 +1,50 @@ package org.bigbluebutton.client.endpoint.redis -import akka.actor.{ ActorLogging, OneForOneStrategy, Props } -import akka.actor.SupervisorStrategy.Resume -import java.io.{ PrintWriter, StringWriter } -import java.net.InetSocketAddress - -import redis.actors.RedisSubscriberActor -import redis.api.pubsub.{ Message, PMessage } - -import scala.concurrent.duration._ -import org.bigbluebutton.client._ -import redis.api.servers.ClientSetname -import org.bigbluebutton.common2.redis.RedisAppSubscriberActor -import org.bigbluebutton.common2.bus.{ JsonMsgFromAkkaApps, JsonMsgFromAkkaAppsBus, JsonMsgFromAkkaAppsEvent } -import org.bigbluebutton.common2.redis.{ RedisConfiguration, RedisSubscriber } - -object AppsRedisSubscriberActor extends RedisSubscriber with RedisConfiguration with SystemConfiguration { +import org.bigbluebutton.common2.redis.RedisSubscriberProvider +import io.lettuce.core.pubsub.RedisPubSubListener +import org.bigbluebutton.common2.bus.JsonMsgFromAkkaApps +import org.bigbluebutton.common2.redis.RedisConfiguration +import org.bigbluebutton.client.SystemConfiguration +import akka.actor.ActorSystem +import org.bigbluebutton.common2.redis.RedisSubscriber +import org.bigbluebutton.common2.bus.JsonMsgFromAkkaAppsBus +import akka.actor.Props +import org.bigbluebutton.common2.bus.JsonMsgFromAkkaAppsEvent + +object Red5AppsRedisSubscriberActor extends RedisSubscriber with RedisConfiguration with SystemConfiguration { val channels = Seq(fromAkkaAppsRedisChannel, fromAkkaAppsWbRedisChannel, fromAkkaAppsChatRedisChannel, fromAkkaAppsPresRedisChannel, fromThirdPartyRedisChannel) val patterns = Seq("bigbluebutton:from-bbb-apps:*") - def props(jsonMsgBus: JsonMsgFromAkkaAppsBus): Props = - Props(classOf[AppsRedisSubscriberActor], jsonMsgBus, + def props(system: ActorSystem, jsonMsgBus: JsonMsgFromAkkaAppsBus): Props = + Props( + classOf[Red5AppsRedisSubscriberActor], + system, jsonMsgBus, redisHost, redisPort, - channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher") + channels, patterns).withDispatcher("akka.redis-subscriber-worker-dispatcher") } -class AppsRedisSubscriberActor(jsonMsgBus: JsonMsgFromAkkaAppsBus, redisHost: String, - redisPort: Int, - channels: Seq[String] = Nil, patterns: Seq[String] = Nil) - extends RedisSubscriberActor( - new InetSocketAddress(redisHost, redisPort), - channels, patterns, onConnectStatus = connected => { println(s"connected: $connected") }) - with SystemConfiguration with ActorLogging with RedisAppSubscriberActor { - - override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { - case e: Exception => { - val sw: StringWriter = new StringWriter() - sw.write("An exception has been thrown on AppsRedisSubscriberActor, exception message [" + e.getMessage() + "] (full stacktrace below)\n") - e.printStackTrace(new PrintWriter(sw)) - log.error(sw.toString()) - Resume - } +class Red5AppsRedisSubscriberActor(system: ActorSystem, jsonMsgBus: JsonMsgFromAkkaAppsBus, + redisHost: String, redisPort: Int, + channels: Seq[String] = Nil, patterns: Seq[String] = Nil) + extends RedisSubscriberProvider(system, "Red5AppsSub", channels, patterns, null) with SystemConfiguration { + + override def addListener(appChannel: String) { + connection.addListener(new RedisPubSubListener[String, String] { + def message(channel: String, message: String): Unit = { + if (channels.contains(channel)) { + val receivedJsonMessage = new JsonMsgFromAkkaApps(channel, message) + jsonMsgBus.publish(JsonMsgFromAkkaAppsEvent(fromAkkaAppsJsonChannel, receivedJsonMessage)) + } + } + def message(pattern: String, channel: String, message: String): Unit = { log.info("Subscribed to channel {} with pattern {}", channel, pattern) } + def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) } + def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed to pattern {}", pattern) } + def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) } + def unsubscribed(channel: String, count: Long): Unit = { log.info("Subscribed to channel {}", channel) } + }) } - // Set the name of this client to be able to distinguish when doing - // CLIENT LIST on redis-cli - write(ClientSetname("Red5AppsSub").encodedRequest) - - def onMessage(message: Message) { - if (channels.contains(message.channel)) { - //log.debug(s"RECEIVED:\n ${message.data.utf8String} \n") - val receivedJsonMessage = new JsonMsgFromAkkaApps(message.channel, message.data.utf8String) - jsonMsgBus.publish(JsonMsgFromAkkaAppsEvent(fromAkkaAppsJsonChannel, receivedJsonMessage)) - } - - } + addListener(null) + subscribe() } diff --git a/bbb-common-message/project/Dependencies.scala b/bbb-common-message/project/Dependencies.scala index 79afaa8e5515..c8d886208a73 100644 --- a/bbb-common-message/project/Dependencies.scala +++ b/bbb-common-message/project/Dependencies.scala @@ -21,7 +21,6 @@ object Dependencies { val pool = "2.6.0" // Redis - val redisScala = "1.8.0" val lettuce = "5.1.3.RELEASE" // Test @@ -40,7 +39,6 @@ object Dependencies { val red5 = "org.red5" % "red5-server-common" % Versions.red5 val apachePool2 = "org.apache.commons" % "commons-pool2" % Versions.pool - val redisScala = "com.github.etaty" % "rediscala_2.12" % Versions.redisScala val lettuceCore = "io.lettuce" % "lettuce-core" % Versions.lettuce } @@ -66,6 +64,5 @@ object Dependencies { Compile.sl4jApi, Compile.red5, Compile.apachePool2, - Compile.lettuceCore, - Compile.redisScala) ++ testing + Compile.lettuceCore) ++ testing } diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisAwareCommunicator.java b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisAwareCommunicator.java index 3455b0bd94b7..1a8d5f0108a8 100644 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisAwareCommunicator.java +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisAwareCommunicator.java @@ -31,6 +31,7 @@ public abstract class RedisAwareCommunicator { protected String password; protected int port; protected String clientName; + protected int expireKey; public abstract void start(); @@ -52,6 +53,10 @@ public void setPort(int port) { this.port = port; } + public void setExpireKey(int expireKey) { + this.expireKey = expireKey; + } + protected GenericObjectPoolConfig createPoolingConfig() { GenericObjectPoolConfig config = new GenericObjectPoolConfig(); config.setMaxTotal(32); diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java index 3b35c3b78dbf..2ddc41dada7c 100644 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java @@ -36,22 +36,18 @@ public class RedisStorageService extends RedisAwareCommunicator { private static Logger log = LoggerFactory.getLogger(RedisStorageService.class); - private int expireKey; - GenericObjectPool> connectionPool; public void start() { log.info("Starting RedisStorageService with client name: {}", clientName); - RedisURI redisUri = RedisURI.Builder.redis(this.host, this.port).withClientName(this.clientName).build(); - if (!this.password.isEmpty()) { - redisUri.setPassword(this.password); - } + RedisURI redisUri = RedisURI.Builder.redis(this.host, this.port).withClientName(this.clientName) + .withPassword(this.password).build(); redisClient = RedisClient.create(redisUri); redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build()); - connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(), - createPoolingConfig()); + connectionPool = ConnectionPoolSupport.createGenericObjectPool( + () -> (codec != null) ? redisClient.connect(codec) : redisClient.connect(), createPoolingConfig()); } public void stop() { @@ -122,7 +118,6 @@ public void recordAndExpire(String meetingId, Map event) { Long msgid = commands.incr("global:nextRecordedMsgId"); commands.hmset("recording:" + meetingId + ":" + msgid, event); - commands.rpush("meeting:" + meetingId + ":recordings", Long.toString(msgid)); /** * We set the key to expire after 14 days as we are still recording * the event into redis even if the meeting is not recorded. (ralam @@ -139,10 +134,6 @@ public void recordAndExpire(String meetingId, Map event) { } } - public void setExpireKey(int expireKey) { - this.expireKey = expireKey; - } - private String recordMeeting(String key, Map info) { log.debug("Storing metadata {}", info); String result = ""; @@ -155,6 +146,5 @@ private String recordMeeting(String key, Map info) { connectionPool.close(); } return result; - } } \ No newline at end of file diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisAppSubscriberActor.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisAppSubscriberActor.scala deleted file mode 100644 index 5ee1ee8d81be..000000000000 --- a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisAppSubscriberActor.scala +++ /dev/null @@ -1,18 +0,0 @@ -package org.bigbluebutton.common2.redis - -import akka.actor.Actor -import akka.event.LoggingAdapter -import redis.api.pubsub.PMessage - -trait RedisAppSubscriberActor extends Actor { - - def log: LoggingAdapter - - def handleMessage(msg: String) { - throw new UnsupportedOperationException(); - } - - def onPMessage(pmessage: PMessage) { - throw new UnsupportedOperationException(); - } -} diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisClientProvider.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisClientProvider.scala new file mode 100644 index 000000000000..afeba57bf44c --- /dev/null +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisClientProvider.scala @@ -0,0 +1,15 @@ +package org.bigbluebutton.common2.redis + +import akka.actor.ActorSystem +import io.lettuce.core.ClientOptions +import io.lettuce.core.RedisClient +import io.lettuce.core.RedisURI + +abstract class RedisClientProvider(val system: ActorSystem, val clientName: String) extends RedisConfiguration { + // Set the name of this client to be able to distinguish when doing + // CLIENT LIST on redis-cli + val redisUri = RedisURI.Builder.redis(redisHost, redisPort).withClientName(clientName).withPassword(redisPassword).build() + + var redis = RedisClient.create(redisUri) + redis.setOptions(ClientOptions.builder().autoReconnect(true).build()) +} diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisConfiguration.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisConfiguration.scala index ff0e27a0acc6..8dcfd16ff578 100644 --- a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisConfiguration.scala +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisConfiguration.scala @@ -10,6 +10,7 @@ trait RedisConfiguration { lazy val redisHost = Try(config.getString("redis.host")).getOrElse("127.0.0.1") lazy val redisPort = Try(config.getInt("redis.port")).getOrElse(6379) lazy val redisPassword = Try(config.getString("redis.password")).getOrElse("") + lazy val redisExpireKey = Try(config.getInt("redis.keyExpiry")).getOrElse(1209600) // Redis channels lazy val toAkkaAppsRedisChannel = Try(config.getString("redis.toAkkaAppsRedisChannel")).getOrElse("to-akka-apps-redis-channel") diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisPublisher.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisPublisher.scala index 10b21ab9449d..60bfcbeec9e1 100755 --- a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisPublisher.scala +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisPublisher.scala @@ -1,22 +1,15 @@ package org.bigbluebutton.common2.redis import akka.actor.ActorSystem -import akka.event.Logging -import akka.util.ByteString -import redis.RedisClient -class RedisPublisher(val system: ActorSystem, val clientName: String) extends RedisConfiguration { +class RedisPublisher(system: ActorSystem, clientName: String) extends RedisClientProvider(system, clientName) { + val connection = redis.connectPubSub() - val redis = RedisClient(host = redisHost, password = Some(redisPassword), port= redisPort)(system) - - val log = Logging(system, getClass) - - // Set the name of this client to be able to distinguish when doing - // CLIENT LIST on redis-cli - redis.clientSetname(clientName) + redis.connect() def publish(channel: String, data: String) { - redis.publish(channel, ByteString(data)) + val async = connection.async(); + async.publish(channel, data); } - + } diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisStorageProvider.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisStorageProvider.scala new file mode 100644 index 000000000000..71fc9df41a4c --- /dev/null +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisStorageProvider.scala @@ -0,0 +1,13 @@ +package org.bigbluebutton.common2.redis + +import akka.actor.ActorSystem + +abstract class RedisStorageProvider(system: ActorSystem, clientName: String) extends RedisConfiguration { + var redis = new RedisStorageService() + redis.setHost(redisHost) + redis.setPort(redisPort) + redis.setPassword(redisPassword) + redis.setExpireKey(redisExpireKey) + redis.setClientName(clientName) + redis.start(); +} diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriber.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriber.scala index 630f07909935..5c65a3385776 100644 --- a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriber.scala +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriber.scala @@ -1,6 +1,6 @@ package org.bigbluebutton.common2.redis -abstract class RedisSubscriber extends RedisConfiguration { +trait RedisSubscriber extends RedisConfiguration { val channels: Seq[String] val patterns: Seq[String] } diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriberProvider.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriberProvider.scala new file mode 100644 index 000000000000..90febddff182 --- /dev/null +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriberProvider.scala @@ -0,0 +1,63 @@ +package org.bigbluebutton.common2.redis + +import akka.actor.ActorSystem +import org.bigbluebutton.common2.bus.ReceivedJsonMessage +import org.bigbluebutton.common2.bus.IncomingJsonMessage +import io.lettuce.core.pubsub.RedisPubSubListener +import org.bigbluebutton.common2.bus.IncomingJsonMessageBus +import akka.actor.ActorLogging +import akka.actor.Actor + +import akka.actor.ActorSystem +import akka.actor.OneForOneStrategy +import akka.actor.SupervisorStrategy.Resume +import java.io.StringWriter +import scala.concurrent.duration._ +import java.io.PrintWriter + +abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String, + channels: Seq[String], patterns: Seq[String], + jsonMsgBus: IncomingJsonMessageBus) + extends RedisClientProvider(system, clientName) with Actor with ActorLogging { + var connection = redis.connectPubSub() + + def addListener(appChannel: String) { + connection.addListener(new RedisPubSubListener[String, String] { + def message(channel: String, message: String): Unit = { + if (channels.contains(channel)) { + val receivedJsonMessage = new ReceivedJsonMessage(channel, message) + jsonMsgBus.publish(IncomingJsonMessage(appChannel, receivedJsonMessage)) + } + } + def message(pattern: String, channel: String, message: String): Unit = { log.info("Subscribed to channel {} with pattern {}", channel, pattern) } + def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) } + def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed to pattern {}", pattern) } + def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) } + def unsubscribed(channel: String, count: Long): Unit = { log.info("Subscribed to channel {}", channel) } + }) + } + + def subscribe() { + val async = connection.async() + for (channel <- channels) async.subscribe(channel) + for (pattern <- patterns) async.psubscribe(pattern) + } + + override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { + case e: Exception => { + val sw: StringWriter = new StringWriter() + sw.write("An exception has been thrown on " + getClass + ", exception message [" + e.getMessage + "] (full stacktrace below)\n") + e.printStackTrace(new PrintWriter(sw)) + log.error(sw.toString()) + Resume + } + } + + def publishEvent() { + + } + + def receive = { + case _ => // do nothing + } +} diff --git a/bbb-common-web/project/Dependencies.scala b/bbb-common-web/project/Dependencies.scala index 149bd135b8b6..11f02264ce17 100644 --- a/bbb-common-web/project/Dependencies.scala +++ b/bbb-common-web/project/Dependencies.scala @@ -32,9 +32,6 @@ object Dependencies { val io = "2.6" val pool = "2.6.0" - // Redis - val redisScala = "1.8.0" - // BigBlueButton val bbbCommons = "0.0.20-SNAPSHOT" @@ -69,8 +66,6 @@ object Dependencies { val apacheIo = "commons-io" % "commons-io" % Versions.io val apachePool2 = "org.apache.commons" % "commons-pool2" % Versions.pool - val redisScala = "com.github.etaty" % "rediscala_2.12" % Versions.redisScala - val bbbCommons = "org.bigbluebutton" % "bbb-common-message_2.12" % Versions.bbbCommons } @@ -104,7 +99,5 @@ object Dependencies { Compile.apacheLang, Compile.apacheIo, Compile.apachePool2, - Compile.redisScala, - Compile.bbbCommons, - ) ++ testing + Compile.bbbCommons) ++ testing } diff --git a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/BbbWebApiGWApp.scala b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/BbbWebApiGWApp.scala index 59e4999fe0a3..6d62c007f4b2 100755 --- a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/BbbWebApiGWApp.scala +++ b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/BbbWebApiGWApp.scala @@ -10,8 +10,8 @@ import org.bigbluebutton.common2.redis.MessageSender import org.bigbluebutton.api2.meeting.{ OldMeetingMsgHdlrActor, RegisterUser } import org.bigbluebutton.common2.domain._ import org.bigbluebutton.presentation.messages._ -import scala.concurrent.duration._ -import org.bigbluebutton.common2.redis._ +import scala.concurrent.duration._ +import org.bigbluebutton.common2.redis._ import org.bigbluebutton.common2.bus._ class BbbWebApiGWApp( @@ -54,8 +54,7 @@ class BbbWebApiGWApp( msgToAkkaAppsEventBus.subscribe(msgToAkkaAppsToJsonActor, toAkkaAppsChannel) - private val appsRedisSubscriberActor = system.actorOf( - WebRedisSubscriberActor.props(receivedJsonMsgBus, oldMessageEventBus), "appsRedisSubscriberActor") + private val appsRedisSubscriberActor = system.actorOf(WebRedisSubscriberActor.props(system, receivedJsonMsgBus, oldMessageEventBus), "appsRedisSubscriberActor") private val receivedJsonMsgHdlrActor = system.actorOf( ReceivedJsonMsgHdlrActor.props(msgFromAkkaAppsEventBus), "receivedJsonMsgHdlrActor") diff --git a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/RedisDataStorageActor.scala b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/RedisDataStorageActor.scala index 40e486e21c08..e4d6628018a0 100755 --- a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/RedisDataStorageActor.scala +++ b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/RedisDataStorageActor.scala @@ -1,8 +1,12 @@ package org.bigbluebutton.api2.endpoint.redis -import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } import org.bigbluebutton.api2.SystemConfiguration -import redis.RedisClient +import org.bigbluebutton.common2.redis.RedisStorageProvider + +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorSystem +import akka.actor.Props case class RecordMeetingInfoMsg(meetingId: String, info: collection.immutable.Map[String, String]) case class RecordBreakoutInfoMsg(meetingId: String, info: collection.immutable.Map[String, String]) @@ -13,13 +17,10 @@ object RedisDataStorageActor { def props(system: ActorSystem): Props = Props(classOf[RedisDataStorageActor], system) } -class RedisDataStorageActor(val system: ActorSystem) extends Actor with ActorLogging with SystemConfiguration { - - val redis = RedisClient(host = redisHost, password = Some(redisPassword), port = redisPort)(system) - - // Set the name of this client to be able to distinguish when doing - // CLIENT LIST on redis-cli - redis.clientSetname("BbbWebStore") +class RedisDataStorageActor(val system: ActorSystem) + extends RedisStorageProvider(system, "BbbWebStore") + with SystemConfiguration + with Actor with ActorLogging { def receive = { case msg: RecordMeetingInfoMsg => handleRecordMeetingInfoMsg(msg) @@ -29,20 +30,19 @@ class RedisDataStorageActor(val system: ActorSystem) extends Actor with ActorLog } def handleRecordMeetingInfoMsg(msg: RecordMeetingInfoMsg): Unit = { - redis.hmset("meeting:info:" + msg.meetingId, msg.info) + redis.recordMeetingInfo(msg.meetingId, msg.info.asInstanceOf[java.util.Map[java.lang.String, java.lang.String]]); } def handleRecordBreakoutInfoMsg(msg: RecordBreakoutInfoMsg): Unit = { - redis.hmset("meeting:breakout:" + msg.meetingId, msg.info) + redis.recordBreakoutInfo(msg.meetingId, msg.info.asInstanceOf[java.util.Map[java.lang.String, java.lang.String]]) } def handleAddBreakoutRoomMsg(msg: AddBreakoutRoomMsg): Unit = { - redis.sadd("meeting:breakout:rooms:" + msg.parentId, msg.breakoutId) + redis.addBreakoutRoom(msg.parentId, msg.breakoutId) } def handleRemoveMeetingMsg(msg: RemoveMeetingMsg): Unit = { - redis.del("meeting-" + msg.meetingId) - redis.srem("meetings", msg.meetingId) + redis.removeMeeting(msg.meetingId) } } diff --git a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/WebRedisSubscriberActor.scala b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/WebRedisSubscriberActor.scala index fff005096388..f6158998de67 100755 --- a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/WebRedisSubscriberActor.scala +++ b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/WebRedisSubscriberActor.scala @@ -1,68 +1,53 @@ package org.bigbluebutton.api2.endpoint.redis -import java.io.{ PrintWriter, StringWriter } -import java.net.InetSocketAddress - -import akka.actor.SupervisorStrategy.Resume -import akka.actor.{ OneForOneStrategy, Props } -import redis.api.servers.ClientSetname -import redis.actors.RedisSubscriberActor -import redis.api.pubsub.{ Message, PMessage } -import scala.concurrent.duration._ - -import org.bigbluebutton.api2._ -import org.bigbluebutton.common2.redis._ -import org.bigbluebutton.common2.redis.RedisConfiguration +import org.bigbluebutton.api2.SystemConfiguration import org.bigbluebutton.common2.bus._ +import org.bigbluebutton.common2.redis.{ RedisConfiguration, RedisSubscriber, RedisSubscriberProvider } + +import akka.actor.ActorSystem +import akka.actor.Props +import io.lettuce.core.pubsub.RedisPubSubListener object WebRedisSubscriberActor extends RedisSubscriber with RedisConfiguration { val channels = Seq(fromAkkaAppsRedisChannel) val patterns = Seq("bigbluebutton:from-bbb-apps:*") - def props(jsonMsgBus: JsonMsgFromAkkaAppsBus, oldMessageEventBus: OldMessageEventBus): Props = - Props(classOf[WebRedisSubscriberActor], jsonMsgBus, oldMessageEventBus, + def props(system: ActorSystem, jsonMsgBus: JsonMsgFromAkkaAppsBus, oldMessageEventBus: OldMessageEventBus): Props = + Props( + classOf[WebRedisSubscriberActor], + system, jsonMsgBus, oldMessageEventBus, redisHost, redisPort, - channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher") + channels, patterns).withDispatcher("akka.redis-subscriber-worker-dispatcher") } -class WebRedisSubscriberActor(jsonMsgBus: JsonMsgFromAkkaAppsBus, oldMessageEventBus: OldMessageEventBus, redisHost: String, - redisPort: Int, - channels: Seq[String] = Nil, patterns: Seq[String] = Nil) - extends RedisSubscriberActor( - new InetSocketAddress(redisHost, redisPort), - channels, patterns, onConnectStatus = connected => { println(s"connected: $connected") }) - with SystemConfiguration - with RedisAppSubscriberActor { - - override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { - case e: Exception => { - val sw: StringWriter = new StringWriter() - sw.write("An exception has been thrown on AppsRedisSubscriberActor, exception message [" + e.getMessage() + "] (full stacktrace below)\n") - e.printStackTrace(new PrintWriter(sw)) - log.error(sw.toString()) - Resume - } +class WebRedisSubscriberActor( + system: ActorSystem, + jsonMsgBus: JsonMsgFromAkkaAppsBus, oldMessageEventBus: OldMessageEventBus, redisHost: String, + redisPort: Int, + channels: Seq[String] = Nil, patterns: Seq[String] = Nil) + extends RedisSubscriberProvider(system, "BbbWebSub", channels, patterns, null) with SystemConfiguration { + + override def addListener(appChannel: String) { + connection.addListener(new RedisPubSubListener[String, String] { + def message(channel: String, message: String): Unit = { + if (channels.contains(channel)) { + val receivedJsonMessage = new JsonMsgFromAkkaApps(channel, message) + jsonMsgBus.publish(JsonMsgFromAkkaAppsEvent(fromAkkaAppsJsonChannel, receivedJsonMessage)) + } + } + def message(pattern: String, channel: String, message: String): Unit = { + log.debug(s"RECEIVED:\n ${message} \n") + val receivedJsonMessage = new OldReceivedJsonMessage(pattern, channel, message) + oldMessageEventBus.publish(OldIncomingJsonMessage(fromAkkaAppsOldJsonChannel, receivedJsonMessage)) + } + def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) } + def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed to pattern {}", pattern) } + def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) } + def unsubscribed(channel: String, count: Long): Unit = { log.info("Subscribed to channel {}", channel) } + }) } - // Set the name of this client to be able to distinguish when doing - // CLIENT LIST on redis-cli - write(ClientSetname("Red5AppsSub").encodedRequest) - - def onMessage(message: Message) { - //log.error(s"SHOULD NOT BE RECEIVING: $message") - if (message.channel == fromAkkaAppsRedisChannel) { - val receivedJsonMessage = new JsonMsgFromAkkaApps(message.channel, message.data.utf8String) - jsonMsgBus.publish(JsonMsgFromAkkaAppsEvent(fromAkkaAppsJsonChannel, receivedJsonMessage)) - } - } - - override def onPMessage(pmessage: PMessage) { - log.debug(s"RECEIVED:\n ${pmessage.data.utf8String} \n") - val receivedJsonMessage = new OldReceivedJsonMessage( - pmessage.patternMatched, - pmessage.channel, pmessage.data.utf8String) - - oldMessageEventBus.publish(OldIncomingJsonMessage(fromAkkaAppsOldJsonChannel, receivedJsonMessage)) - } + addListener(fromAkkaAppsJsonChannel) + subscribe() } diff --git a/bbb-fsesl-client/build.sbt b/bbb-fsesl-client/build.sbt index 426c6351836e..4399efc2a4eb 100755 --- a/bbb-fsesl-client/build.sbt +++ b/bbb-fsesl-client/build.sbt @@ -2,7 +2,7 @@ import org.bigbluebutton.build._ description := "BigBlueButton custom FS-ESL client built on top of FS-ESL Java library." -version := "0.0.7" +version := "0.0.7-SNAPSHOT" val compileSettings = Seq( organization := "org.bigbluebutton", diff --git a/bbb-screenshare/app/build.sbt b/bbb-screenshare/app/build.sbt index af7080d45151..a49598ac7d64 100755 --- a/bbb-screenshare/app/build.sbt +++ b/bbb-screenshare/app/build.sbt @@ -25,7 +25,6 @@ val compileSettings = Seq( resolvers ++= Seq( "spray repo" at "http://repo.spray.io/", - "rediscala" at "http://dl.bintray.com/etaty/maven", "blindside-repos" at "http://blindside.googlecode.com/svn/repository/" ) diff --git a/bbb-screenshare/app/deploy.sh b/bbb-screenshare/app/deploy.sh index cfc6e31a656e..69d99a0f08fa 100755 --- a/bbb-screenshare/app/deploy.sh +++ b/bbb-screenshare/app/deploy.sh @@ -19,8 +19,7 @@ sudo cp target/webapp/WEB-INF/lib/bbb-screenshare-akka_2.12-0.0.3.jar \ target/webapp/WEB-INF/lib/gson-2.8.5.jar \ target/webapp/WEB-INF/lib/commons-pool2-2.6.0.jar \ target/webapp/WEB-INF/lib/spring-webmvc-4.3.12.RELEASE.jar \ - target/webapp/WEB-INF/lib/rediscala_2.12-1.8.0.jar \ - target/webapp/WEB-INF/lib/bbb-common-message_2.12-0.0.20-SNAPSHOT.jar \ + target/webapp/WEB-INF/lib/bbb-common-message_2.12-0.0.20-SNAPSHOT.jar \ target/webapp/WEB-INF/lib/lettuce-core-5.1.3.RELEASE.jar \ target/webapp/WEB-INF/lib/netty-* \ target/webapp/WEB-INF/lib/reactor-core-3.2.3.RELEASE.jar \ diff --git a/bbb-screenshare/app/project/Dependencies.scala b/bbb-screenshare/app/project/Dependencies.scala index 0133264452bb..447f11e57374 100644 --- a/bbb-screenshare/app/project/Dependencies.scala +++ b/bbb-screenshare/app/project/Dependencies.scala @@ -28,7 +28,6 @@ object Dependencies { val pool2 = "2.6.0" // Redis - val redisScala = "1.8.0" val lettuce = "5.1.3.RELEASE" // BigBlueButton @@ -65,7 +64,6 @@ object Dependencies { val apacheLang = "org.apache.commons" % "commons-lang3" % Versions.lang val apachePool2 = "org.apache.commons" % "commons-pool2" % Versions.pool2 - val redisScala = "com.github.etaty" % "rediscala_2.12" % Versions.redisScala val lettuceCore = "io.lettuce" % "lettuce-core" % Versions.lettuce val bbbCommons = "org.bigbluebutton" % "bbb-common-message_2.12" % Versions.bbbCommons @@ -102,7 +100,6 @@ object Dependencies { Compile.commonsCodec, Compile.apacheLang, Compile.apachePool2, - Compile.redisScala, Compile.lettuceCore, Compile.bbbCommons) ++ testing } diff --git a/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/ScreenShareApplication.scala b/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/ScreenShareApplication.scala index 79bd740326e5..d1c42cdd8c86 100755 --- a/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/ScreenShareApplication.scala +++ b/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/ScreenShareApplication.scala @@ -26,10 +26,10 @@ import org.bigbluebutton.app.screenshare.server.sessions.ScreenshareManager import org.bigbluebutton.app.screenshare.server.sessions.messages._ import org.bigbluebutton.app.screenshare.server.util.LogHelper import akka.actor.ActorSystem -import org.bigbluebutton.app.screenshare.redis.{ AppsRedisSubscriberActor, ReceivedJsonMsgHandlerActor } +import org.bigbluebutton.app.screenshare.redis.{ ScreenshareRedisSubscriberActor, ReceivedJsonMsgHandlerActor } import scala.concurrent.{ Await, Future, TimeoutException } -import scala.concurrent.duration._ +import scala.concurrent.duration._ import org.bigbluebutton.common2.bus.IncomingJsonMessageBus class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String, @@ -47,7 +47,7 @@ class ScreenShareApplication(val bus: IEventsMessageBus, val jnlpFile: String, //logger.debug("*********** meetingManagerChannel = " + meetingManagerChannel) val incomingJsonMessageBus = new IncomingJsonMessageBus - val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(incomingJsonMessageBus), "redis-subscriber") + val redisSubscriberActor = system.actorOf(ScreenshareRedisSubscriberActor.props(system, incomingJsonMessageBus), "redis-subscriber") val screenShareManager = system.actorOf(ScreenshareManager.props(system, bus), "screenshare-manager") diff --git a/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/redis/ScreenshareRedisSubscriberActor.scala b/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/redis/ScreenshareRedisSubscriberActor.scala index 1a9aa5053a3f..367d4f6f7d3c 100755 --- a/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/redis/ScreenshareRedisSubscriberActor.scala +++ b/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/redis/ScreenshareRedisSubscriberActor.scala @@ -1,60 +1,32 @@ package org.bigbluebutton.app.screenshare.redis -import java.io.{ PrintWriter, StringWriter } -import java.net.InetSocketAddress - -import akka.actor.{ OneForOneStrategy, Props } -import akka.actor.SupervisorStrategy.Resume import org.bigbluebutton.app.screenshare.SystemConfiguration -import redis.actors.RedisSubscriberActor -import redis.api.servers.ClientSetname -import redis.actors.RedisSubscriberActor -import redis.api.pubsub.{ Message, PMessage } -import scala.concurrent.duration._ -import org.bigbluebutton.common2.bus.{ IncomingJsonMessageBus, ReceivedJsonMessage, IncomingJsonMessage } +import org.bigbluebutton.common2.bus.IncomingJsonMessageBus +import org.bigbluebutton.common2.redis.RedisSubscriberProvider + +import akka.actor.ActorSystem +import akka.actor.Props -object AppsRedisSubscriberActor extends SystemConfiguration { +object ScreenshareRedisSubscriberActor extends SystemConfiguration { val channels = Seq(fromAkkaAppsRedisChannel) val patterns = Seq("bigbluebutton:to-bbb-apps:*", "bigbluebutton:from-voice-conf:*") - def props(jsonMsgBus: IncomingJsonMessageBus): Props = - Props(classOf[AppsRedisSubscriberActor], jsonMsgBus, + def props(system: ActorSystem, jsonMsgBus: IncomingJsonMessageBus): Props = + Props( + classOf[ScreenshareRedisSubscriberActor], + system, jsonMsgBus, redisHost, redisPort, channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher") } -class AppsRedisSubscriberActor(jsonMsgBus: IncomingJsonMessageBus, redisHost: String, - redisPort: Int, +class ScreenshareRedisSubscriberActor( + system: ActorSystem, + jsonMsgBus: IncomingJsonMessageBus, + redisHost: String, redisPort: Int, channels: Seq[String] = Nil, patterns: Seq[String] = Nil) - extends RedisSubscriberActor( - new InetSocketAddress(redisHost, redisPort), - channels, patterns, onConnectStatus = connected => { println(s"connected: $connected") }) with SystemConfiguration { - - override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { - case e: Exception => { - val sw: StringWriter = new StringWriter() - sw.write("An exception has been thrown on AppsRedisSubscriberActor, exception message [" + e.getMessage() + "] (full stacktrace below)\n") - e.printStackTrace(new PrintWriter(sw)) - log.error(sw.toString()) - Resume - } - } - - // Set the name of this client to be able to distinguish when doing - // CLIENT LIST on redis-cli - write(ClientSetname("BbbScreenshareAkkaSub").encodedRequest) - - def onMessage(message: Message) { - //log.error(s"SHOULD NOT BE RECEIVING: $message") - if (message.channel == fromAkkaAppsRedisChannel) { - val receivedJsonMessage = new ReceivedJsonMessage(message.channel, message.data.utf8String) - //log.debug(s"RECEIVED:\n [${receivedJsonMessage.channel}] \n ${receivedJsonMessage.data} \n") - jsonMsgBus.publish(IncomingJsonMessage(toScreenshareAppsJsonChannel, receivedJsonMessage)) - } - } + extends RedisSubscriberProvider(system, "BbbScreenshareAkkaSub", channels, patterns, jsonMsgBus) with SystemConfiguration { - def onPMessage(pmessage: PMessage) { - //log.debug(s"RECEIVED:\n ${pmessage.data.utf8String} \n") - } + addListener(toScreenshareAppsJsonChannel) + subscribe() } diff --git a/bbb-screenshare/app/src/main/webapp/WEB-INF/classes/application.conf b/bbb-screenshare/app/src/main/webapp/WEB-INF/classes/application.conf index 74183500ea75..3b72838eb4bb 100755 --- a/bbb-screenshare/app/src/main/webapp/WEB-INF/classes/application.conf +++ b/bbb-screenshare/app/src/main/webapp/WEB-INF/classes/application.conf @@ -10,7 +10,7 @@ akka { loggers = ["akka.event.slf4j.Slf4jLogger"] loglevel = "DEBUG" - rediscala-publish-worker-dispatcher { + redis-publish-worker-dispatcher { mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. @@ -18,7 +18,7 @@ akka { throughput = 512 } - rediscala-subscriber-worker-dispatcher { + redis-subscriber-worker-dispatcher { mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. diff --git a/bbb-screenshare/app/src/main/webapp/WEB-INF/classes/screenshare-app.conf b/bbb-screenshare/app/src/main/webapp/WEB-INF/classes/screenshare-app.conf index 7a46eb9c4520..32a843bfaf81 100755 --- a/bbb-screenshare/app/src/main/webapp/WEB-INF/classes/screenshare-app.conf +++ b/bbb-screenshare/app/src/main/webapp/WEB-INF/classes/screenshare-app.conf @@ -10,7 +10,7 @@ akka { loggers = ["akka.event.slf4j.Slf4jLoggerDDD"] loglevel = "DEBUG" - rediscala-publish-worker-dispatcher { + redis-publish-worker-dispatcher { mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. @@ -18,7 +18,7 @@ akka { throughput = 512 } - rediscala-subscriber-worker-dispatcher { + redis-subscriber-worker-dispatcher { mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. diff --git a/bigbluebutton-apps/src/main/webapp/WEB-INF/classes/application.conf b/bigbluebutton-apps/src/main/webapp/WEB-INF/classes/application.conf index 74183500ea75..3b72838eb4bb 100755 --- a/bigbluebutton-apps/src/main/webapp/WEB-INF/classes/application.conf +++ b/bigbluebutton-apps/src/main/webapp/WEB-INF/classes/application.conf @@ -10,7 +10,7 @@ akka { loggers = ["akka.event.slf4j.Slf4jLogger"] loglevel = "DEBUG" - rediscala-publish-worker-dispatcher { + redis-publish-worker-dispatcher { mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. @@ -18,7 +18,7 @@ akka { throughput = 512 } - rediscala-subscriber-worker-dispatcher { + redis-subscriber-worker-dispatcher { mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. diff --git a/bigbluebutton-web/grails-app/conf/application.conf b/bigbluebutton-web/grails-app/conf/application.conf index f5ba5981ea95..c6fc978954d0 100644 --- a/bigbluebutton-web/grails-app/conf/application.conf +++ b/bigbluebutton-web/grails-app/conf/application.conf @@ -10,7 +10,7 @@ akka { loggers = ["akka.event.slf4j.Slf4jLogger"] loglevel = "DEBUG" - rediscala-publish-worker-dispatcher { + redis-publish-worker-dispatcher { mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. @@ -18,7 +18,7 @@ akka { throughput = 512 } - rediscala-subscriber-worker-dispatcher { + redis-subscriber-worker-dispatcher { mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor.