Skip to content

Commit

Permalink
Re-added redis connection pooling and running subscribers and publish…
Browse files Browse the repository at this point in the history
…ers inside executors.
  • Loading branch information
GhaziTriki committed Nov 30, 2018
1 parent 6b20807 commit 7e736d4
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 84 deletions.
5 changes: 4 additions & 1 deletion bbb-common-message/project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ object Dependencies {
val jackson = "2.9.7"
val sl4j = "1.7.25"
val red5 = "1.0.10-M5"
val pool = "2.6.0"

// Redis
val redisScala = "1.8.0"
val lettuce = "5.1.2.RELEASE"
val lettuce = "5.1.3.RELEASE"

// Test
val scalaTest = "3.0.5"
Expand All @@ -37,6 +38,7 @@ object Dependencies {
val jacksonModule = "com.fasterxml.jackson.module" %% "jackson-module-scala" % Versions.jackson
val sl4jApi = "org.slf4j" % "slf4j-api" % Versions.sl4j % "runtime"
val red5 = "org.red5" % "red5-server-common" % Versions.red5
val apachePool2 = "org.apache.commons" % "commons-pool2" % Versions.pool

val redisScala = "com.github.etaty" % "rediscala_2.12" % Versions.redisScala
val lettuceCore = "io.lettuce" % "lettuce-core" % Versions.lettuce
Expand All @@ -63,6 +65,7 @@ object Dependencies {
Compile.jacksonModule,
Compile.sl4jApi,
Compile.red5,
Compile.apachePool2,
Compile.lettuceCore,
Compile.redisScala) ++ testing
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.bigbluebutton.common2.redis;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import io.lettuce.core.RedisClient;

public abstract class RedisAwareCommunicator {
Expand Down Expand Up @@ -49,4 +51,19 @@ public void setHost(String host) {
public void setPort(int port) {
this.port = port;
}

protected GenericObjectPoolConfig createPoolingConfig() {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(32);
config.setMaxIdle(8);
config.setMinIdle(1);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
config.setTestWhileIdle(true);
config.setNumTestsPerEvictionRun(12);
config.setMaxWaitMillis(5000);
config.setTimeBetweenEvictionRunsMillis(60000);
config.setBlockWhenExhausted(true);
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Map;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,18 +30,18 @@
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.support.ConnectionPoolSupport;

public class RedisStorageService extends RedisAwareCommunicator {

private static Logger log = LoggerFactory.getLogger(RedisStorageService.class);

private long expireKey;
private int expireKey;

RedisCommands<String, String> commands;
private StatefulRedisConnection<String, String> connection;
GenericObjectPool<StatefulRedisConnection<String, String>> connectionPool;

public void start() {
log.info("Starting RedisStorageService");
log.info("Starting RedisStorageService with client name: {}", clientName);
RedisURI redisUri = RedisURI.Builder.redis(this.host, this.port).withClientName(this.clientName).build();
if (!this.password.isEmpty()) {
redisUri.setPassword(this.password);
Expand All @@ -49,12 +50,12 @@ public void start() {
redisClient = RedisClient.create(redisUri);
redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build());

connection = redisClient.connect();
commands = connection.sync();
connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(),
createPoolingConfig());
}

public void stop() {
connection.close();
connectionPool.close();
redisClient.shutdown();
log.info("RedisStorageService Stopped");
}
Expand All @@ -71,51 +72,89 @@ public void recordBreakoutInfo(String meetingId, Map<String, String> breakoutInf

public void addBreakoutRoom(String parentId, String breakoutId) {
log.debug("Saving breakout room for meeting {}", parentId);
commands.sadd(Keys.BREAKOUT_ROOMS + parentId, breakoutId);
try (StatefulRedisConnection<String, String> connection = connectionPool.borrowObject()) {
RedisCommands<String, String> commands = connection.sync();
commands.sadd(Keys.BREAKOUT_ROOMS + parentId, breakoutId);
} catch (Exception e) {
log.error("Cannot add breakout room data: {}", parentId, e);
} finally {
connectionPool.close();
}
}

public void record(String meetingId, Map<String, String> event) {
log.debug("Recording meeting event {} inside a transaction", meetingId);
commands.multi();
Long msgid = commands.incr("global:nextRecordedMsgId");
commands.hmset("recording:" + meetingId + ":" + msgid, event);
commands.rpush("meeting:" + meetingId + ":" + "recordings", Long.toString(msgid));
commands.exec();
try (StatefulRedisConnection<String, String> connection = connectionPool.borrowObject()) {
RedisCommands<String, String> commands = connection.sync();
commands.multi();
Long msgid = commands.incr("global:nextRecordedMsgId");
commands.hmset("recording:" + meetingId + ":" + msgid, event);
commands.rpush("meeting:" + meetingId + ":" + "recordings", Long.toString(msgid));
commands.exec();
} catch (Exception e) {
log.debug("Cannot record meeting data: {}", meetingId, e);
} finally {
connectionPool.close();
}
}

// @fixme: not used anywhere
public void removeMeeting(String meetingId) {
log.debug("Removing meeting meeting {} inside a transaction", meetingId);
commands.multi();
commands.del(Keys.MEETING + meetingId);
commands.srem(Keys.MEETINGS + meetingId);
commands.exec();
try (StatefulRedisConnection<String, String> connection = connectionPool.borrowObject()) {
RedisCommands<String, String> commands = connection.sync();
commands.multi();
commands.del(Keys.MEETING + meetingId);
commands.srem(Keys.MEETINGS + meetingId);
commands.exec();
} catch (Exception e) {
log.debug("Cannot remove meeting data : {}", meetingId, e);
} finally {
connectionPool.close();
}
}

public void recordAndExpire(String meetingId, Map<String, String> event) {
log.debug("Recording meeting event {} inside a transaction", meetingId);
commands.multi();

Long msgid = commands.incr("global:nextRecordedMsgId");
commands.hmset("recording:" + meetingId + ":" + msgid, event);
commands.rpush("meeting:" + meetingId + ":recordings", Long.toString(msgid));
/**
* We set the key to expire after 14 days as we are still recording the
* event into redis even if the meeting is not recorded. (ralam sept 23,
* 2015)
*/
commands.expire("meeting:" + meetingId + ":recordings", expireKey);
commands.rpush("meeting:" + meetingId + ":recordings", Long.toString(msgid));
commands.expire("meeting:" + meetingId + ":recordings", expireKey);
commands.exec();
try (StatefulRedisConnection<String, String> connection = connectionPool.borrowObject()) {
RedisCommands<String, String> commands = connection.sync();
commands.multi();

Long msgid = commands.incr("global:nextRecordedMsgId");
commands.hmset("recording:" + meetingId + ":" + msgid, event);
commands.rpush("meeting:" + meetingId + ":recordings", Long.toString(msgid));
/**
* We set the key to expire after 14 days as we are still recording
* the event into redis even if the meeting is not recorded. (ralam
* sept 23, 2015)
*/
commands.expire("meeting:" + meetingId + ":recordings", expireKey);
commands.rpush("meeting:" + meetingId + ":recordings", Long.toString(msgid));
commands.expire("meeting:" + meetingId + ":recordings", expireKey);
commands.exec();
} catch (Exception e) {
log.error("Cannot record data with expire: {}", meetingId, e);
} finally {
connectionPool.close();
}
}

public void setExpireKey(long expireKey) {
public void setExpireKey(int expireKey) {
this.expireKey = expireKey;
}

private String recordMeeting(String key, Map<String, String> info) {
return commands.hmset(key, info);
}
log.debug("Storing metadata {}", info);
String result = "";
try (StatefulRedisConnection<String, String> connection = connectionPool.borrowObject()) {
RedisCommands<String, String> commands = connection.sync();
result = commands.hmset(key, info);
} catch (Exception e) {
log.debug("Cannot record data with expire: {}", key, e);
} finally {
connectionPool.close();
}
return result;

}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package org.bigbluebutton.common2.redis.pubsub;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.bigbluebutton.common2.redis.RedisAwareCommunicator;
import org.red5.logging.Red5LoggerFactory;
import org.slf4j.Logger;
Expand All @@ -11,13 +15,16 @@
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import io.lettuce.core.support.ConnectionPoolSupport;

public class MessageReceiver extends RedisAwareCommunicator {
private static Logger log = Red5LoggerFactory.getLogger(MessageReceiver.class, "video");

private ReceivedMessageHandler handler;

private StatefulRedisPubSubConnection<String, String> connection;
GenericObjectPool<StatefulRedisPubSubConnection<String, String>> connectionPool;

private final Executor runExec = Executors.newSingleThreadExecutor();

private volatile boolean receiveMessage = false;

Expand All @@ -34,23 +41,33 @@ public void start() {

redisClient = RedisClient.create(redisUri);
redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build());
connection = redisClient.connectPubSub();

try {
if (receiveMessage) {
connection.addListener(new MessageListener());

RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Void> future = async.subscribe(FROM_BBB_APPS_PATTERN);
connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(),
createPoolingConfig());

Runnable messageReceiver = new Runnable() {
public void run() {
if (receiveMessage) {
try (StatefulRedisPubSubConnection<String, String> connection = connectionPool.borrowObject()) {
if (receiveMessage) {
connection.addListener(new MessageListener());

RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Void> future = async.subscribe(FROM_BBB_APPS_PATTERN);
}
} catch (Exception e) {
log.error("Error resubscribing to channels: ", e);
}
}
}
} catch (Exception e) {
log.error("Error resubscribing to channels: ", e);
}
};

runExec.execute(messageReceiver);
}

public void stop() {
receiveMessage = false;
connection.close();
connectionPool.close();
redisClient.shutdown();
log.info("MessageReceiver Stopped");
}
Expand All @@ -69,7 +86,13 @@ public void message(String channel, String message) {
@Override
public void message(String pattern, String channel, String message) {
log.debug("RECEIVED onPMessage" + channel + " message=\n" + message);
handler.handleMessage(pattern, channel, message);
Runnable task = new Runnable() {
public void run() {
handler.handleMessage(pattern, channel, message);
}
};

runExec.execute(task);
}

@Override
Expand Down
Loading

0 comments on commit 7e736d4

Please sign in to comment.