Skip to content

Commit

Permalink
fix: reorganize module forking to work around FIFO sched issues
Browse files Browse the repository at this point in the history
On Node.js >= 20, the application may get stuck at 100% CPU
usage on boot whenever its under the FIFO CPU scheduling policy.
This has been isolated down to the node-redis library. tl;dr:
prototype overriding on forked processes + FIFO is causing this, but
it's most likely a Node.js bug rather than the dep's.

While the source issue hasn't been fixed, module spawning (i.e.:
process forking) has been re-organized to work around that issue:
  - Make module spawning sequential and synchronous. Start them
    one-by-one, failures means an application boot failure.
  - Add a configurable minimum delay between process forks. Default: 1s.
    See moduleSpawnDelayMs.
  - Change module start triggers to something realistic (i.e.:
    subprocess notifying that they're ready via IPC)

The trade-off is that the boot sequence now takes longer. While this is
a workaround, most of the changes are good nonetheless. Once the Node.js
issue is fixed, the likely outcome is to reduce the spawn delay to 0.
  • Loading branch information
prlanzarin committed Nov 19, 2024
1 parent b6a74cf commit 26dd64d
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 72 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ For previous changes, see the [release notes](https://github.com/bigbluebutton/b

* feat(livekit): sync screen share state with BBB
* fix(core): onEvent is not a transaction, treat is as such
* fix: reorganize module forking to work around FIFO sched issues
* build: [email protected]
* build: [email protected]

Expand Down
1 change: 1 addition & 0 deletions config/custom-environment-variables.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ recordingMediaProfile: RECORDING_MEDIA_PROFILE
recordingFormat: RECORDING_FORMAT
recordingAdapter: RECORDING_ADAPTER

moduleSpawnDelayMs: MODULE_SPAWN_DELAY_MS
modules:
__name: SFU_MODULES
__format: json
Expand Down
1 change: 1 addition & 0 deletions config/default.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ log:
# Whether to log to a file
file: true
filename: /var/log/bbb-webrtc-sfu/bbb-webrtc-sfu.log
moduleSpawnDelayMs: 1000
modules:
- path: ./lib/mcs-core/process.js
name: core
Expand Down
2 changes: 2 additions & 0 deletions lib/base/base-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ module.exports = class BaseManager {
await this.mcs.start(MCS_ADDRESS, MCS_PORT);
this.mcsStarted = true;
}

process.send({ type: 'ready' });
} catch (error) {
Logger.error('Manager: cannot connect to Redis channel', {
errorMessage: error.message,
Expand Down
10 changes: 7 additions & 3 deletions lib/main/janitor.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const config = require('config');
const RedisGateway = (new (require('../bbb/pubsub/bbb-gw')));
const MCSAgent = require('./mcs-agent.js');
const { mcs, startAgent } = require('./mcs-agent.js');
const Logger = require('../common/logger.js');

const { TO_VOICE_CONF } = require('../bbb/messages/Constants');
Expand All @@ -16,7 +16,7 @@ const _destroyRoomOnEjectAllFromVoiceConf = () => {
const { voiceConf } = body;

if (voiceConf) {
MCSAgent.destroyRoom(voiceConf).then(() => {
mcs.destroyRoom(voiceConf).then(() => {
Logger.info('Janitor: requested room destruction on EjectAllFromVoiceConfMsg', {
voiceConf,
});
Expand All @@ -31,7 +31,11 @@ const _destroyRoomOnEjectAllFromVoiceConf = () => {

const clockIn = () => {
RedisGateway.addSubscribeChannel(TO_VOICE_CONF);
_destroyRoomOnEjectAllFromVoiceConf();

startAgent().then(() => {
Logger.info('Janitor: MCS connection established');
_destroyRoomOnEjectAllFromVoiceConf();
});
};

module.exports = {
Expand Down
17 changes: 11 additions & 6 deletions lib/main/mcs-agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ const MCS_PORT = config.get("mcs-port");

const mcs = new MCS()

mcs.start(MCS_ADDRESS, MCS_PORT).catch(error => {
Logger.error('Failed to establish MCS connection', {
errorMessage: error.message, errorCode: error.code,
});
})
const startAgent = () => {
return mcs.start(MCS_ADDRESS, MCS_PORT).catch(error => {
Logger.error('Failed to establish MCS connection', {
errorMessage: error.message, errorCode: error.code,
});
})
};

module.exports = mcs;
module.exports = {
mcs,
startAgent,
};
97 changes: 64 additions & 33 deletions lib/main/process-wrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const Logger = require('../common/logger.js');
const { setProcessPriority } = require('../common/utils.js');

const PROCESS_RESPAWN_DELAY = 3000;
const PROCESS_RESOLUTION_TIMEOUT = 5000;
const { PrometheusAgent, SFUM_NAMES } = require('./metrics/main-metrics.js');

module.exports = class ProcessWrapper extends EventEmitter {
Expand Down Expand Up @@ -37,6 +38,7 @@ module.exports = class ProcessWrapper extends EventEmitter {
this.routingAliases = routingAliases;
this.priority = priority;
this.runningState = "RUNNING";
this._onMessage = this._onMessage.bind(this);
}

set onmessage (callback) {
Expand All @@ -56,7 +58,7 @@ module.exports = class ProcessWrapper extends EventEmitter {

_onMessage (message) {
Logger.info("Received message from forked process",
{ pid: this.process.pid, message });
{ pid: this.process?.pid, message, path: this.path });
}

onError (error) {
Expand All @@ -65,38 +67,67 @@ module.exports = class ProcessWrapper extends EventEmitter {
}

start () {
const childEnv = {
...process.env,
SFU_MODULE_NAME: this.name,
SFU_IPC_MODE: this.ipcMode,
SFU_IPC_OPTS: this.options,
SFU_MODULE_PATH: this.path,
}

this.process = cp.fork(this.path, {
// Pass over all of the environment.
env: childEnv,
// Share stdout/stderr, so we can hear the inevitable errors.
silent: false
});

this.process.on('message', this._onMessage);
this.process.on('error', this.onError.bind(this));
this.process.on('exit', (...args) => {
this.emit('exit', ...args);
});

if (this.priority != null) setProcessPriority(this.process.pid, this.priority);

PrometheusAgent.set(SFUM_NAMES.MODULE_STATUS, 1, {
module: this.name,
});

Logger.info("New module process forked", {
name: this.name,
ipc: this.ipcMode,
path: this.path,
pid: this.process.pid,
return new Promise((resolve, reject) => {
try {
const resolutionTimeout = setTimeout(() => {
Logger.warn("Module process startup timeout, might be unresponsive", {
name: this.name,
path: this.path,
ipc: this.ipcMode,
});

success();
}, PROCESS_RESOLUTION_TIMEOUT);
const success = () => {
clearTimeout(resolutionTimeout);
PrometheusAgent.set(SFUM_NAMES.MODULE_STATUS, 1, {
module: this.name,
});
Logger.info("New module process forked", {
name: this.name,
ipc: this.ipcMode,
path: this.path,
pid: this.process.pid,
});
resolve();
};

const childEnv = {
...process.env,
SFU_MODULE_NAME: this.name,
SFU_IPC_MODE: this.ipcMode,
SFU_IPC_OPTS: this.options,
SFU_MODULE_PATH: this.path,
}

this.process = cp.fork(this.path, {
// Pass over all of the environment.
env: childEnv,
// Share stdout/stderr, so we can hear the inevitable errors.
silent: false
});

this.process.on('error', this.onError.bind(this));
this.process.on('exit', (...args) => {
this.emit('exit', ...args);
if (resolutionTimeout) clearTimeout(resolutionTimeout);
reject(new Error("Process exited"));
});
this.process.once('message', () => {
// Any message is a sign of successful startup - so modules should
// send anything to signal they are ready (recommended: { type: 'ready' }).
success();
});
this.process.on('message', this._onMessage);

if (this.priority != null) setProcessPriority(this.process.pid, this.priority);
} catch (error) {
Logger.error("Failed to start module process", {
name: this.name,
ipc: this.ipcMode,
});
reject(error);
}
});
}

Expand Down
73 changes: 47 additions & 26 deletions lib/main/sfu-module-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@

const Logger = require('../common/logger.js');
const config = require('config');
const MODULES = config.get('modules');
const ProcessWrapper = require('./process-wrapper.js');
const { PrometheusAgent, SFUM_NAMES } = require('./metrics/main-metrics.js');

const MODULES = config.get('modules');
const MODULE_SPAWN_DELAY = config.has('moduleSpawnDelayMs')
? config.get('moduleSpawnDelayMs')
: 0;
const UNEXPECTED_TERMINATION_SIGNALS = ['SIGABRT', 'SIGBUS', 'SIGSEGV', 'SIGILL'];

class SFUModuleManager {
Expand All @@ -21,32 +24,43 @@ class SFUModuleManager {
this.runningState = "RUNNING";
}

start () {
// Start the rest of the preconfigured SFU modules
for (let i = 0; i < MODULES.length; i++) {
let {
enabled = true,
name,
path,
routingAliases,
ipc,
priority,
} = MODULES[i];

if (!enabled) {
Logger.info(`Module ${name} is disabled, skipping`);
continue;
async start () {
const spawners = MODULES.map((module) => {
return async () => {
try {
let {
enabled = true,
name,
path,
routingAliases,
ipc,
priority,
} = module;

if (!enabled) {
Logger.info(`Module ${name} is disabled, skipping`);
Promise.resolve();
return;
}

let proc = new ProcessWrapper(name, path, ipc.mode, {
ipcOptions: ipc.options,
routingAliases,
priority,
});
await proc.start();
this.trackModuleShutdown(proc);
this.modules[proc.name] = proc;
} catch (error) {
Logger.error("Failed to start module", {
module: module.name,
errorMessage: error.message,
errorStack: error?.stack,
});
throw error;
}
}

let proc = new ProcessWrapper(name, path, ipc.mode, {
ipcOptions: ipc.options,
routingAliases,
priority,
});
proc.start();
this.trackModuleShutdown(proc);
this.modules[proc.name] = proc;
}
});

process.on('SIGTERM', async () => {
await this.stopModules();
Expand All @@ -73,6 +87,13 @@ class SFUModuleManager {
process.on('unhandledRejection', (reason) => {
Logger.error("CRITICAL: Unhandled promise rejection", { reason: reason.toString() });
});

const waitFor = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

for (const spawner of spawners) {
await spawner();
await waitFor(MODULE_SPAWN_DELAY);
}
}

trackModuleShutdown (proc) {
Expand Down
1 change: 1 addition & 0 deletions lib/mcs-core/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class CoreProcess extends BaseProcess {
});

mcsServer.on('connection', this.app.setupClient.bind(this.app));
process.send({ type: 'ready' });
}

handleException (error) {
Expand Down
15 changes: 11 additions & 4 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const WebsocketConnectionManager = require('./lib/main/ws-connection-manager.js'
const ConnectionManager = require('./lib/main/connection-manager.js');
const SFUModuleManager = require('./lib/main/sfu-module-manager.js');
const Janitor = require('./lib/main/janitor.js');
const Logger = require('./lib/common/logger.js');

const HTTP_SERVER_HOST = config.has('clientHost') ? config.get('clientHost') : '127.0.0.1';
const HTTP_SERVER_PORT = config.get('clientPort');
Expand All @@ -21,7 +22,13 @@ const WSManager = new WebsocketConnectionManager(

const CM = new ConnectionManager();

SFUModuleManager.start();
CM.setupModuleRouting(SFUModuleManager.modules);
CM.addAdapter(WSManager);
Janitor.clockIn();
SFUModuleManager.start().then(() => {
CM.setupModuleRouting(SFUModuleManager.modules);
CM.addAdapter(WSManager);
Janitor.clockIn();
}).catch((error) => {
Logger.error('Failed to start SFU Module Manager', error);
SFUModuleManager.stopModules().then(() => {
process.exit(1);
});
});

0 comments on commit 26dd64d

Please sign in to comment.