From 6ad0f494bba622df1eeac9f5a6e48ff44a6b12cd Mon Sep 17 00:00:00 2001 From: TSO Date: Tue, 19 Nov 2024 11:48:01 +0100 Subject: [PATCH 01/11] [IMP] add recording feature --- src/models/channel.js | 4 + src/models/recorder.js | 187 +++++++++++++++++++++++++++++++++++++++++ src/services/http.js | 47 ++++++++++- src/shared/enums.js | 6 ++ 4 files changed, 243 insertions(+), 1 deletion(-) create mode 100644 src/models/recorder.js diff --git a/src/models/channel.js b/src/models/channel.js index 2af4d70..b3f718f 100644 --- a/src/models/channel.js +++ b/src/models/channel.js @@ -5,6 +5,7 @@ import { getAllowedCodecs, Logger } from "#src/utils/utils.js"; import { AuthenticationError, OvercrowdedError } from "#src/utils/errors.js"; import { Session, SESSION_CLOSE_CODE } from "#src/models/session.js"; import { getWorker } from "#src/services/rtc.js"; +import { Recorder } from "#src/models/recorder.js"; const logger = new Logger("CHANNEL"); @@ -39,6 +40,7 @@ export class Channel extends EventEmitter { name; /** @type {WithImplicitCoercion} base 64 buffer key */ key; + recorder; /** @type {import("mediasoup").types.Router}*/ router; /** @type {Map} */ @@ -130,6 +132,7 @@ export class Channel extends EventEmitter { this.name = `${remoteAddress}*${this.uuid.slice(-5)}`; this.router = router; this._worker = worker; + this.recorder = new Recorder(this); this._onSessionClose = this._onSessionClose.bind(this); } @@ -249,6 +252,7 @@ export class Channel extends EventEmitter { } clearTimeout(this._closeTimeout); this.sessions.clear(); + this.recorder.stop(); Channel.records.delete(this.uuid); /** * @event Channel#close diff --git a/src/models/recorder.js b/src/models/recorder.js new file mode 100644 index 0000000..8f3150e --- /dev/null +++ b/src/models/recorder.js @@ -0,0 +1,187 @@ +import child_process from "node:child_process"; +import os from "node:os"; +import path from "node:path"; +import fs from "node:fs"; + +import { EventEmitter } from "node:events"; // TODO remove if unnecessary +import { Logger } from "#src/utils/utils.js"; +import { STREAM_TYPE } from "#src/shared/enums.js"; + +const logger = new Logger("RECORDER"); +const temp = os.tmpdir(); +const fileType = "mp4"; // TODO config +const VIDEO_LIMIT = 4; // TODO config (and other name?) + +/** + * @typedef {Object} RTPTransports + * @property {Array} audio + * @property {Array} camera + * @property {Array} screen + */ + +/** + * Wraps the FFMPEG process + * TODO move in own file + */ +class FFMPEG extends EventEmitter { + /** @type {child_process.ChildProcess} */ + _process; + /** @type {string} */ + _filePath; + + get _args() { + return [ + "-loglevel", + "debug", // TODO warning in prod + "-protocol_whitelist", + "pipe,udp,rtp", + "-fflags", + "+genpts", + "-f", + "sdp", + "-i", + "pipe:0", + "-movflags", + "frag_keyframe+empty_moov+default_base_moof", // fragmented + "-c:v", + "libx264", // vid codec + "-c:a", + "aac", // audio codec + "-f", + fileType, + this._filePath, + ]; + } + + /** + * @param {string} filePath + */ + constructor(filePath) { + super(); + this._filePath = filePath; + } + + /** + * @param {String[]} [sdp] + */ + async spawn(sdp) { + this._process = child_process.spawn("ffmpeg", this._args, { + stdio: ["pipe", "pipe", process.stderr], + }); + + if (!this._process.stdin.writable) { + throw new Error("FFMPEG stdin not writable."); + } + this._process.stdin.write(sdp); + this._process.stdin.end(); + + this._process.stdout.on("data", (chunk) => { + this.emit("data", chunk); // Emit data chunks as they become available + // may need to ues this to pipe to request if file stream does not work + }); + + this._process.on("close", (code) => { + if (code === 0) { + this.emit("success"); + } + }); + + logger.debug( + `FFMPEG process (pid:${this._process.pid}) spawned, outputting to ${this._filePath}` + ); + } + + kill() { + this._process?.kill("SIGINT"); + } +} + +export class Recorder extends EventEmitter { + static records = new Map(); + + /** @type {string} */ + uuid = crypto.randomUUID(); + /** @type {import("#src/models/channel").Channel} */ + channel; + /** @type {string} */ + state; + ffmpeg; + /** @type {RTPTransports} */ + _rtpTransports; + /** @type {string} */ + filePath; + /** + * @param {import("#src/models/channel").Channel} channel + */ + constructor(channel) { + super(); + this.channel = channel; + this.filePath = path.join(temp, `${this.uuid}.${fileType}`); + Recorder.records.set(this.uuid, this); + } + + /** @returns {number} */ + get videoCount() { + return this._rtpTransports.camera.length + this._rtpTransports.screen.length; + } + + /** + * @param {Array} ids + * @returns {string} filePath + */ + start(ids) { + // maybe internal state and check if already recording (recording = has ffmpeg child process). + this.stop(); + for (const id of ids) { + const session = this.channel.sessions.get(id); + const audioRtp = this._createRtp( + session.producers[STREAM_TYPE.AUDIO], + STREAM_TYPE.AUDIO + ); + audioRtp && this._rtpTransports.audio.push(audioRtp); + for (const type in [STREAM_TYPE.CAMERA, STREAM_TYPE.SCREEN]) { + if (this.videoCount < VIDEO_LIMIT) { + const rtp = this._createRtp(session.producers[type], type); + rtp && this._rtpTransports[type].push(rtp); + } + } + } + this.ffmpeg = new FFMPEG(this.filePath); + this.ffmpeg.spawn(); // args should be base on the rtp transports + this.ffmpeg.once("success", () => { + this.emit("download-ready", this.filePath); + }); + return this.filePath; + } + pause() { + // TODO maybe shouldn't be able to pause + } + stop() { + // TODO + // cleanup all rtp transports + // stop ffmpeg process + Recorder.records.delete(this.uuid); + } + + /** + * @param {http.ServerResponse} res + */ + pipeToResponse(res) { + // TODO check if this can be executed, otherwise end request, or throw error + const fileStream = fs.createReadStream(this._filePath); // may need to be explicitly closed? + res.writeHead(200, { + "Content-Type": `video/${fileType}`, + "Content-Disposition": "inline", + }); + fileStream.pipe(res); // Pipe the file stream to the response + } + + /** + * @param {import("mediasoup").types.Producer} producer + * @param {STREAM_TYPE[keyof STREAM_TYPE]} type + * @return {Promise} probably just create transport with right ports and return that, + */ + async _createRtp(producer, type) { + // TODO + } +} diff --git a/src/services/http.js b/src/services/http.js index 26da53e..8d27957 100644 --- a/src/services/http.js +++ b/src/services/http.js @@ -6,6 +6,7 @@ import * as config from "#src/config.js"; import { Logger, parseBody, extractRequestInfo } from "#src/utils/utils.js"; import { SESSION_CLOSE_CODE } from "#src/models/session.js"; import { Channel } from "#src/models/channel.js"; +import { Recorder } from "#src/models/recorder.js"; /** * @typedef {function} routeCallback @@ -15,6 +16,7 @@ import { Channel } from "#src/models/channel.js"; * @param {string} param2.remoteAddress * @param {string} param2.protocol * @param {string} param2.host + * @param {Object} param2.match name/value mapping of route variables * @param {URLSearchParams} param2.searchParams * @return {http.ServerResponse} */ @@ -77,6 +79,19 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf return res.end(); }, }); + routeListener.get(`/v${API_VERSION}/recording/`, { + callback: async (req, res, { remoteAddress, match }) => { + try { + const { token } = match; + logger.info(`[${remoteAddress}]: requested recording ${token}`); + Recorder.records.get(token)?.pipeToResponse(res); + // res not ended as we are streaming + } catch (error) { + logger.error(`[${remoteAddress}] failed to obtain recording: ${error.message}`); + return res.end(); + } + }, + }); routeListener.post(`/v${API_VERSION}/disconnect`, { callback: async (req, res, { remoteAddress }) => { try { @@ -183,7 +198,8 @@ class RouteListener { break; } for (const [pattern, options] of registeredRoutes) { - if (pathname === pattern) { + const match = this._extractPattern(pathname, pattern); + if (match) { if (options?.cors) { res.setHeader("Access-Control-Allow-Origin", options.cors); res.setHeader("Access-Control-Allow-Methods", options.methods); @@ -195,6 +211,7 @@ class RouteListener { host, protocol, remoteAddress, + match, searchParams, }); } catch (error) { @@ -212,4 +229,32 @@ class RouteListener { } return res.end(); } + + /** + * Matches a pathname against a pattern with named parameters. + * @param {string} pathname - The URL path requested, e.g., "/channel/6/person/42/" + * @param {string} pattern - The pattern to match, e.g., "/channel//session/" + * @returns {object|undefined} - Returns undefined if no match. If matched, returns an object mapping keys to values, + * the object is empty if matching a pattern with no variables. + * eg: { channelId: "6", sessionId: "42" } | {} | undefined + */ + _extractPattern(pathname, pattern) { + pathname = pathname.replace(/\/+$/, ""); + pattern = pattern.replace(/\/+$/, ""); + const paramNames = []; + const regexPattern = pattern.replace(/<([^>]+)>/g, (_, paramName) => { + paramNames.push(paramName); + return "([^/]+)"; + }); + const regex = new RegExp(`^${regexPattern}$`); + const match = pathname.match(regex); + if (!match) { + return; + } + const params = {}; + paramNames.forEach((name, index) => { + params[name] = match[index + 1]; + }); + return params; + } } diff --git a/src/shared/enums.js b/src/shared/enums.js index 031f4db..f108e0c 100644 --- a/src/shared/enums.js +++ b/src/shared/enums.js @@ -12,6 +12,12 @@ export const WS_CLOSE_CODE = { CHANNEL_FULL: 4109, }; +export const STREAM_TYPE = { + AUDIO: "audio", + CAMERA: "camera", + SCREEN: "screen", +}; + export const SERVER_REQUEST = { /** Requests the creation of a consumer that is used to forward a track to the client */ INIT_CONSUMER: "INIT_CONSUMER", From 70bccf847cd0833111d249f5413f3d78c420e120 Mon Sep 17 00:00:00 2001 From: TSO Date: Fri, 22 Nov 2024 08:37:22 +0100 Subject: [PATCH 02/11] [IMP] discuss: recording feature tests, forwarding, ffmpeg config,... --- src/client.js | 8 +++++++- src/config.js | 18 ++++++++++++++++++ src/models/channel.js | 23 +++++++++++++++++++++-- src/models/recorder.js | 11 ++++++----- src/services/ws.js | 5 ++++- tests/network.test.js | 8 ++++++++ 6 files changed, 64 insertions(+), 9 deletions(-) diff --git a/src/client.js b/src/client.js index e954d25..ada7532 100644 --- a/src/client.js +++ b/src/client.js @@ -85,6 +85,10 @@ const ACTIVE_STATES = new Set([ export class SfuClient extends EventTarget { /** @type {Error[]} */ errors = []; + /** + * @type {{ recording: boolean, webRtc: boolean }} + */ + features = {}; /** @type {SFU_CLIENT_STATE[keyof SFU_CLIENT_STATE]} */ _state = SFU_CLIENT_STATE.DISCONNECTED; /** @type {Bus | undefined} */ @@ -418,7 +422,9 @@ export class SfuClient extends EventTarget { */ webSocket.addEventListener( "message", - () => { + (initDataMessage) => { + const { features } = JSON.parse(initDataMessage.data || initDataMessage); + this.features = features; resolve(new Bus(webSocket)); }, { once: true } diff --git a/src/config.js b/src/config.js index 8f68ca6..24dcb86 100644 --- a/src/config.js +++ b/src/config.js @@ -167,6 +167,24 @@ export const LOG_TIMESTAMP = !FALSY_INPUT.has(process.env.LOG_TIMESTAMP); export const LOG_COLOR = process.env.LOG_COLOR ? Boolean(process.env.LOG_COLOR) : process.stdout.isTTY; +/** + * Whether recording is allowed + * If true, users can request their call to be recorded. + * + * e.g: RECORDING=1 + * + * @type {boolean} + */ +export const RECORDING = Boolean(process.env.RECORDING ?? true); +/** + * The file type of the recording output, this must be a fragmentable type. + * If not set, defaults to mp4 + * + * e.g: RECORDING_FILE_TYPE=mp4 + * + * @type {string} + */ +export const RECORDING_FILE_TYPE = process.env.RECORDING_FILE_TYPE || "mp4"; // ------------------------------------------------------------ // -------------------- SETTINGS -------------------------- diff --git a/src/models/channel.js b/src/models/channel.js index b3f718f..073587a 100644 --- a/src/models/channel.js +++ b/src/models/channel.js @@ -19,6 +19,12 @@ const mediaCodecs = getAllowedCodecs(); * @property {number} screenCount */ +/** + * @typedef {Object} Features + * @property {boolean} recording + * @property {boolean} webRtc + */ + /** * @fires Channel#sessionJoin * @fires Channel#sessionLeave @@ -40,6 +46,7 @@ export class Channel extends EventEmitter { name; /** @type {WithImplicitCoercion} base 64 buffer key */ key; + /** @type {Recorder | undefined} */ recorder; /** @type {import("mediasoup").types.Router}*/ router; @@ -132,10 +139,22 @@ export class Channel extends EventEmitter { this.name = `${remoteAddress}*${this.uuid.slice(-5)}`; this.router = router; this._worker = worker; - this.recorder = new Recorder(this); + if (config.RECORDING) { + this.recorder = new Recorder(this); + } this._onSessionClose = this._onSessionClose.bind(this); } + /** + * @type {Features} + */ + get features() { + return { + recording: Boolean(this.recorder), + webRtc: Boolean(this.router), + }; + } + /** * @returns {Promise<{ uuid: string, remoteAddress: string, sessionsStats: SessionsStats, createDate: string }>} */ @@ -252,7 +271,7 @@ export class Channel extends EventEmitter { } clearTimeout(this._closeTimeout); this.sessions.clear(); - this.recorder.stop(); + this.recorder?.stop(); Channel.records.delete(this.uuid); /** * @event Channel#close diff --git a/src/models/recorder.js b/src/models/recorder.js index 8f3150e..2a404e3 100644 --- a/src/models/recorder.js +++ b/src/models/recorder.js @@ -6,11 +6,11 @@ import fs from "node:fs"; import { EventEmitter } from "node:events"; // TODO remove if unnecessary import { Logger } from "#src/utils/utils.js"; import { STREAM_TYPE } from "#src/shared/enums.js"; +import { RECORDING_FILE_TYPE } from "#src/config.js"; const logger = new Logger("RECORDER"); const temp = os.tmpdir(); -const fileType = "mp4"; // TODO config -const VIDEO_LIMIT = 4; // TODO config (and other name?) +const VIDEO_LIMIT = 4; /** * @typedef {Object} RTPTransports @@ -48,7 +48,7 @@ class FFMPEG extends EventEmitter { "-c:a", "aac", // audio codec "-f", - fileType, + RECORDING_FILE_TYPE, this._filePath, ]; } @@ -116,7 +116,7 @@ export class Recorder extends EventEmitter { constructor(channel) { super(); this.channel = channel; - this.filePath = path.join(temp, `${this.uuid}.${fileType}`); + this.filePath = path.join(temp, `${this.uuid}.${RECORDING_FILE_TYPE}`); Recorder.records.set(this.uuid, this); } @@ -138,6 +138,7 @@ export class Recorder extends EventEmitter { session.producers[STREAM_TYPE.AUDIO], STREAM_TYPE.AUDIO ); + // TODO maybe some logic for priority on session id or stream type audioRtp && this._rtpTransports.audio.push(audioRtp); for (const type in [STREAM_TYPE.CAMERA, STREAM_TYPE.SCREEN]) { if (this.videoCount < VIDEO_LIMIT) { @@ -170,7 +171,7 @@ export class Recorder extends EventEmitter { // TODO check if this can be executed, otherwise end request, or throw error const fileStream = fs.createReadStream(this._filePath); // may need to be explicitly closed? res.writeHead(200, { - "Content-Type": `video/${fileType}`, + "Content-Type": `video/${RECORDING_FILE_TYPE}`, "Content-Disposition": "inline", }); fileStream.pipe(res); // Pipe the file stream to the response diff --git a/src/services/ws.js b/src/services/ws.js index eef232d..48295ac 100644 --- a/src/services/ws.js +++ b/src/services/ws.js @@ -122,7 +122,10 @@ async function connect(webSocket, { channelUUID, jwt }) { if (!session_id) { throw new AuthenticationError("Malformed JWT payload"); } - webSocket.send(); // client can start using ws after this message. + const initData = { + features: channel.features, + }; + webSocket.send(JSON.stringify(initData)); // client can start using ws after this message. const bus = new Bus(webSocket, { batchDelay: config.timeouts.busBatch }); const { session } = Channel.join(channel.uuid, session_id); session.once("close", ({ code }) => { diff --git a/tests/network.test.js b/tests/network.test.js index 1cc385b..6cdb8e2 100644 --- a/tests/network.test.js +++ b/tests/network.test.js @@ -36,6 +36,14 @@ describe("Full network", () => { const [thirdStateChange] = await once(user3.session, "stateChange"); expect(thirdStateChange).toBe(SESSION_STATE.CONNECTED); }); + test("Get features information after connecting", async () => { + const channelUUID = await network.getChannelUUID(); + const user1 = await network.connect(channelUUID, 1); + const [firstStateChange] = await once(user1.session, "stateChange"); + expect(firstStateChange).toBe(SESSION_STATE.CONNECTED); + expect(user1.sfuClient.features.recording).toBe(true); + expect(user1.sfuClient.features.webRtc).toBe(true); + }); test("The session of the server closes when the client is disconnected", async () => { const channelUUID = await network.getChannelUUID(); const user1 = await network.connect(channelUUID, 1); From 91e90b660e25d48c72bb065cdf86eabc220d497e Mon Sep 17 00:00:00 2001 From: TSO Date: Wed, 27 Nov 2024 08:33:24 +0100 Subject: [PATCH 03/11] [IMP] wip --- src/config.js | 34 +++-- src/models/channel.js | 14 +- src/models/recorder.js | 192 +++++++++++++++++--------- src/models/session.js | 77 +++++++++++ src/server.js | 14 +- src/services/http.js | 17 ++- src/services/{rtc.js => resources.js} | 41 +++++- src/utils/errors.js | 4 + src/utils/ffmpeg.js | 0 src/utils/utils.js | 39 ++++++ tests/http.test.js | 2 +- tests/models.test.js | 6 +- tests/rtc.test.js | 8 +- tests/utils/network.js | 6 +- 14 files changed, 349 insertions(+), 105 deletions(-) rename src/services/{rtc.js => resources.js} (68%) create mode 100644 src/utils/ffmpeg.js diff --git a/src/config.js b/src/config.js index 24dcb86..edd5caa 100644 --- a/src/config.js +++ b/src/config.js @@ -1,6 +1,6 @@ import os from "node:os"; -const FALSY_INPUT = new Set(["disable", "false", "none", "no", "0"]); +const FALSY_INPUT = new Set(["disable", "false", "none", "no", "0", "off"]); // ------------------------------------------------------------ // ------------------ ENV VARIABLES ----------------------- @@ -171,20 +171,11 @@ export const LOG_COLOR = process.env.LOG_COLOR * Whether recording is allowed * If true, users can request their call to be recorded. * - * e.g: RECORDING=1 + * e.g: RECORDING=1 or RECORDING=off * * @type {boolean} */ -export const RECORDING = Boolean(process.env.RECORDING ?? true); -/** - * The file type of the recording output, this must be a fragmentable type. - * If not set, defaults to mp4 - * - * e.g: RECORDING_FILE_TYPE=mp4 - * - * @type {string} - */ -export const RECORDING_FILE_TYPE = process.env.RECORDING_FILE_TYPE || "mp4"; +export const RECORDING = !FALSY_INPUT.has(process.env.RECORDING); // ------------------------------------------------------------ // -------------------- SETTINGS -------------------------- @@ -219,6 +210,20 @@ const baseProducerOptions = { zeroRtpOnPause: true, }; +export const recording = Object.freeze({ + directory: os.tmpdir() + "/recordings", + enabled: RECORDING, + maxDuration: 1000 * 60 * 60, // 1 hour + fileTTL: 1000 * 60 * 60 * 24, // 24 hours + fileType: "mp4", + videoLimit: 4, // how many videos can be merged into one recording +}); + +export const dynamicPorts = Object.freeze({ + min: 50000, + max: 59999, +}); + export const rtc = Object.freeze({ // https://mediasoup.org/documentation/v3/mediasoup/api/#WorkerSettings workerSettings: { @@ -247,6 +252,11 @@ export const rtc = Object.freeze({ }, ], }, + plainTransportOptions: { + listenIp: { ip: "0.0.0.0", announcedIp: PUBLIC_IP }, + rtcpMux: true, + comedia: false, + }, // https://mediasoup.org/documentation/v3/mediasoup/api/#WebRtcTransportOptions rtcTransportOptions: { maxSctpMessageSize: MAX_BUF_IN, diff --git a/src/models/channel.js b/src/models/channel.js index 073587a..084f73f 100644 --- a/src/models/channel.js +++ b/src/models/channel.js @@ -4,7 +4,7 @@ import * as config from "#src/config.js"; import { getAllowedCodecs, Logger } from "#src/utils/utils.js"; import { AuthenticationError, OvercrowdedError } from "#src/utils/errors.js"; import { Session, SESSION_CLOSE_CODE } from "#src/models/session.js"; -import { getWorker } from "#src/services/rtc.js"; +import { getWorker } from "#src/services/resources.js"; import { Recorder } from "#src/models/recorder.js"; const logger = new Logger("CHANNEL"); @@ -65,15 +65,16 @@ export class Channel extends EventEmitter { * @param {boolean} [options.useWebRtc=true] whether to use WebRTC: * with webRTC: can stream audio/video * without webRTC: can only use websocket + * @param {string} [options.uploadRoute] the route to which the recording will be uploaded */ - static async create(remoteAddress, issuer, { key, useWebRtc = true } = {}) { + static async create(remoteAddress, issuer, { key, useWebRtc = true, uploadRoute } = {}) { const safeIssuer = `${remoteAddress}::${issuer}`; const oldChannel = Channel.recordsByIssuer.get(safeIssuer); if (oldChannel) { logger.verbose(`reusing channel ${oldChannel.uuid}`); return oldChannel; } - const options = { key }; + const options = { key, uploadRoute }; if (useWebRtc) { options.worker = await getWorker(); options.router = await options.worker.createRouter({ @@ -128,8 +129,9 @@ export class Channel extends EventEmitter { * @param {string} [options.key] * @param {import("mediasoup").types.Worker} [options.worker] * @param {import("mediasoup").types.Router} [options.router] + * @param {string} [options.uploadRoute] the route to which the recording will be uploaded */ - constructor(remoteAddress, { key, worker, router } = {}) { + constructor(remoteAddress, { key, worker, router, uploadRoute } = {}) { super(); const now = new Date(); this.createDate = now.toISOString(); @@ -139,8 +141,8 @@ export class Channel extends EventEmitter { this.name = `${remoteAddress}*${this.uuid.slice(-5)}`; this.router = router; this._worker = worker; - if (config.RECORDING) { - this.recorder = new Recorder(this); + if (config.recording.enabled) { + this.recorder = new Recorder(this, uploadRoute); } this._onSessionClose = this._onSessionClose.bind(this); } diff --git a/src/models/recorder.js b/src/models/recorder.js index 2a404e3..063bd95 100644 --- a/src/models/recorder.js +++ b/src/models/recorder.js @@ -1,24 +1,47 @@ import child_process from "node:child_process"; -import os from "node:os"; import path from "node:path"; import fs from "node:fs"; - import { EventEmitter } from "node:events"; // TODO remove if unnecessary -import { Logger } from "#src/utils/utils.js"; + +import { Logger, formatFfmpegSdp } from "#src/utils/utils.js"; import { STREAM_TYPE } from "#src/shared/enums.js"; -import { RECORDING_FILE_TYPE } from "#src/config.js"; +import { LOG_LEVEL, recording } from "#src/config.js"; +import * as config from "#src/config.js"; +import * as https from "node:https"; +import http from "node:http"; const logger = new Logger("RECORDER"); -const temp = os.tmpdir(); -const VIDEO_LIMIT = 4; - -/** - * @typedef {Object} RTPTransports - * @property {Array} audio - * @property {Array} camera - * @property {Array} screen - */ +fs.mkdir(recording.directory, { recursive: true }, (err) => { + if (err) { + logger.error(err); + } +}); +export function clearDirectory() { + const now = Date.now(); + fs.readdir(recording.directory, (err, files) => { + if (err) { + logger.error(err); + return; + } + for (const file of files) { + const stats = fs.statSync(path.join(recording.directory, file)); + if (stats.mtimeMs < now - config.recording.fileTTL) { + fs.unlink(path.join(recording.directory, file), (err) => { + if (err) { + logger.error(err); + } + logger.info(`Deleted recording ${file}`); + }); + } + fs.unlink(path.join(recording.directory, file), (err) => { + if (err) { + logger.error(err); + } + }); + } + }); +} /** * Wraps the FFMPEG process * TODO move in own file @@ -30,9 +53,8 @@ class FFMPEG extends EventEmitter { _filePath; get _args() { - return [ - "-loglevel", - "debug", // TODO warning in prod + const args = [ + // TODO "-protocol_whitelist", "pipe,udp,rtp", "-fflags", @@ -48,9 +70,13 @@ class FFMPEG extends EventEmitter { "-c:a", "aac", // audio codec "-f", - RECORDING_FILE_TYPE, + recording.fileType, this._filePath, ]; + if (LOG_LEVEL === "debug") { + args.unshift("-loglevel", "debug"); + } + return args; } /** @@ -72,7 +98,7 @@ class FFMPEG extends EventEmitter { if (!this._process.stdin.writable) { throw new Error("FFMPEG stdin not writable."); } - this._process.stdin.write(sdp); + this._process.stdin.write(sdp); // TODO (maybe pass args earlier) this._process.stdin.end(); this._process.stdout.on("data", (chunk) => { @@ -97,27 +123,41 @@ class FFMPEG extends EventEmitter { } export class Recorder extends EventEmitter { - static records = new Map(); - + /** @type {Map} */ + static generatedFiles = new Map(); /** @type {string} */ - uuid = crypto.randomUUID(); + uuid; /** @type {import("#src/models/channel").Channel} */ channel; /** @type {string} */ state; + /** @type {FFMPEG} */ ffmpeg; - /** @type {RTPTransports} */ - _rtpTransports; /** @type {string} */ filePath; + /** @type {number} */ + _limitTimeout; + /** + * @param {string} uuid + * @param {http.ServerResponse} res + */ + static pipeToResponse(uuid, res) { + // TODO check if this can be executed, otherwise end request, or throw error (http service will throw anyways) + const fileStream = fs.createReadStream(Recorder.generatedFiles.get(uuid)); // may need to be explicitly closed? + res.writeHead(200, { + "Content-Type": `video/${recording.fileType}`, + "Content-Disposition": "inline", + }); + fileStream.pipe(res); // Pipe the file stream to the response + } /** * @param {import("#src/models/channel").Channel} channel + * @param {string} destination url to send the file to */ - constructor(channel) { + constructor(channel, destination) { super(); this.channel = channel; - this.filePath = path.join(temp, `${this.uuid}.${RECORDING_FILE_TYPE}`); - Recorder.records.set(this.uuid, this); + this._destination = destination; } /** @returns {number} */ @@ -129,60 +169,82 @@ export class Recorder extends EventEmitter { * @param {Array} ids * @returns {string} filePath */ - start(ids) { - // maybe internal state and check if already recording (recording = has ffmpeg child process). - this.stop(); + async start(ids) { + if (this.ffmpeg) { + return this.filePath; + } + this.uuid = crypto.randomUUID(); + const audioRtps = []; + const videoRtps = []; for (const id of ids) { const session = this.channel.sessions.get(id); - const audioRtp = this._createRtp( - session.producers[STREAM_TYPE.AUDIO], - STREAM_TYPE.AUDIO - ); - // TODO maybe some logic for priority on session id or stream type - audioRtp && this._rtpTransports.audio.push(audioRtp); + const audioRtpData = session.getRtp(STREAM_TYPE.AUDIO); + audioRtpData && audioRtps.push(audioRtpData); for (const type in [STREAM_TYPE.CAMERA, STREAM_TYPE.SCREEN]) { - if (this.videoCount < VIDEO_LIMIT) { - const rtp = this._createRtp(session.producers[type], type); - rtp && this._rtpTransports[type].push(rtp); + if (videoRtps.length < recording.videoLimit) { + const videoRtpData = session.getRtp(type); + videoRtpData && videoRtps.push(videoRtpData); } } } + this.filePath = path.join(recording.directory, `call_${Date.now()}.${recording.fileType}`); this.ffmpeg = new FFMPEG(this.filePath); - this.ffmpeg.spawn(); // args should be base on the rtp transports + try { + await this.ffmpeg.spawn(formatFfmpegSdp(audioRtps, videoRtps)); // args should be base on the rtp transports + } catch (error) { + logger.error(`Failed to start recording: ${error.message}`); + this.ffmpeg?.kill(); + this.ffmpeg = undefined; + return; + } + this._limitTimeout = setTimeout(() => { + this.upload(); + }, recording.maxDuration); + Recorder.generatedFiles.set(this.uuid, this.filePath); this.ffmpeg.once("success", () => { this.emit("download-ready", this.filePath); }); return this.filePath; } - pause() { - // TODO maybe shouldn't be able to pause + update(ids) { + // TODO see if ffmpeg input can be re-configured at runtime, otherwise no support or full restart + return this.filePath; } stop() { - // TODO - // cleanup all rtp transports - // stop ffmpeg process - Recorder.records.delete(this.uuid); + this.ffmpeg?.kill(); + this.uuid = undefined; + this.ffmpeg = undefined; + clearTimeout(this._limitTimeout); } - - /** - * @param {http.ServerResponse} res - */ - pipeToResponse(res) { - // TODO check if this can be executed, otherwise end request, or throw error - const fileStream = fs.createReadStream(this._filePath); // may need to be explicitly closed? - res.writeHead(200, { - "Content-Type": `video/${RECORDING_FILE_TYPE}`, - "Content-Disposition": "inline", + upload() { + this.stop(); + if (!this._destination) { + logger.warn(`No upload destination set for ${this.uuid}`); + return; + } + const fileStream = fs.createReadStream(this.filePath); + const { hostname, pathname, protocol } = new URL(this._destination); + const options = { + hostname, + path: pathname, + method: "POST", + headers: { + "Content-Type": "application/octet-stream", + "Content-Length": fs.statSync(this.filePath).size, + }, + }; + // TODO this should be a special route that has a generous upload limit + const request = (protocol === "https:" ? https : http).request(options, (res) => { + if (res.statusCode === 200) { + logger.info(`File uploaded to ${this._destination}`); + // TODO delete file + } else { + logger.error(`Failed to upload file: ${res.statusCode}`); + } }); - fileStream.pipe(res); // Pipe the file stream to the response - } - - /** - * @param {import("mediasoup").types.Producer} producer - * @param {STREAM_TYPE[keyof STREAM_TYPE]} type - * @return {Promise} probably just create transport with right ports and return that, - */ - async _createRtp(producer, type) { - // TODO + request.once("error", (error) => { + logger.error(`Failed to upload file: ${error.message}`); + }); + fileStream.pipe(request); } } diff --git a/src/models/session.js b/src/models/session.js index 4e5b75a..b09f853 100644 --- a/src/models/session.js +++ b/src/models/session.js @@ -8,6 +8,7 @@ import { SERVER_MESSAGE, SERVER_REQUEST, } from "#src/shared/enums.js"; +import { getPort, releasePort } from "#src/services/resources.js"; /** * @typedef {Object} SessionInfo @@ -42,6 +43,23 @@ import { * @property {import("mediasoup").types.Producer | null} screen */ +/** + * @typedef {Object} RtpData + * @property {import("mediasoup").types.PlainTransport} transport + * @property {string} payloadType + * @property {number} port + * @property {number} clockRate + * @property {string} codec + * @property {string} channels + */ + +/** + * @typedef {Object} RtpDataByType + * @property {RtpData | null} audio + * @property {RtpData | null} camera + * @property {RtpData | null} screen + */ + const logger = new Logger("SESSION"); export const SESSION_STATE = Object.freeze({ @@ -91,6 +109,15 @@ export class Session extends EventEmitter { camera: null, screen: null, }; + /** + * Transports and their information used to expose streams on a dynamic port. + * @type {Map} + */ + _rtp = { + audio: null, + camera: null, + screen: null, + }; /** @type {import("#src/models/channel").Channel} */ _channel; /** @type {Error[]} */ @@ -466,6 +493,56 @@ export class Session extends EventEmitter { } } + /** + * @param {STREAM_TYPE[keyof STREAM_TYPE]} type + * @return {Promise} + */ + async getRtp(type) { + if (this._rtp[type]) { + return this._rtp[type]; + } + const producer = this.producers[type]; + if (!producer) { + return; + } + const transport = await this._channel.router.createWebRtcTransport( + config.rtc.plainTransportOptions + ); + const port = getPort(); + await transport.connect({ + ip: "127.0.0.1", // just local, we only transport to a local child process + port, + }); + // TODO may want to use producer.getStats() to get the codec info + // for val of producer.getStats().values() { if val.type === "codec": val.minetype, val.clockRate,... } + //const codecData = this._channel.router.rtpCapabilities.codecs.find( + // (codec) => codec.kind === producer.kind + //); + const codecData = producer.rtpParameters.codecs[0]; + // TODO if we can hot swap consumers, the transport should be owned by the recorder + // TODO if not possible to swap here or even at ffmpeg level, worst case is to compose recording and merge it with ffmpeg again. + const consumer = await transport.consume({ + producerId: producer.id, + rtpCapabilities: producer.rtpParameters, + paused: true, + }); + this.once("close", () => { + consumer.close(); + transport.close(); + releasePort(port); + this._rtp[type] = null; + }); + this._rtp[type] = { + transport, + port, + payloadType: codecData.preferredPayloadType || codecData.payloadType, + clockRate: codecData.clockRate, + codec: codecData.mimeType.replace(`${producer.kind}`, ""), + channels: producer.kind === "audio" ? codecData.channels : undefined, + }; + return this._rtp[type]; + } + _broadcastInfo() { this._broadcast({ name: SERVER_MESSAGE.INFO_CHANGE, diff --git a/src/server.js b/src/server.js index 18ba10d..89ad1e3 100644 --- a/src/server.js +++ b/src/server.js @@ -1,22 +1,30 @@ -import * as rtc from "#src/services/rtc.js"; +import * as resources from "#src/services/resources.js"; import * as http from "#src/services/http.js"; import * as auth from "#src/services/auth.js"; import { Logger } from "#src/utils/utils.js"; import { Channel } from "#src/models/channel.js"; +import { clearDirectory } from "#src/models/recorder.js"; const logger = new Logger("SERVER", { logLevel: "all" }); +let fileCleanupInterval; async function run() { + clearDirectory(); + fileCleanupInterval = setInterval(() => { + clearDirectory(); + }, 1000 * 60 * 60 * 24); await auth.start(); - await rtc.start(); + await resources.start(); await http.start(); logger.info(`ready - PID: ${process.pid}`); } function cleanup() { + clearInterval(fileCleanupInterval); + clearDirectory(); Channel.closeAll(); http.close(); - rtc.close(); + resources.close(); logger.info("cleanup complete"); } diff --git a/src/services/http.js b/src/services/http.js index 8d27957..820f804 100644 --- a/src/services/http.js +++ b/src/services/http.js @@ -61,10 +61,13 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf res.statusCode = 403; // forbidden return res.end(); } - const channel = await Channel.create(remoteAddress, claims.iss, { + const { webRTC, uploadRoute } = searchParams; + const options = { key: claims.key, - useWebRtc: searchParams.get("webRTC") !== "false", - }); + useWebRtc: webRTC !== "false", + uploadRoute, + }; + const channel = await Channel.create(remoteAddress, claims.iss, options); res.setHeader("Content-Type", "application/json"); res.statusCode = 200; return res.end( @@ -79,12 +82,12 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf return res.end(); }, }); - routeListener.get(`/v${API_VERSION}/recording/`, { + routeListener.get(`/v${API_VERSION}/recording/`, { callback: async (req, res, { remoteAddress, match }) => { try { - const { token } = match; - logger.info(`[${remoteAddress}]: requested recording ${token}`); - Recorder.records.get(token)?.pipeToResponse(res); + const { uuid } = match; + logger.info(`[${remoteAddress}]: requested recording ${uuid}`); + Recorder.pipeToResponse(uuid, res); // res not ended as we are streaming } catch (error) { logger.error(`[${remoteAddress}] failed to obtain recording: ${error.message}`); diff --git a/src/services/rtc.js b/src/services/resources.js similarity index 68% rename from src/services/rtc.js rename to src/services/resources.js index 9662995..3b16883 100644 --- a/src/services/rtc.js +++ b/src/services/resources.js @@ -2,8 +2,14 @@ import mediasoup from "mediasoup"; import * as config from "#src/config.js"; import { Logger } from "#src/utils/utils.js"; +import { PortLimitReachedError } from "#src/utils/errors.js"; -const logger = new Logger("RTC"); +const port_span = config.dynamicPorts.max - config.dynamicPorts.min; +let port_offset = 0; +/** @type {Set} */ +const usedPorts = new Set(); + +const logger = new Logger("RESOURCES"); /** @type {Set} */ const workers = new Set(); @@ -13,6 +19,7 @@ export async function start() { for (let i = 0; i < config.NUM_WORKERS; ++i) { await makeWorker(); } + logger.info(`${port_span} dynamic ports available`); logger.info(`initialized ${workers.size} mediasoup workers`); logger.info( `transport(RTC) layer at ${config.PUBLIC_IP}:${config.RTC_MIN_PORT}-${config.RTC_MAX_PORT}` @@ -24,6 +31,8 @@ export function close() { worker.appData.webRtcServer.close(); worker.close(); } + port_offset = 0; + usedPorts.clear(); workers.clear(); } @@ -64,3 +73,33 @@ export async function getWorker() { logger.debug(`worker ${leastUsedWorker.pid} with ${lowestUsage} ru_maxrss was selected`); return leastUsedWorker; } + +/** + * @returns {number} + */ +function _getPort() { + port_offset++; + return config.dynamicPorts.min + (port_offset % port_span); +} +/** + * Returns a dynamic port that is not in use. + * @returns {number} + */ +export function getPort() { + let port = _getPort(); + if (usedPorts.size === port_span) { + throw new PortLimitReachedError(); + } + while (usedPorts.has(port)) { + port = _getPort(); + } + usedPorts.add(port); + return port; +} + +/** + * @param {number} port + */ +export function releasePort(port) { + usedPorts.delete(port); +} diff --git a/src/utils/errors.js b/src/utils/errors.js index 5eb7855..62ee4f9 100644 --- a/src/utils/errors.js +++ b/src/utils/errors.js @@ -5,3 +5,7 @@ export class AuthenticationError extends Error { export class OvercrowdedError extends Error { name = "OvercrowdedError"; } + +export class PortLimitReachedError extends Error { + name = "PortLimitReachedError"; +} diff --git a/src/utils/ffmpeg.js b/src/utils/ffmpeg.js new file mode 100644 index 0000000..e69de29 diff --git a/src/utils/utils.js b/src/utils/utils.js index d1c6b43..ddee4c4 100644 --- a/src/utils/utils.js +++ b/src/utils/utils.js @@ -165,3 +165,42 @@ export function getAllowedCodecs() { } return codecs; } + +/** + * hard-coded ffmpeg sdp fragments for layouts with 1...4 videos + * TODO make the right resizing and vstack/hstack params + */ +const LAYOUT = { + 1: "TODO layout for 1 video", + 2: "TODO layout for 2 videos", + 3: "TODO layout for 3 videos", + 4: "TODO layout for 4 videos", +}; + +/** + * TODO + * @param {RtpData[]} audioRtps + * @param {RtpData[]} videoRtps + * @return {string[]} + */ +export function formatFfmpegSdp(audioRtps, videoRtps) { + // array of strings containing the sdp for ffmpeg, related to the stacking of videos + const sdp = ["v=0", "o=- 0 0 IN IP4 127.0.0.1", "s=FFmpeg", "c=IN IP4 127.0.0.1", "t=0 0"]; + const layout = LAYOUT[videoRtps.length]; + if (!layout) { + throw new Error(`unsupported layout for ${videoRtps.length} videos`); + } + for (const audioRtp of audioRtps) { + sdp.push(`m=audio ${audioRtp.port} RTP/AVP ${audioRtp.payloadType}`); + sdp.push(`a=rtpmap:${audioRtp.payloadType} ${audioRtp.codec}/${audioRtp.clockRate}`); + sdp.push(`a=sendonly`); + } + for (const videoRtp of videoRtps) { + // TODO do something with layout. Layout may contain a format function that takes below values as params, or the whole videoRtps[]. + sdp.push(`m=video ${videoRtp.port} RTP/AVP ${videoRtp.payloadType}`); + sdp.push(`a=rtpmap:${videoRtp.payloadType} ${videoRtp.codec}/${videoRtp.clockRate}`); + sdp.push(`a=sendonly`); + } + // TODO, layout only a small part of the full SDP. + return sdp; +} diff --git a/tests/http.test.js b/tests/http.test.js index bb70864..f4a58de 100644 --- a/tests/http.test.js +++ b/tests/http.test.js @@ -72,7 +72,7 @@ describe("HTTP", () => { expect(response.ok).toBe(true); const { uuid, url } = await response.json(); expect(Channel.records.get(uuid)).toBeDefined(); - expect(url).toBe(`http://${config.PUBLIC_ADDRESS}:${config.PORT}`); + expect(url).toBe(`http://${config.PUBLIC_IP}:${config.PORT}`); }); test("/noop", async () => { const response = await fetch(`http://${HTTP_INTERFACE}:${PORT}/v${API_VERSION}/noop`, { diff --git a/tests/models.test.js b/tests/models.test.js index ce8a20c..be41778 100644 --- a/tests/models.test.js +++ b/tests/models.test.js @@ -1,17 +1,17 @@ import { describe, beforeEach, afterEach, expect, jest } from "@jest/globals"; -import * as rtc from "#src/services/rtc.js"; +import * as resources from "#src/services/resources.js"; import { Channel } from "#src/models/channel.js"; import { timeouts, CHANNEL_SIZE } from "#src/config.js"; import { OvercrowdedError } from "#src/utils/errors.js"; describe("Models", () => { beforeEach(async () => { - await rtc.start(); + await resources.start(); }); afterEach(() => { Channel.closeAll(); - rtc.close(); + resources.close(); }); test("Create channel and session", async () => { const channel = await Channel.create("testRemote", "testIssuer"); diff --git a/tests/rtc.test.js b/tests/rtc.test.js index 5926c78..96ebcd5 100644 --- a/tests/rtc.test.js +++ b/tests/rtc.test.js @@ -1,19 +1,19 @@ import { afterEach, beforeEach, describe, expect } from "@jest/globals"; -import * as rtc from "#src/services/rtc.js"; +import * as resources from "#src/services/resources.js"; import * as config from "#src/config.js"; describe("rtc service", () => { beforeEach(async () => { - await rtc.start(); + await resources.start(); }); afterEach(() => { - rtc.close(); + resources.close(); }); test("worker load should be evenly distributed", async () => { const usedWorkers = new Set(); for (let i = 0; i < config.NUM_WORKERS; ++i) { - const worker = await rtc.getWorker(); + const worker = await resources.getWorker(); const router = await worker.createRouter({}); const webRtcServer = await worker.createWebRtcServer(config.rtc.rtcServerOptions); const promises = []; diff --git a/tests/utils/network.js b/tests/utils/network.js index d9ff48d..622a0d1 100644 --- a/tests/utils/network.js +++ b/tests/utils/network.js @@ -6,7 +6,7 @@ import * as fakeParameters from "mediasoup-client/lib/test/fakeParameters"; import * as auth from "#src/services/auth.js"; import * as http from "#src/services/http.js"; -import * as rtc from "#src/services/rtc.js"; +import * as resources from "#src/services/resources.js"; import { SfuClient, SFU_CLIENT_STATE } from "#src/client.js"; import { Channel } from "#src/models/channel.js"; @@ -39,7 +39,7 @@ export class LocalNetwork { async start(hostname, port) { this.hostname = hostname; this.port = port; - await rtc.start(); + await resources.start(); await http.start({ hostname, port }); await auth.start(HMAC_B64_KEY); } @@ -122,6 +122,6 @@ export class LocalNetwork { Channel.closeAll(); auth.close(); http.close(); - rtc.close(); + resources.close(); } } From 482dd0db18fe51d0e5cf52a604d2eae53059717b Mon Sep 17 00:00:00 2001 From: TSO Date: Wed, 27 Nov 2024 13:36:28 +0100 Subject: [PATCH 04/11] [IMP] WIP: ffmpeg formatting --- src/models/recorder.js | 93 +++-------------------------- src/utils/ffmpeg.js | 129 +++++++++++++++++++++++++++++++++++++++++ src/utils/utils.js | 39 ------------- 3 files changed, 137 insertions(+), 124 deletions(-) diff --git a/src/models/recorder.js b/src/models/recorder.js index 063bd95..a353174 100644 --- a/src/models/recorder.js +++ b/src/models/recorder.js @@ -1,14 +1,14 @@ -import child_process from "node:child_process"; import path from "node:path"; import fs from "node:fs"; +import * as https from "node:https"; +import http from "node:http"; import { EventEmitter } from "node:events"; // TODO remove if unnecessary -import { Logger, formatFfmpegSdp } from "#src/utils/utils.js"; +import { Logger } from "#src/utils/utils.js"; import { STREAM_TYPE } from "#src/shared/enums.js"; -import { LOG_LEVEL, recording } from "#src/config.js"; +import { FFMPEG } from "#src/utils/ffmpeg.js"; +import { recording } from "#src/config.js"; import * as config from "#src/config.js"; -import * as https from "node:https"; -import http from "node:http"; const logger = new Logger("RECORDER"); @@ -42,85 +42,6 @@ export function clearDirectory() { } }); } -/** - * Wraps the FFMPEG process - * TODO move in own file - */ -class FFMPEG extends EventEmitter { - /** @type {child_process.ChildProcess} */ - _process; - /** @type {string} */ - _filePath; - - get _args() { - const args = [ - // TODO - "-protocol_whitelist", - "pipe,udp,rtp", - "-fflags", - "+genpts", - "-f", - "sdp", - "-i", - "pipe:0", - "-movflags", - "frag_keyframe+empty_moov+default_base_moof", // fragmented - "-c:v", - "libx264", // vid codec - "-c:a", - "aac", // audio codec - "-f", - recording.fileType, - this._filePath, - ]; - if (LOG_LEVEL === "debug") { - args.unshift("-loglevel", "debug"); - } - return args; - } - - /** - * @param {string} filePath - */ - constructor(filePath) { - super(); - this._filePath = filePath; - } - - /** - * @param {String[]} [sdp] - */ - async spawn(sdp) { - this._process = child_process.spawn("ffmpeg", this._args, { - stdio: ["pipe", "pipe", process.stderr], - }); - - if (!this._process.stdin.writable) { - throw new Error("FFMPEG stdin not writable."); - } - this._process.stdin.write(sdp); // TODO (maybe pass args earlier) - this._process.stdin.end(); - - this._process.stdout.on("data", (chunk) => { - this.emit("data", chunk); // Emit data chunks as they become available - // may need to ues this to pipe to request if file stream does not work - }); - - this._process.on("close", (code) => { - if (code === 0) { - this.emit("success"); - } - }); - - logger.debug( - `FFMPEG process (pid:${this._process.pid}) spawned, outputting to ${this._filePath}` - ); - } - - kill() { - this._process?.kill("SIGINT"); - } -} export class Recorder extends EventEmitter { /** @type {Map} */ @@ -190,7 +111,7 @@ export class Recorder extends EventEmitter { this.filePath = path.join(recording.directory, `call_${Date.now()}.${recording.fileType}`); this.ffmpeg = new FFMPEG(this.filePath); try { - await this.ffmpeg.spawn(formatFfmpegSdp(audioRtps, videoRtps)); // args should be base on the rtp transports + await this.ffmpeg.spawn(audioRtps, videoRtps); // args should be base on the rtp transports } catch (error) { logger.error(`Failed to start recording: ${error.message}`); this.ffmpeg?.kill(); @@ -208,6 +129,8 @@ export class Recorder extends EventEmitter { } update(ids) { // TODO see if ffmpeg input can be re-configured at runtime, otherwise no support or full restart + // could also see if the consumer of the RtpTransport can be swapped at runtime, in which case, RtpTransport should + // be owned by the Recorder (4 RtpTransport per recorder, and consume on demand). return this.filePath; } stop() { diff --git a/src/utils/ffmpeg.js b/src/utils/ffmpeg.js index e69de29..3f8c17f 100644 --- a/src/utils/ffmpeg.js +++ b/src/utils/ffmpeg.js @@ -0,0 +1,129 @@ +import child_process from "node:child_process"; +import { EventEmitter } from "node:events"; + +import { Logger } from "#src/utils/utils.js"; +import { recording, LOG_LEVEL } from "#src/config.js"; + +const logger = new Logger("FFMPEG"); + +/** + * hard-coded ffmpeg sdp fragments for layouts with 1...4 videos + * TODO make the right resizing and vstack/hstack params + */ +const LAYOUT = { + 1: "", + 2: "a=filter:complex [0:v][1:v]hstack=inputs=2[v]; -map [v]", + 3: "a=filter:complex [0:v][1:v]hstack=inputs=2[top];[top][2:v]vstack=inputs=2[v]; -map [v]", + 4: "a=filter:complex [0:v][1:v]hstack=inputs=2[top];[2:v][3:v]hstack=inputs=2[bottom];[top][bottom]vstack=inputs=2[v]; -map [v]", +}; + +/** + * TODO + * @param {RtpData[]} audioRtps + * @param {RtpData[]} videoRtps + * @return {string} + */ +function formatFfmpegSdp(audioRtps, videoRtps) { + const sdp = ["v=0", "o=- 0 0 IN IP4 127.0.0.1", "s=FFmpeg", "c=IN IP4 127.0.0.1", "t=0 0"]; + const layout = LAYOUT[videoRtps.length]; + if (!layout) { + throw new Error(`unsupported layout for ${videoRtps.length} videos`); + } + for (const audioRtp of audioRtps) { + sdp.push(`m=audio ${audioRtp.port} RTP/AVP ${audioRtp.payloadType}`); + sdp.push(`a=rtpmap:${audioRtp.payloadType} ${audioRtp.codec}/${audioRtp.clockRate}`); + sdp.push(`a=sendonly`); + } + sdp.push(`-c:a aac -b:a 160k -ac 2 -filter_complex amerge=inputs=${audioRtps.length}`); + if (videoRtps.length > 0) { + sdp.push( + "-movflags", + "frag_keyframe+empty_moov+default_base_moof", // fragmented for streaming although could use another format if dropping the pipe feature + "-c:v", + "mp4v" + ); + for (const videoRtp of videoRtps) { + sdp.push(`m=video ${videoRtp.port} RTP/AVP ${videoRtp.payloadType}`); + sdp.push(`a=rtpmap:${videoRtp.payloadType} ${videoRtp.codec}/${videoRtp.clockRate}`); + sdp.push(`a=sendonly`); + } + } + // TODO, layout only a small part of the full SDP. + return sdp.join("\n"); +} + +/** + * Wraps the FFMPEG process + * TODO move in own file + */ +export class FFMPEG extends EventEmitter { + /** @type {child_process.ChildProcess} */ + _process; + /** @type {string} */ + _filePath; + + get _args() { + const args = [ + // TODO + "-protocol_whitelist", + "pipe,udp,rtp", + "-fflags", + "+genpts", + "-f", + "sdp", + "-i", + "pipe:0", + "-f", + recording.fileType, + this._filePath, + ]; + if (LOG_LEVEL === "debug") { + args.unshift("-loglevel", "debug"); + } + return args; + } + + /** + * @param {string} filePath + */ + constructor(filePath) { + super(); + this._filePath = filePath; + } + + /** + * @param {RtpData[]} audioRtps + * @param {RtpData[]} videoRtps + */ + async spawn(audioRtps, videoRtps) { + const sdp = formatFfmpegSdp(audioRtps, videoRtps); + this._process = child_process.spawn("ffmpeg", this._args, { + stdio: ["pipe", "pipe", process.stderr], + }); + + if (!this._process.stdin.writable) { + throw new Error("FFMPEG stdin not writable."); + } + this._process.stdin.write(sdp); // TODO (maybe pass args earlier) + this._process.stdin.end(); + + this._process.stdout.on("data", (chunk) => { + this.emit("data", chunk); // Emit data chunks as they become available + // may need to ues this to pipe to request if file stream does not work + }); + + this._process.on("close", (code) => { + if (code === 0) { + this.emit("success"); + } + }); + + logger.debug( + `FFMPEG process (pid:${this._process.pid}) spawned, outputting to ${this._filePath}` + ); + } + + kill() { + this._process?.kill("SIGINT"); + } +} diff --git a/src/utils/utils.js b/src/utils/utils.js index ddee4c4..d1c6b43 100644 --- a/src/utils/utils.js +++ b/src/utils/utils.js @@ -165,42 +165,3 @@ export function getAllowedCodecs() { } return codecs; } - -/** - * hard-coded ffmpeg sdp fragments for layouts with 1...4 videos - * TODO make the right resizing and vstack/hstack params - */ -const LAYOUT = { - 1: "TODO layout for 1 video", - 2: "TODO layout for 2 videos", - 3: "TODO layout for 3 videos", - 4: "TODO layout for 4 videos", -}; - -/** - * TODO - * @param {RtpData[]} audioRtps - * @param {RtpData[]} videoRtps - * @return {string[]} - */ -export function formatFfmpegSdp(audioRtps, videoRtps) { - // array of strings containing the sdp for ffmpeg, related to the stacking of videos - const sdp = ["v=0", "o=- 0 0 IN IP4 127.0.0.1", "s=FFmpeg", "c=IN IP4 127.0.0.1", "t=0 0"]; - const layout = LAYOUT[videoRtps.length]; - if (!layout) { - throw new Error(`unsupported layout for ${videoRtps.length} videos`); - } - for (const audioRtp of audioRtps) { - sdp.push(`m=audio ${audioRtp.port} RTP/AVP ${audioRtp.payloadType}`); - sdp.push(`a=rtpmap:${audioRtp.payloadType} ${audioRtp.codec}/${audioRtp.clockRate}`); - sdp.push(`a=sendonly`); - } - for (const videoRtp of videoRtps) { - // TODO do something with layout. Layout may contain a format function that takes below values as params, or the whole videoRtps[]. - sdp.push(`m=video ${videoRtp.port} RTP/AVP ${videoRtp.payloadType}`); - sdp.push(`a=rtpmap:${videoRtp.payloadType} ${videoRtp.codec}/${videoRtp.clockRate}`); - sdp.push(`a=sendonly`); - } - // TODO, layout only a small part of the full SDP. - return sdp; -} From 04ed5903ac6866557d9a7e22f31468cca2a371f7 Mon Sep 17 00:00:00 2001 From: TSO Date: Thu, 28 Nov 2024 08:01:32 +0100 Subject: [PATCH 05/11] [IMP] WIP: / --- src/models/channel.js | 12 +++++++++++- src/models/recorder.js | 27 ++++++++++++++++++++++----- src/services/http.js | 27 ++++++++++++++++++++++++--- src/services/ws.js | 5 +---- src/utils/ffmpeg.js | 10 ++-------- 5 files changed, 60 insertions(+), 21 deletions(-) diff --git a/src/models/channel.js b/src/models/channel.js index 084f73f..b22064a 100644 --- a/src/models/channel.js +++ b/src/models/channel.js @@ -30,6 +30,16 @@ const mediaCodecs = getAllowedCodecs(); * @fires Channel#sessionLeave * @fires Channel#close */ + +/** + * @typedef {Object} ChannelStats + * @property {string} uuid + * @property {string} remoteAddress + * @property {SessionsStats} sessionsStats + * @property {string} createDate + * @property {boolean} webRtcEnabled + */ + export class Channel extends EventEmitter { /** @type {Map} */ static records = new Map(); @@ -158,7 +168,7 @@ export class Channel extends EventEmitter { } /** - * @returns {Promise<{ uuid: string, remoteAddress: string, sessionsStats: SessionsStats, createDate: string }>} + * @returns {Promise} */ async getStats() { return { diff --git a/src/models/recorder.js b/src/models/recorder.js index a353174..c5d6a9a 100644 --- a/src/models/recorder.js +++ b/src/models/recorder.js @@ -12,6 +12,12 @@ import * as config from "#src/config.js"; const logger = new Logger("RECORDER"); +export const RECORDER_STATE = { + IDLE: "IDLE", + RECORDING: "RECORDING", + UPLOADING: "UPLOADING", +}; + fs.mkdir(recording.directory, { recursive: true }, (err) => { if (err) { logger.error(err); @@ -50,14 +56,16 @@ export class Recorder extends EventEmitter { uuid; /** @type {import("#src/models/channel").Channel} */ channel; - /** @type {string} */ - state; /** @type {FFMPEG} */ ffmpeg; /** @type {string} */ filePath; + /** @type {string} */ + _destination; /** @type {number} */ _limitTimeout; + /** @type {RECORDER_STATE[keyof RECORDER_STATE]} */ + _state = RECORDER_STATE.IDLE; /** * @param {string} uuid * @param {http.ServerResponse} res @@ -81,9 +89,18 @@ export class Recorder extends EventEmitter { this._destination = destination; } - /** @returns {number} */ - get videoCount() { - return this._rtpTransports.camera.length + this._rtpTransports.screen.length; + /** + * @param {RECORDER_STATE[keyof RECORDER_STATE]} state + * @fires Recorder#stateChange + */ + set state(state) { + this._state = state; + /** + * stateChange event. + * @event Recorder#stateChange + * @type {string} `RECORDER_STATE` + */ + this.emit("stateChange", state); } /** diff --git a/src/services/http.js b/src/services/http.js index 820f804..27e6640 100644 --- a/src/services/http.js +++ b/src/services/http.js @@ -33,12 +33,22 @@ let httpServer; export async function start({ httpInterface = config.HTTP_INTERFACE, port = config.PORT } = {}) { logger.info("starting..."); const routeListener = new RouteListener(); + /** + * A no-operation endpoint that returns a simple "ok" response. + * + * @returns {http.ServerResponse>}. + */ routeListener.get(`/v${API_VERSION}/noop`, { callback: (req, res) => { res.statusCode = 200; return res.end(JSON.stringify({ result: "ok" })); }, }); + /** + * Retrieves statistics for all channels. + * + * @returns {http.ServerResponse>} + */ routeListener.get(`/v${API_VERSION}/stats`, { callback: async (req, res) => { const proms = []; @@ -50,6 +60,12 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf return res.end(JSON.stringify(channelStats)); }, }); + /** + * @param {URLSearchParams} searchParams (query parameters) + * @param {"true" | "false"} searchParams.webRTC whether to use WebRTC or not + * @param {string} searchParams.uploadRoute the route to which recordings will be uploaded + * @returns {http.ServerResponse>} + */ routeListener.get(`/v${API_VERSION}/channel`, { callback: async (req, res, { host, protocol, remoteAddress, searchParams }) => { try { @@ -61,11 +77,10 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf res.statusCode = 403; // forbidden return res.end(); } - const { webRTC, uploadRoute } = searchParams; const options = { key: claims.key, - useWebRtc: webRTC !== "false", - uploadRoute, + useWebRtc: searchParams.get("webRTC") !== "false", + uploadRoute: searchParams.get("uploadRoute"), }; const channel = await Channel.create(remoteAddress, claims.iss, options); res.setHeader("Content-Type", "application/json"); @@ -82,6 +97,9 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf return res.end(); }, }); + /** + * Streams a recording with the specified UUID. + */ routeListener.get(`/v${API_VERSION}/recording/`, { callback: async (req, res, { remoteAddress, match }) => { try { @@ -95,6 +113,9 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf } }, }); + /** + * Disconnects sessions from channels based on the provided JWT. + */ routeListener.post(`/v${API_VERSION}/disconnect`, { callback: async (req, res, { remoteAddress }) => { try { diff --git a/src/services/ws.js b/src/services/ws.js index 48295ac..826414f 100644 --- a/src/services/ws.js +++ b/src/services/ws.js @@ -122,10 +122,7 @@ async function connect(webSocket, { channelUUID, jwt }) { if (!session_id) { throw new AuthenticationError("Malformed JWT payload"); } - const initData = { - features: channel.features, - }; - webSocket.send(JSON.stringify(initData)); // client can start using ws after this message. + webSocket.send(JSON.stringify({ features: channel.features })); // client can start using ws after this message. const bus = new Bus(webSocket, { batchDelay: config.timeouts.busBatch }); const { session } = Channel.join(channel.uuid, session_id); session.once("close", ({ code }) => { diff --git a/src/utils/ffmpeg.js b/src/utils/ffmpeg.js index 3f8c17f..3f3c411 100644 --- a/src/utils/ffmpeg.js +++ b/src/utils/ffmpeg.js @@ -62,7 +62,7 @@ export class FFMPEG extends EventEmitter { /** @type {string} */ _filePath; - get _args() { + get _processArgs() { const args = [ // TODO "-protocol_whitelist", @@ -97,7 +97,7 @@ export class FFMPEG extends EventEmitter { */ async spawn(audioRtps, videoRtps) { const sdp = formatFfmpegSdp(audioRtps, videoRtps); - this._process = child_process.spawn("ffmpeg", this._args, { + this._process = child_process.spawn("ffmpeg", this._processArgs, { stdio: ["pipe", "pipe", process.stderr], }); @@ -106,12 +106,6 @@ export class FFMPEG extends EventEmitter { } this._process.stdin.write(sdp); // TODO (maybe pass args earlier) this._process.stdin.end(); - - this._process.stdout.on("data", (chunk) => { - this.emit("data", chunk); // Emit data chunks as they become available - // may need to ues this to pipe to request if file stream does not work - }); - this._process.on("close", (code) => { if (code === 0) { this.emit("success"); From 78282aa5ab0be3bc93f71cd07761ed04cd12f5c0 Mon Sep 17 00:00:00 2001 From: TSO Date: Thu, 28 Nov 2024 08:34:23 +0100 Subject: [PATCH 06/11] [IMP] WIP: / --- src/models/recorder.js | 17 +++++++++++++---- src/services/http.js | 2 +- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/models/recorder.js b/src/models/recorder.js index c5d6a9a..6005502 100644 --- a/src/models/recorder.js +++ b/src/models/recorder.js @@ -104,7 +104,7 @@ export class Recorder extends EventEmitter { } /** - * @param {Array} ids + * @param {Array} ids TODO may specify more than just ids, maybe we want specific streams. could be some tuple [id, mediaTypes] * @returns {string} filePath */ async start(ids) { @@ -145,9 +145,18 @@ export class Recorder extends EventEmitter { return this.filePath; } update(ids) { - // TODO see if ffmpeg input can be re-configured at runtime, otherwise no support or full restart - // could also see if the consumer of the RtpTransport can be swapped at runtime, in which case, RtpTransport should - // be owned by the Recorder (4 RtpTransport per recorder, and consume on demand). + /** TODO see if ffmpeg input can be re-configured at runtime, otherwise full restart + * Possibilities for hot-swap: + * - ffmpeg stdin is writable, so it may be possible to write new sdp (with new inputs) to it + * - could see if the consumer of the RtpTransport can be swapped at runtime, in which case, RtpTransport should + * be owned by the Recorder (4 RtpTransport per recorder, and consume on demand). + * If hot-swap is not possible: + * Kill the ffmpeg process and register the path in a queue (array). + * Keep killing and starting processes as update is called, + * kill should happen as late as possible (when next process has started) to avoid losses. + * When "upload" is called, first use ffmpeg again to merge all the files in the queue. + * then upload that real file. (if queue.length === 1, just upload that file). + */ return this.filePath; } stop() { diff --git a/src/services/http.js b/src/services/http.js index 27e6640..2ca7de7 100644 --- a/src/services/http.js +++ b/src/services/http.js @@ -80,7 +80,7 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf const options = { key: claims.key, useWebRtc: searchParams.get("webRTC") !== "false", - uploadRoute: searchParams.get("uploadRoute"), + uploadRoute: searchParams.get("uploadRoute"), // TODO this route should be constrained to avoid being use for DDoS (eg for a malicious Odoo.sh customer) }; const channel = await Channel.create(remoteAddress, claims.iss, options); res.setHeader("Content-Type", "application/json"); From 56cf697373c61fc985f0926d608448060f8152af Mon Sep 17 00:00:00 2001 From: TSO Date: Thu, 28 Nov 2024 13:40:45 +0100 Subject: [PATCH 07/11] [IMP] WIP: / --- src/models/recorder.js | 93 ++++++++++++++++++++++++++---------------- src/services/http.js | 15 ++++++- src/utils/ffmpeg.js | 7 +--- 3 files changed, 71 insertions(+), 44 deletions(-) diff --git a/src/models/recorder.js b/src/models/recorder.js index 6005502..3d4350a 100644 --- a/src/models/recorder.js +++ b/src/models/recorder.js @@ -16,6 +16,7 @@ export const RECORDER_STATE = { IDLE: "IDLE", RECORDING: "RECORDING", UPLOADING: "UPLOADING", + UPDATING: "UPDATING", }; fs.mkdir(recording.directory, { recursive: true }, (err) => { @@ -59,26 +60,14 @@ export class Recorder extends EventEmitter { /** @type {FFMPEG} */ ffmpeg; /** @type {string} */ - filePath; - /** @type {string} */ _destination; /** @type {number} */ _limitTimeout; + /** @type {string[]} */ + _tempFilePathAccumulator = []; /** @type {RECORDER_STATE[keyof RECORDER_STATE]} */ _state = RECORDER_STATE.IDLE; - /** - * @param {string} uuid - * @param {http.ServerResponse} res - */ - static pipeToResponse(uuid, res) { - // TODO check if this can be executed, otherwise end request, or throw error (http service will throw anyways) - const fileStream = fs.createReadStream(Recorder.generatedFiles.get(uuid)); // may need to be explicitly closed? - res.writeHead(200, { - "Content-Type": `video/${recording.fileType}`, - "Content-Disposition": "inline", - }); - fileStream.pipe(res); // Pipe the file stream to the response - } + /** * @param {import("#src/models/channel").Channel} channel * @param {string} destination url to send the file to @@ -105,13 +94,22 @@ export class Recorder extends EventEmitter { /** * @param {Array} ids TODO may specify more than just ids, maybe we want specific streams. could be some tuple [id, mediaTypes] - * @returns {string} filePath */ async start(ids) { if (this.ffmpeg) { - return this.filePath; + logger.debug("Already recording"); + return; } + this._limitTimeout = setTimeout(() => { + this.upload(); + }, recording.maxDuration); this.uuid = crypto.randomUUID(); + return this._start_fragment(ids); + } + + async _start_fragment(ids) { + const oldProcess = this.ffmpeg; + this.state = RECORDER_STATE.UPDATING; const audioRtps = []; const videoRtps = []; for (const id of ids) { @@ -125,26 +123,20 @@ export class Recorder extends EventEmitter { } } } - this.filePath = path.join(recording.directory, `call_${Date.now()}.${recording.fileType}`); - this.ffmpeg = new FFMPEG(this.filePath); + const tempPath = path.join(recording.directory, `call_${Date.now()}.${recording.fileType}`); + this.ffmpeg = new FFMPEG(tempPath); try { await this.ffmpeg.spawn(audioRtps, videoRtps); // args should be base on the rtp transports + this.state = RECORDER_STATE.RECORDING; } catch (error) { logger.error(`Failed to start recording: ${error.message}`); - this.ffmpeg?.kill(); - this.ffmpeg = undefined; - return; + this.stop(); } - this._limitTimeout = setTimeout(() => { - this.upload(); - }, recording.maxDuration); - Recorder.generatedFiles.set(this.uuid, this.filePath); - this.ffmpeg.once("success", () => { - this.emit("download-ready", this.filePath); - }); - return this.filePath; + oldProcess?.kill(); + this._tempFilePathAccumulator.push(tempPath); } - update(ids) { + + async update(ids) { /** TODO see if ffmpeg input can be re-configured at runtime, otherwise full restart * Possibilities for hot-swap: * - ffmpeg stdin is writable, so it may be possible to write new sdp (with new inputs) to it @@ -157,21 +149,41 @@ export class Recorder extends EventEmitter { * When "upload" is called, first use ffmpeg again to merge all the files in the queue. * then upload that real file. (if queue.length === 1, just upload that file). */ - return this.filePath; + await this._start_fragment(ids); } stop() { this.ffmpeg?.kill(); this.uuid = undefined; this.ffmpeg = undefined; + this._tempFilePathAccumulator = []; // TODO probably also delete all files here clearTimeout(this._limitTimeout); + this.state = RECORDER_STATE.IDLE; } - upload() { + + /** + * @fires Recorder#ready + */ + async upload() { + const filePaths = this._tempFilePathAccumulator; this.stop(); if (!this._destination) { logger.warn(`No upload destination set for ${this.uuid}`); return; } - const fileStream = fs.createReadStream(this.filePath); + this.state = RECORDER_STATE.UPLOADING; + let filePath; + if (filePaths.length === 1) { + filePath = filePaths[0]; + } else { + filePath = await this._mergeFiles(filePaths); + } + Recorder.generatedFiles.set(this.uuid, filePath); + /** + * @event Recorder#ready + * @type {string} `filePath` + */ + this.emit("ready", filePath); + const fileStream = fs.createReadStream(filePath); const { hostname, pathname, protocol } = new URL(this._destination); const options = { hostname, @@ -179,10 +191,10 @@ export class Recorder extends EventEmitter { method: "POST", headers: { "Content-Type": "application/octet-stream", - "Content-Length": fs.statSync(this.filePath).size, + "Content-Length": fs.statSync(filePath).size, }, }; - // TODO this should be a special route that has a generous upload limit + // TODO implement the route and route-passing in odoo/discuss const request = (protocol === "https:" ? https : http).request(options, (res) => { if (res.statusCode === 200) { logger.info(`File uploaded to ${this._destination}`); @@ -195,5 +207,14 @@ export class Recorder extends EventEmitter { logger.error(`Failed to upload file: ${error.message}`); }); fileStream.pipe(request); + this.state = RECORDER_STATE.IDLE; + } + + /** + * @param {string[]} filePaths + */ + async _mergeFiles(filePaths) { + // TODO + return filePaths[1]; } } diff --git a/src/services/http.js b/src/services/http.js index 2ca7de7..6ec9bcd 100644 --- a/src/services/http.js +++ b/src/services/http.js @@ -7,6 +7,8 @@ import { Logger, parseBody, extractRequestInfo } from "#src/utils/utils.js"; import { SESSION_CLOSE_CODE } from "#src/models/session.js"; import { Channel } from "#src/models/channel.js"; import { Recorder } from "#src/models/recorder.js"; +import fs from "node:fs"; +import path from "node:path"; /** * @typedef {function} routeCallback @@ -105,8 +107,17 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf try { const { uuid } = match; logger.info(`[${remoteAddress}]: requested recording ${uuid}`); - Recorder.pipeToResponse(uuid, res); - // res not ended as we are streaming + const filePath = Recorder.generatedFiles.get(uuid); + if (!filePath) { + res.statusCode = 404; + return res.end(); + } + res.setHeader("Content-Type", "application/octet-stream"); + res.setHeader( + "Content-Disposition", + `attachment; filename="${path.basename(filePath)}"` + ); + return fs.createReadStream(filePath).pipe(res); } catch (error) { logger.error(`[${remoteAddress}] failed to obtain recording: ${error.message}`); return res.end(); diff --git a/src/utils/ffmpeg.js b/src/utils/ffmpeg.js index 3f3c411..c0686ec 100644 --- a/src/utils/ffmpeg.js +++ b/src/utils/ffmpeg.js @@ -36,12 +36,7 @@ function formatFfmpegSdp(audioRtps, videoRtps) { } sdp.push(`-c:a aac -b:a 160k -ac 2 -filter_complex amerge=inputs=${audioRtps.length}`); if (videoRtps.length > 0) { - sdp.push( - "-movflags", - "frag_keyframe+empty_moov+default_base_moof", // fragmented for streaming although could use another format if dropping the pipe feature - "-c:v", - "mp4v" - ); + sdp.push("-c:v", "mp4v"); for (const videoRtp of videoRtps) { sdp.push(`m=video ${videoRtp.port} RTP/AVP ${videoRtp.payloadType}`); sdp.push(`a=rtpmap:${videoRtp.payloadType} ${videoRtp.codec}/${videoRtp.clockRate}`); From 8cb51cc444a5b1a758416fb61d7a1aa38d759541 Mon Sep 17 00:00:00 2001 From: TSO Date: Fri, 29 Nov 2024 11:13:39 +0100 Subject: [PATCH 08/11] [IMP] WIP: / (reminder to install ffmpeg on server) --- src/config.js | 4 ++- src/models/recorder.js | 79 +++++++++++++++++++----------------------- src/models/session.js | 6 +++- src/services/http.js | 30 +++++++++++++++- src/services/ws.js | 3 +- src/utils/ffmpeg.js | 61 ++++++++++++++++++++++---------- 6 files changed, 117 insertions(+), 66 deletions(-) diff --git a/src/config.js b/src/config.js index edd5caa..a243e4e 100644 --- a/src/config.js +++ b/src/config.js @@ -216,7 +216,9 @@ export const recording = Object.freeze({ maxDuration: 1000 * 60 * 60, // 1 hour fileTTL: 1000 * 60 * 60 * 24, // 24 hours fileType: "mp4", - videoLimit: 4, // how many videos can be merged into one recording + audioLimit: 20, + cameraLimit: 4, // how many camera can be merged into one recording + screenLimit: 1, }); export const dynamicPorts = Object.freeze({ diff --git a/src/models/recorder.js b/src/models/recorder.js index 3d4350a..9f4d67f 100644 --- a/src/models/recorder.js +++ b/src/models/recorder.js @@ -1,7 +1,5 @@ import path from "node:path"; import fs from "node:fs"; -import * as https from "node:https"; -import http from "node:http"; import { EventEmitter } from "node:events"; // TODO remove if unnecessary import { Logger } from "#src/utils/utils.js"; @@ -9,6 +7,7 @@ import { STREAM_TYPE } from "#src/shared/enums.js"; import { FFMPEG } from "#src/utils/ffmpeg.js"; import { recording } from "#src/config.js"; import * as config from "#src/config.js"; +import { upload as httpUpload } from "#src/services/http.js"; const logger = new Logger("RECORDER"); @@ -50,6 +49,10 @@ export function clearDirectory() { }); } +/** + * @fires Recorder#stateChange + * @fires Recorder#ready + */ export class Recorder extends EventEmitter { /** @type {Map} */ static generatedFiles = new Map(); @@ -111,22 +114,31 @@ export class Recorder extends EventEmitter { const oldProcess = this.ffmpeg; this.state = RECORDER_STATE.UPDATING; const audioRtps = []; - const videoRtps = []; + const cameraRtps = []; + const screenRtps = []; for (const id of ids) { const session = this.channel.sessions.get(id); - const audioRtpData = session.getRtp(STREAM_TYPE.AUDIO); - audioRtpData && audioRtps.push(audioRtpData); - for (const type in [STREAM_TYPE.CAMERA, STREAM_TYPE.SCREEN]) { - if (videoRtps.length < recording.videoLimit) { - const videoRtpData = session.getRtp(type); - videoRtpData && videoRtps.push(videoRtpData); - } + if (!session) { + logger.warn(`Session ${id} not found`); + continue; + } + if (audioRtps.length < recording.audioLimit) { + const audioRtpData = session.getRtp(STREAM_TYPE.AUDIO); + audioRtpData && audioRtps.push(audioRtpData); + } + if (cameraRtps.length < recording.cameraLimit) { + const cameraRtpData = session.getRtp(STREAM_TYPE.CAMERA); + cameraRtpData && cameraRtps.push(cameraRtpData); + } + if (screenRtps.length < recording.screenLimit) { + const screenRtpData = session.getRtp(STREAM_TYPE.SCREEN); + screenRtpData && screenRtps.push(screenRtpData); } } const tempPath = path.join(recording.directory, `call_${Date.now()}.${recording.fileType}`); this.ffmpeg = new FFMPEG(tempPath); try { - await this.ffmpeg.spawn(audioRtps, videoRtps); // args should be base on the rtp transports + await this.ffmpeg.start(audioRtps, screenRtps, cameraRtps); // args should be base on the rtp transports this.state = RECORDER_STATE.RECORDING; } catch (error) { logger.error(`Failed to start recording: ${error.message}`); @@ -170,43 +182,19 @@ export class Recorder extends EventEmitter { logger.warn(`No upload destination set for ${this.uuid}`); return; } - this.state = RECORDER_STATE.UPLOADING; - let filePath; - if (filePaths.length === 1) { - filePath = filePaths[0]; - } else { - filePath = await this._mergeFiles(filePaths); + if (filePaths.length === 0) { + logger.warn(`No files to upload for ${this.uuid}`); + return; } + this.state = RECORDER_STATE.UPLOADING; + const filePath = await this._mergeFiles(filePaths); Recorder.generatedFiles.set(this.uuid, filePath); /** * @event Recorder#ready - * @type {string} `filePath` + * @type {string} `uuid` */ - this.emit("ready", filePath); - const fileStream = fs.createReadStream(filePath); - const { hostname, pathname, protocol } = new URL(this._destination); - const options = { - hostname, - path: pathname, - method: "POST", - headers: { - "Content-Type": "application/octet-stream", - "Content-Length": fs.statSync(filePath).size, - }, - }; - // TODO implement the route and route-passing in odoo/discuss - const request = (protocol === "https:" ? https : http).request(options, (res) => { - if (res.statusCode === 200) { - logger.info(`File uploaded to ${this._destination}`); - // TODO delete file - } else { - logger.error(`Failed to upload file: ${res.statusCode}`); - } - }); - request.once("error", (error) => { - logger.error(`Failed to upload file: ${error.message}`); - }); - fileStream.pipe(request); + this.emit("ready", this.uuid); + httpUpload(filePath, this._destination); this.state = RECORDER_STATE.IDLE; } @@ -215,6 +203,9 @@ export class Recorder extends EventEmitter { */ async _mergeFiles(filePaths) { // TODO - return filePaths[1]; + if (filePaths.length === 1) { + return filePaths[0]; + } + return filePaths[1]; // TODO merge logic with FFMPEG } } diff --git a/src/models/session.js b/src/models/session.js index b09f853..c0c6a93 100644 --- a/src/models/session.js +++ b/src/models/session.js @@ -51,6 +51,7 @@ import { getPort, releasePort } from "#src/services/resources.js"; * @property {number} clockRate * @property {string} codec * @property {string} channels + * @property {string} label */ /** @@ -101,6 +102,8 @@ export class Session extends EventEmitter { }); /** @type {string} */ remote; + /** @type {string} */ + userName; /** @type {Map} */ _consumers = new Map(); /** @type {Producers} */ @@ -156,7 +159,7 @@ export class Session extends EventEmitter { * @returns {string} */ get name() { - return `${this._channel.name}:${this.id}@${this.remote}`; + return `${this._channel.name}:${this.id}@${this.remote} (${this.userName})`; } /** @@ -539,6 +542,7 @@ export class Session extends EventEmitter { clockRate: codecData.clockRate, codec: codecData.mimeType.replace(`${producer.kind}`, ""), channels: producer.kind === "audio" ? codecData.channels : undefined, + label: this.userName, }; return this._rtp[type]; } diff --git a/src/services/http.js b/src/services/http.js index 6ec9bcd..f910a32 100644 --- a/src/services/http.js +++ b/src/services/http.js @@ -9,6 +9,7 @@ import { Channel } from "#src/models/channel.js"; import { Recorder } from "#src/models/recorder.js"; import fs from "node:fs"; import path from "node:path"; +import * as https from "node:https"; /** * @typedef {function} routeCallback @@ -64,7 +65,7 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf }); /** * @param {URLSearchParams} searchParams (query parameters) - * @param {"true" | "false"} searchParams.webRTC whether to use WebRTC or not + * @param {undefined | "false"} searchParams.webRTC whether to use WebRTC or not * @param {string} searchParams.uploadRoute the route to which recordings will be uploaded * @returns {http.ServerResponse>} */ @@ -174,6 +175,33 @@ export function close() { httpServer?.close(); } +export function upload(filePath, destination) { + const fileStream = fs.createReadStream(filePath); + const { hostname, pathname, protocol } = new URL(destination); + const options = { + hostname, + path: pathname, + method: "POST", + headers: { + "Content-Type": "application/octet-stream", + "Content-Length": fs.statSync(filePath).size, + }, + }; + // TODO implement the route and route-passing in odoo/discuss + const request = (protocol === "https:" ? https : http).request(options, (res) => { + if (res.statusCode === 200) { + logger.info(`File uploaded to ${destination}`); + // TODO delete file + } else { + logger.error(`Failed to upload file: ${res.statusCode}`); + } + }); + request.on("error", (error) => { + logger.error(`Failed to upload file: ${error.message}`); + }); + fileStream.pipe(request); +} + class RouteListener { /** @type {Map} */ GETs = new Map(); diff --git a/src/services/ws.js b/src/services/ws.js index 826414f..42652dd 100644 --- a/src/services/ws.js +++ b/src/services/ws.js @@ -106,7 +106,7 @@ async function connect(webSocket, { channelUUID, jwt }) { let channel = Channel.records.get(channelUUID); /** @type {{sfu_channel_uuid: string, session_id: number, ice_servers: Object[] }} */ const authResult = await verify(jwt, channel?.key); - const { sfu_channel_uuid, session_id, ice_servers } = authResult; + const { sfu_channel_uuid, session_id, ice_servers, user_name } = authResult; if (!channelUUID && sfu_channel_uuid) { // Cases where the channelUUID is not provided in the credentials for backwards compatibility with version 1.1 and earlier. channel = Channel.records.get(sfu_channel_uuid); @@ -125,6 +125,7 @@ async function connect(webSocket, { channelUUID, jwt }) { webSocket.send(JSON.stringify({ features: channel.features })); // client can start using ws after this message. const bus = new Bus(webSocket, { batchDelay: config.timeouts.busBatch }); const { session } = Channel.join(channel.uuid, session_id); + session.userName = user_name; session.once("close", ({ code }) => { let wsCloseCode = WS_CLOSE_CODE.CLEAN; switch (code) { diff --git a/src/utils/ffmpeg.js b/src/utils/ffmpeg.js index c0686ec..dd027a4 100644 --- a/src/utils/ffmpeg.js +++ b/src/utils/ffmpeg.js @@ -10,40 +10,63 @@ const logger = new Logger("FFMPEG"); * hard-coded ffmpeg sdp fragments for layouts with 1...4 videos * TODO make the right resizing and vstack/hstack params */ -const LAYOUT = { - 1: "", - 2: "a=filter:complex [0:v][1:v]hstack=inputs=2[v]; -map [v]", - 3: "a=filter:complex [0:v][1:v]hstack=inputs=2[top];[top][2:v]vstack=inputs=2[v]; -map [v]", - 4: "a=filter:complex [0:v][1:v]hstack=inputs=2[top];[2:v][3:v]hstack=inputs=2[bottom];[top][bottom]vstack=inputs=2[v]; -map [v]", + +const drawText = (label, index) => `[${index}:v]drawtext=text='${label}':x=10:y=h-30[v${index}]`; + +const SCREEN_LAYOUT = { + 1: (labels) => `a=filter:complex ${drawText(labels[0], 0)}; -map [v0]`, + 2: (labels) => + `a=filter:complex ${drawText(labels[0], 0)};${drawText( + labels[1], + 1 + )};[v0][v1]hstack=inputs=2[v]; -map [v]`, + 3: (labels) => + `a=filter:complex ${drawText(labels[0], 0)};${drawText( + labels[1], + 1 + )};[v0][v1]hstack=inputs=2[top];${drawText( + labels[2], + 2 + )};[top][v2]vstack=inputs=2[v]; -map [v]`, + 4: (labels) => + `a=filter:complex ${drawText(labels[0], 0)};${drawText( + labels[1], + 1 + )};[v0][v1]hstack=inputs=2[top];${drawText(labels[2], 2)};${drawText( + labels[3], + 3 + )};[v2][v3]hstack=inputs=2[bottom];[top][bottom]vstack=inputs=2[v]; -map [v]`, }; /** * TODO * @param {RtpData[]} audioRtps - * @param {RtpData[]} videoRtps + * @param {RtpData[]} cameraRtps + * @param {RtpData[]} screenRtps * @return {string} */ -function formatFfmpegSdp(audioRtps, videoRtps) { +function formatFfmpegSdp({ audioRtps, screenRtps, cameraRtps }) { + logger.info(`TODO: ${screenRtps}`); const sdp = ["v=0", "o=- 0 0 IN IP4 127.0.0.1", "s=FFmpeg", "c=IN IP4 127.0.0.1", "t=0 0"]; - const layout = LAYOUT[videoRtps.length]; - if (!layout) { - throw new Error(`unsupported layout for ${videoRtps.length} videos`); - } for (const audioRtp of audioRtps) { sdp.push(`m=audio ${audioRtp.port} RTP/AVP ${audioRtp.payloadType}`); sdp.push(`a=rtpmap:${audioRtp.payloadType} ${audioRtp.codec}/${audioRtp.clockRate}`); sdp.push(`a=sendonly`); } sdp.push(`-c:a aac -b:a 160k -ac 2 -filter_complex amerge=inputs=${audioRtps.length}`); - if (videoRtps.length > 0) { + if (cameraRtps.length > 0) { + const layout = SCREEN_LAYOUT[cameraRtps.length]; + if (!layout) { + throw new Error(`unsupported layout for ${cameraRtps.length} videos`); + } sdp.push("-c:v", "mp4v"); - for (const videoRtp of videoRtps) { + for (const videoRtp of cameraRtps) { sdp.push(`m=video ${videoRtp.port} RTP/AVP ${videoRtp.payloadType}`); sdp.push(`a=rtpmap:${videoRtp.payloadType} ${videoRtp.codec}/${videoRtp.clockRate}`); sdp.push(`a=sendonly`); } + sdp.push(`-filter_complex`, layout(cameraRtps.map((rtp) => rtp.label))); } - // TODO, layout only a small part of the full SDP. return sdp.join("\n"); } @@ -87,11 +110,13 @@ export class FFMPEG extends EventEmitter { } /** - * @param {RtpData[]} audioRtps - * @param {RtpData[]} videoRtps + * @param {Object} rtpInputs + * @param {RtpData[]} rtpInputs.audioRtps + * @param {RtpData[]} rtpInputs.screenRtps + * @param {RtpData[]} rtpInputs.cameraRtps */ - async spawn(audioRtps, videoRtps) { - const sdp = formatFfmpegSdp(audioRtps, videoRtps); + async start(rtpInputs) { + const sdp = formatFfmpegSdp(rtpInputs); this._process = child_process.spawn("ffmpeg", this._processArgs, { stdio: ["pipe", "pipe", process.stderr], }); From 98bad37c577074a42c5a97742db31b4275154ab5 Mon Sep 17 00:00:00 2001 From: TSO Date: Fri, 29 Nov 2024 11:37:40 +0100 Subject: [PATCH 09/11] [IMP] WIP: / scaling --- src/utils/ffmpeg.js | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/src/utils/ffmpeg.js b/src/utils/ffmpeg.js index dd027a4..5f2024b 100644 --- a/src/utils/ffmpeg.js +++ b/src/utils/ffmpeg.js @@ -14,28 +14,31 @@ const logger = new Logger("FFMPEG"); const drawText = (label, index) => `[${index}:v]drawtext=text='${label}':x=10:y=h-30[v${index}]`; const SCREEN_LAYOUT = { - 1: (labels) => `a=filter:complex ${drawText(labels[0], 0)}; -map [v0]`, + 1: (labels) => `a=filter:complex ${drawText(labels[0], 0)},scale=1280:720[v0]; -map [v0]`, 2: (labels) => - `a=filter:complex ${drawText(labels[0], 0)};${drawText( + `a=filter:complex ${drawText(labels[0], 0)},scale=640:720[v0];${drawText( labels[1], 1 - )};[v0][v1]hstack=inputs=2[v]; -map [v]`, + )},scale=640:720[v1];[v0][v1]hstack=inputs=2[v]; -map [v]`, 3: (labels) => - `a=filter:complex ${drawText(labels[0], 0)};${drawText( + `a=filter:complex ${drawText(labels[0], 0)},scale=640:360[v0];${drawText( labels[1], 1 - )};[v0][v1]hstack=inputs=2[top];${drawText( + )},scale=640:360[v1];[v0][v1]hstack=inputs=2[top];${drawText( labels[2], 2 - )};[top][v2]vstack=inputs=2[v]; -map [v]`, + )},scale=1280:360[v2];[top][v2]vstack=inputs=2[v]; -map [v]`, 4: (labels) => - `a=filter:complex ${drawText(labels[0], 0)};${drawText( + `a=filter:complex ${drawText(labels[0], 0)},scale=640:360[v0];${drawText( labels[1], 1 - )};[v0][v1]hstack=inputs=2[top];${drawText(labels[2], 2)};${drawText( + )},scale=640:360[v1];[v0][v1]hstack=inputs=2[top];${drawText( + labels[2], + 2 + )},scale=640:360[v2];${drawText( labels[3], 3 - )};[v2][v3]hstack=inputs=2[bottom];[top][bottom]vstack=inputs=2[v]; -map [v]`, + )},scale=640:360[v3];[v2][v3]hstack=inputs=2[bottom];[top][bottom]vstack=inputs=2[v]; -map [v]`, }; /** @@ -53,19 +56,19 @@ function formatFfmpegSdp({ audioRtps, screenRtps, cameraRtps }) { sdp.push(`a=rtpmap:${audioRtp.payloadType} ${audioRtp.codec}/${audioRtp.clockRate}`); sdp.push(`a=sendonly`); } - sdp.push(`-c:a aac -b:a 160k -ac 2 -filter_complex amerge=inputs=${audioRtps.length}`); + sdp.push(`-c:a aac -b:a 128k -ac 2 -filter_complex amerge=inputs=${audioRtps.length}`); if (cameraRtps.length > 0) { const layout = SCREEN_LAYOUT[cameraRtps.length]; if (!layout) { throw new Error(`unsupported layout for ${cameraRtps.length} videos`); } - sdp.push("-c:v", "mp4v"); for (const videoRtp of cameraRtps) { sdp.push(`m=video ${videoRtp.port} RTP/AVP ${videoRtp.payloadType}`); sdp.push(`a=rtpmap:${videoRtp.payloadType} ${videoRtp.codec}/${videoRtp.clockRate}`); sdp.push(`a=sendonly`); } sdp.push(`-filter_complex`, layout(cameraRtps.map((rtp) => rtp.label))); + sdp.push("-c:v libx264"); // TODO move outside of the condition, should also account for screenRtps } return sdp.join("\n"); } @@ -91,6 +94,10 @@ export class FFMPEG extends EventEmitter { "sdp", "-i", "pipe:0", + "-vf", + "scale=1280:720", // 720p + "-r", + "30", // 30fps "-f", recording.fileType, this._filePath, From 7140d373b021ad2e24fe8b83d1cff7973bb4e409 Mon Sep 17 00:00:00 2001 From: TSO Date: Fri, 29 Nov 2024 12:20:18 +0100 Subject: [PATCH 10/11] [IMP] WIP: / --- src/config.js | 2 ++ src/models/recorder.js | 23 +++++++++++++++++------ src/models/session.js | 2 +- src/utils/ffmpeg.js | 16 +++++++++++----- 4 files changed, 31 insertions(+), 12 deletions(-) diff --git a/src/config.js b/src/config.js index a243e4e..1663636 100644 --- a/src/config.js +++ b/src/config.js @@ -216,6 +216,8 @@ export const recording = Object.freeze({ maxDuration: 1000 * 60 * 60, // 1 hour fileTTL: 1000 * 60 * 60 * 24, // 24 hours fileType: "mp4", + videoCodec: "libx264", + audioCodec: "aac", audioLimit: 20, cameraLimit: 4, // how many camera can be merged into one recording screenLimit: 1, diff --git a/src/models/recorder.js b/src/models/recorder.js index 9f4d67f..5267788 100644 --- a/src/models/recorder.js +++ b/src/models/recorder.js @@ -113,8 +113,11 @@ export class Recorder extends EventEmitter { async _start_fragment(ids) { const oldProcess = this.ffmpeg; this.state = RECORDER_STATE.UPDATING; + /** @type {RtpData[]} */ const audioRtps = []; + /** @type {RtpData[]} */ const cameraRtps = []; + /** @type {RtpData[]} */ const screenRtps = []; for (const id of ids) { const session = this.channel.sessions.get(id); @@ -123,22 +126,25 @@ export class Recorder extends EventEmitter { continue; } if (audioRtps.length < recording.audioLimit) { - const audioRtpData = session.getRtp(STREAM_TYPE.AUDIO); + const audioRtpData = await session.getRtp(STREAM_TYPE.AUDIO); audioRtpData && audioRtps.push(audioRtpData); } if (cameraRtps.length < recording.cameraLimit) { - const cameraRtpData = session.getRtp(STREAM_TYPE.CAMERA); + const cameraRtpData = await session.getRtp(STREAM_TYPE.CAMERA); cameraRtpData && cameraRtps.push(cameraRtpData); } if (screenRtps.length < recording.screenLimit) { - const screenRtpData = session.getRtp(STREAM_TYPE.SCREEN); + const screenRtpData = await session.getRtp(STREAM_TYPE.SCREEN); screenRtpData && screenRtps.push(screenRtpData); } } - const tempPath = path.join(recording.directory, `call_${Date.now()}.${recording.fileType}`); + const tempPath = path.join( + recording.directory, + `__FRAGMENT__-${this.uuid}-${Date.now()}.${recording.fileType}` + ); this.ffmpeg = new FFMPEG(tempPath); try { - await this.ffmpeg.start(audioRtps, screenRtps, cameraRtps); // args should be base on the rtp transports + await this.ffmpeg.merge({ audioRtps, screenRtps, cameraRtps }); // args should be base on the rtp transports this.state = RECORDER_STATE.RECORDING; } catch (error) { logger.error(`Failed to start recording: ${error.message}`); @@ -206,6 +212,11 @@ export class Recorder extends EventEmitter { if (filePaths.length === 1) { return filePaths[0]; } - return filePaths[1]; // TODO merge logic with FFMPEG + const ffmpeg = new FFMPEG( + path.join(recording.directory, `__MERGED__-${this.uuid}.${recording.fileType}`) + ); + // should await for ffmpeg complete event. + await ffmpeg.concat(filePaths); + return ""; // TODO merge logic with FFMPEG } } diff --git a/src/models/session.js b/src/models/session.js index c0c6a93..0017723 100644 --- a/src/models/session.js +++ b/src/models/session.js @@ -498,7 +498,7 @@ export class Session extends EventEmitter { /** * @param {STREAM_TYPE[keyof STREAM_TYPE]} type - * @return {Promise} + * @return {RtpData} */ async getRtp(type) { if (this._rtp[type]) { diff --git a/src/utils/ffmpeg.js b/src/utils/ffmpeg.js index 5f2024b..a410dd5 100644 --- a/src/utils/ffmpeg.js +++ b/src/utils/ffmpeg.js @@ -48,7 +48,7 @@ const SCREEN_LAYOUT = { * @param {RtpData[]} screenRtps * @return {string} */ -function formatFfmpegSdp({ audioRtps, screenRtps, cameraRtps }) { +function formatSdp({ audioRtps, screenRtps, cameraRtps }) { logger.info(`TODO: ${screenRtps}`); const sdp = ["v=0", "o=- 0 0 IN IP4 127.0.0.1", "s=FFmpeg", "c=IN IP4 127.0.0.1", "t=0 0"]; for (const audioRtp of audioRtps) { @@ -56,7 +56,9 @@ function formatFfmpegSdp({ audioRtps, screenRtps, cameraRtps }) { sdp.push(`a=rtpmap:${audioRtp.payloadType} ${audioRtp.codec}/${audioRtp.clockRate}`); sdp.push(`a=sendonly`); } - sdp.push(`-c:a aac -b:a 128k -ac 2 -filter_complex amerge=inputs=${audioRtps.length}`); + sdp.push( + `-c:a ${recording.audioCodec} -b:a 128k -ac 2 -filter_complex amerge=inputs=${audioRtps.length}` + ); if (cameraRtps.length > 0) { const layout = SCREEN_LAYOUT[cameraRtps.length]; if (!layout) { @@ -68,7 +70,7 @@ function formatFfmpegSdp({ audioRtps, screenRtps, cameraRtps }) { sdp.push(`a=sendonly`); } sdp.push(`-filter_complex`, layout(cameraRtps.map((rtp) => rtp.label))); - sdp.push("-c:v libx264"); // TODO move outside of the condition, should also account for screenRtps + sdp.push(`-c:v ${recording.videoCodec}`); // TODO move outside of the condition, should also account for screenRtps } return sdp.join("\n"); } @@ -122,8 +124,8 @@ export class FFMPEG extends EventEmitter { * @param {RtpData[]} rtpInputs.screenRtps * @param {RtpData[]} rtpInputs.cameraRtps */ - async start(rtpInputs) { - const sdp = formatFfmpegSdp(rtpInputs); + async merge(rtpInputs) { + const sdp = formatSdp(rtpInputs); this._process = child_process.spawn("ffmpeg", this._processArgs, { stdio: ["pipe", "pipe", process.stderr], }); @@ -144,6 +146,10 @@ export class FFMPEG extends EventEmitter { ); } + concat(filePaths) { + return filePaths[0]; + } + kill() { this._process?.kill("SIGINT"); } From e7e1995cb8d560af2f7644dc106d4c6360ec980f Mon Sep 17 00:00:00 2001 From: TSO Date: Tue, 17 Dec 2024 14:18:21 +0100 Subject: [PATCH 11/11] [FIX] fixup: concat --- src/models/recorder.js | 1 + src/utils/ffmpeg.js | 28 ++++++++++++++++++++-------- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/models/recorder.js b/src/models/recorder.js index 5267788..d9b7fed 100644 --- a/src/models/recorder.js +++ b/src/models/recorder.js @@ -125,6 +125,7 @@ export class Recorder extends EventEmitter { logger.warn(`Session ${id} not found`); continue; } + // TODO could be parallelized if (audioRtps.length < recording.audioLimit) { const audioRtpData = await session.getRtp(STREAM_TYPE.AUDIO); audioRtpData && audioRtps.push(audioRtpData); diff --git a/src/utils/ffmpeg.js b/src/utils/ffmpeg.js index a410dd5..f7b42f6 100644 --- a/src/utils/ffmpeg.js +++ b/src/utils/ffmpeg.js @@ -125,20 +125,20 @@ export class FFMPEG extends EventEmitter { * @param {RtpData[]} rtpInputs.cameraRtps */ async merge(rtpInputs) { - const sdp = formatSdp(rtpInputs); - this._process = child_process.spawn("ffmpeg", this._processArgs, { + this._startProcess(this._processArgs, formatSdp(rtpInputs)); + } + + _startProcess(args, sdp) { + this._process = child_process.spawn("ffmpeg", args, { stdio: ["pipe", "pipe", process.stderr], }); - if (!this._process.stdin.writable) { throw new Error("FFMPEG stdin not writable."); } - this._process.stdin.write(sdp); // TODO (maybe pass args earlier) + this._process.stdin.write(sdp); this._process.stdin.end(); this._process.on("close", (code) => { - if (code === 0) { - this.emit("success"); - } + this.emit("complete", code); }); logger.debug( @@ -147,7 +147,19 @@ export class FFMPEG extends EventEmitter { } concat(filePaths) { - return filePaths[0]; + this._startProcess( + this._processArgs, // TODO check if all args make sense or if need custom + `-f concat -safe 0 -i ${filePaths.join("\n")} -c copy` // SDP + ); + return new Promise((resolve, reject) => { + this.once("complete", (code) => { + if (code === 0) { + resolve(this._filePath); + } else { + reject(new Error(`FFMPEG exited with code ${code}`)); + } + }); + }); } kill() {