Skip to content

Commit

Permalink
fix: cleanup mcs-core rooms by tying them to akka-apps lifecycle
Browse files Browse the repository at this point in the history
Should avoid stuck rooms/users on the majority of scenarios
  • Loading branch information
prlanzarin committed Feb 14, 2022
1 parent 788ac8f commit ee2fd63
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 27 deletions.
4 changes: 4 additions & 0 deletions config/custom-environment-variables.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ mcs-port: MCS_PORT
mcs-host: MCS_HOST
mcs-address: MCS_ADDRESS
mcsUserEjectionTimer: MCS_USER_EJECTION_TIMER
mcsRoomDestructionTimer: MCS_ROOM_DESTRUCTION_TIMER
destroyRoomOnEject:
__name: DESTROY_ROOM_ON_EJECT
__format: json

freeswitch:
ip: FREESWITCH_CONN_IP
Expand Down
3 changes: 3 additions & 0 deletions config/default.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ redisExpireTime: 1209600
# Time to wait before ejecting an user which has the autoLeave flag marked
# as true and has no more media sessions under its ownership
mcsUserEjectionTimer: 60000
# Time to wait before destroying a room (on all scenarios)
mcsUserEjectionTimer: 90000
# mcs-core entrypoint configured on nginx
mcs-path: /mcs
mcs-port: 3010
Expand Down Expand Up @@ -256,6 +258,7 @@ mediaThresholds:
allowDuplicateExtUserId: true
ejectOnUserLeft: true
permissionProbes: true
destroyRoomOnEject: true

# Direct Prometheus instrumentation. EXPERIMENTAL, so disabled by default.
prometheus:
Expand Down
1 change: 1 addition & 0 deletions lib/bbb/messages/Constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
TO_LISTEN_ONLY: 'to-sfu-listen-only',
FROM_SFU: 'from-sfu-redis-channel',
TO_SFU: 'to-sfu-redis-channel',
TO_VOICE_CONF: 'to-voice-conf-redis-channel',

// RedisWrapper events
REDIS_MESSAGE : "redis_message",
Expand Down
41 changes: 41 additions & 0 deletions lib/main/janitor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
const config = require('config');
const RedisGateway = (new (require('../bbb/pubsub/bbb-gw')));
const MCSAgent = require('./mcs-agent.js');
const Logger = require('../common/logger.js');

const { TO_VOICE_CONF } = require('../bbb/messages/Constants');
const DESTROY_ROOM_ON_EJECT = config.has('destroyRoomOnEject')
? config.get('destroyRoomOnEject')
: false;
const EJECT_ALL_FROM_VOICE_CONF = 'EjectAllFromVoiceConfMsg';

const _destroyRoomOnEjectAllFromVoiceConf = () => {
if (!DESTROY_ROOM_ON_EJECT) return;

RedisGateway.on(EJECT_ALL_FROM_VOICE_CONF, ({ body }) => {
const { voiceConf } = body;

if (voiceConf) {
MCSAgent.destroyRoom(voiceConf).then(() => {
Logger.info('[main-janitor] Requested room destruction on EjectAllFromVoiceConfMsg', {
voiceConf,
});
}).catch(error => {
Logger.error('[main-janitor] Room destruction on EjectAllFromVoiceConfMsg failed', {
voiceConf, errorMessage: error.message, errorCode: error.code,
});
});
}
});
};


const clockIn = () => {
RedisGateway.addSubscribeChannel(TO_VOICE_CONF);
_destroyRoomOnEjectAllFromVoiceConf();
};


module.exports = {
clockIn,
}
16 changes: 16 additions & 0 deletions lib/main/mcs-agent.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
const config = require('config');
const MCS = require('../base/MCSAPIWrapper.js');
const Logger = require('../common/logger.js');

const MCS_ADDRESS = config.get("mcs-address");
const MCS_PORT = config.get("mcs-port");

const mcs = new MCS()

mcs.start(MCS_ADDRESS, MCS_PORT).catch(error => {
Logger.error('[main-process] Failed to establish MCS connection', {
errorMessage: error.message, errorCode: error.code,
});
})

module.exports = mcs;
10 changes: 5 additions & 5 deletions lib/mcs-core/lib/adapters/kurento/kurento.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ module.exports = class Kurento extends EventEmitter {
this._pipelinePromises = [];
this._transposingQueue = [];
this.balancer.on(C.EVENT.MEDIA_SERVER_OFFLINE, this._destroyElementsFromHost.bind(this));
this._globalEmitter.on(C.EVENT.ROOM_EMPTY, this._releaseAllRoomPipelines.bind(this));
this._globalEmitter.on(C.EVENT.ROOM_DESTROYED, this._releaseAllRoomPipelines.bind(this));
instance = this;
}

Expand Down Expand Up @@ -139,11 +139,11 @@ module.exports = class Kurento extends EventEmitter {
}
}

_releaseAllRoomPipelines (room) {
_releaseAllRoomPipelines ({ roomId }) {
try {
if (this._mediaPipelines[room]) {
Object.keys(this._mediaPipelines[room]).forEach(async pk => {
await this._releasePipeline(room, pk);
if (this._mediaPipelines[roomId]) {
Object.keys(this._mediaPipelines[roomId]).forEach(async pk => {
await this._releasePipeline(roomId, pk);
});
}
} catch (e) {
Expand Down
10 changes: 5 additions & 5 deletions lib/mcs-core/lib/adapters/loopback/tainted-loopback.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ module.exports = class TaintedLoopbackAdapter extends EventEmitter {
this._mediaElements = {};
this._pipelinePromises = [];
this.balancer.on(C.EVENT.MEDIA_SERVER_OFFLINE, this._destroyElementsFromHost.bind(this));
this._globalEmitter.on(C.EVENT.ROOM_EMPTY, this._releaseAllRoomPipelines.bind(this));
this._globalEmitter.on(C.EVENT.ROOM_DESTROYED, this._releaseAllRoomPipelines.bind(this));
this._bogusHost = {
id: uuidv4(),
ip: '198.51.100.13',
Expand Down Expand Up @@ -119,11 +119,11 @@ module.exports = class TaintedLoopbackAdapter extends EventEmitter {
}
}

_releaseAllRoomPipelines (room) {
_releaseAllRoomPipelines ({ roomId }) {
try {
if (this._mediaPipelines[room]) {
Object.keys(this._mediaPipelines[room]).forEach(async pk => {
await this._releasePipeline(room, pk);
if (this._mediaPipelines[roomId]) {
Object.keys(this._mediaPipelines[roomId]).forEach(async pk => {
await this._releasePipeline(roomId, pk);
});
}
} catch (e) {
Expand Down
2 changes: 1 addition & 1 deletion lib/mcs-core/lib/adapters/mediasoup/mediasoup-adapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ module.exports = class MediasoupAdapter extends EventEmitter {
this.name = name;
this.balancer = balancer;
this._globalEmitter = GLOBAL_EVENT_EMITTER;
this._globalEmitter.on(C.EVENT.ROOM_EMPTY, Routers.releaseAllRoutersWithIdSuffix.bind(this));
this._globalEmitter.on(C.EVENT.ROOM_DESTROYED, Routers.releaseAllRoutersWithIdSuffix.bind(this));
Workers.createWorkers(
NOF_WORKERS,
WORKER_SETTINGS,
Expand Down
2 changes: 1 addition & 1 deletion lib/mcs-core/lib/adapters/mediasoup/routers.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ const releaseRouter = (routerId) => {
}

// TODO refactor: why are we iterating over the whole map...
const releaseAllRoutersWithIdSuffix = (routerIdSuffix) => {
const releaseAllRoutersWithIdSuffix = ({ roomId: routerIdSuffix }) => {
ROUTER_STORAGE.forEach(async (router, routerId) => {
const targetSuffix = getRouterIdSuffix(routerId);
if (targetSuffix === routerIdSuffix) {
Expand Down
33 changes: 23 additions & 10 deletions lib/mcs-core/lib/media/media-controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class MediaControllerC {
AdapterFactory.getAdapters({});

GLOBAL_EVENT_EMITTER.on(C.EVENT.ROOM_EMPTY, this.removeRoom.bind(this));
GLOBAL_EVENT_EMITTER.on(C.EVENT.ROOM_DESTROYED, this._handleRoomDestroyed.bind(this));
GLOBAL_EVENT_EMITTER.on(C.EVENT.CONFERENCE_NEW_VIDEO_FLOOR, this._handleNewVideoFloor.bind(this));
// FIXME remove this once all audio goes through mcs-core's API
GLOBAL_EVENT_EMITTER.on(C.EVENT.MEDIA_EXTERNAL_AUDIO_CONNECTED, this._handleExternalAudioMediaConnected.bind(this));
Expand Down Expand Up @@ -122,10 +123,11 @@ class MediaControllerC {
}

leave (roomId, userId) {
let room;
const user = this.getUser(userId);
const room = this.getRoom(user.roomId);

if (user) {
room = this.getRoom(user.roomId);
const { id: userId, externalUserId } = user;
user.leave().then((killedMedias) => {
Logger.info(LOG_PREFIX, "User left", { userId, externalUserId });
Expand Down Expand Up @@ -509,22 +511,33 @@ class MediaControllerC {
return this.rooms.has(userId);
}

_handleRoomDestroyed ({ roomId }) {
const room = this.getRoom(roomId);

if (room) {
room.getUsers().forEach(user => {
this.leave(roomId, user.userId);
});
this.rooms.delete(roomId);
Logger.info(LOG_PREFIX, "Room destroyed", { roomId });
}

MCSPrometheusAgent.set(METRIC_NAMES.ROOMS, this.getNumberOfRooms());
}

removeRoom (roomId) {
let removed = false;
try {
const room = this.getRoom(roomId);
if (room == null) return true;

this.emitter.emit(C.EVENT.ROOM_DESTROYED, room.getInfo());
room.destroy();
removed = this.rooms.delete(roomId)

if (removed) {
MCSPrometheusAgent.set(METRIC_NAMES.ROOMS, this.getNumberOfRooms());
Logger.info(LOG_PREFIX, "Room destroyed", { roomId });
if (room == null) {
this.emitter.emit(C.EVENT.ROOM_DESTROYED, Room.ROOM_INFO(roomId));
return true;
}

return removed;
room.destroy();

return true;
} catch (error) {
Logger.error(LOG_PREFIX, "CRITICAL: Room deletion failed",
{ roomId, errorMessage: error.message, errorCode: error.code });
Expand Down
36 changes: 31 additions & 5 deletions lib/mcs-core/lib/model/room.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,19 @@ const { v4: uuidv4 } = require('uuid');
const { perRoom: ROOM_MEDIA_THRESHOLD } = config.get('mediaThresholds');
const LOG_PREFIX = "[mcs-room]";
const MAX_PREVIOUS_FLOORS = 10;
const MCS_ROOM_DESTRUCTION_TIMER = config.has('mcsRoomDestructionTimer')
? config.get('mcsRoomDestructionTimer')
: 90000; // 01m30s


module.exports = class Room {
static ROOM_INFO (roomId) {
return {
memberType: C.MEMBERS.ROOM,
roomId,
};
}

constructor (id = uuidv4()) {
this.id = id;
this.users = {};
Expand All @@ -31,10 +42,7 @@ module.exports = class Room {
}

getInfo () {
return {
memberType: C.MEMBERS.ROOM,
roomId: this.id,
};
return Room.ROOM_INFO(this.id);
}

getMediaInfos () {
Expand Down Expand Up @@ -71,6 +79,7 @@ module.exports = class Room {
const found = user.id in this.users;
if (!found) {
this.users[user.id] = user;
if (this.destructionRoutine) this._clearEjectionTimeout();
GLOBAL_EVENT_EMITTER.emit(C.EVENT.USER_JOINED, { roomId: this.id, user: user.getUserInfo() });
}
}
Expand Down Expand Up @@ -298,15 +307,32 @@ module.exports = class Room {
if (this.users[userId]) {
delete this.users[userId];
if (Object.keys(this.users).length <= 0) {
this._setupDestructionRoutine();
GLOBAL_EVENT_EMITTER.emit(C.EVENT.ROOM_EMPTY, this.id);
}
}
}

destroy () {
_clearDestructionRoutine () {
clearTimeout(this.destructionRoutine);
this.destructionRoutine = null;
}

_setupDestructionRoutine () {
if (this.destructionRoutine == null) {
this.destructionRoutine = setTimeout(this._destroy.bind(this), MCS_ROOM_DESTRUCTION_TIMER);
}
}

_destroy () {
this._registeredEvents.forEach(({ event, callback }) => {
GLOBAL_EVENT_EMITTER.removeListener(event, callback);
});
this._registeredEvents = [];
GLOBAL_EVENT_EMITTER.emit(C.EVENT.ROOM_DESTROYED, this.getInfo());
}

destroy () {
this._setupDestructionRoutine();
}
}
2 changes: 2 additions & 0 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const config = require('config');
const WebsocketConnectionManager = require('./lib/main/ws-connection-manager.js');
const ConnectionManager = require('./lib/main/connection-manager.js');
const SFUModuleManager = require('./lib/main/sfu-module-manager.js');
const Janitor = require('./lib/main/janitor.js');

const HTTP_SERVER_HOST = config.has('clientHost') ? config.get('clientHost') : '127.0.0.1';
const HTTP_SERVER_PORT = config.get('clientPort');
Expand All @@ -23,3 +24,4 @@ const CM = new ConnectionManager();
SFUModuleManager.start();
CM.setupModuleRouting(SFUModuleManager.modules);
CM.addAdapter(WSManager);
Janitor.clockIn();

0 comments on commit ee2fd63

Please sign in to comment.