From 57e9a6c476328b7e4f1cb7368f9ce33d3c99fc6f Mon Sep 17 00:00:00 2001 From: ThanhDodeurOdoo Date: Mon, 22 Sep 2025 11:09:42 +0200 Subject: [PATCH 01/12] [IMP] recording TODO --- src/config.ts | 24 ++++++++++++++++++++++++ src/models/channel.ts | 10 ++++++++-- src/models/recorder.ts | 18 ++++++++++++++++++ src/models/session.ts | 6 ++++++ src/services/auth.ts | 3 ++- src/services/http.ts | 3 ++- src/services/ws.ts | 3 ++- 7 files changed, 62 insertions(+), 5 deletions(-) create mode 100644 src/models/recorder.ts diff --git a/src/config.ts b/src/config.ts index 23474ca..53d218e 100644 --- a/src/config.ts +++ b/src/config.ts @@ -64,6 +64,11 @@ export const HTTP_INTERFACE: string = process.env.HTTP_INTERFACE || "0.0.0.0"; */ export const PORT: number = Number(process.env.PORT) || 8070; +/** + * Whether the recording feature is enabled, true by default. + */ +export const RECORDING: boolean = !FALSY_INPUT.has(process.env.LOG_TIMESTAMP!); + /** * The number of workers to spawn (up to core limits) to manage RTC servers. * 0 < NUM_WORKERS <= os.availableParallelism() @@ -197,6 +202,25 @@ export const timeouts: TimeoutConfig = Object.freeze({ busBatch: process.env.JEST_WORKER_ID ? 10 : 300 }); +export const recording = Object.freeze({ + directory: os.tmpdir() + "/recordings", + enabled: RECORDING, + maxDuration: 1000 * 60 * 60, // 1 hour, could be a env-var. + fileTTL: 1000 * 60 * 60 * 24, // 24 hours + fileType: "mp4", + videoCodec: "libx264", + audioCodec: "aac", + audioLimit: 20, + cameraLimit: 4, // how many camera can be merged into one recording + screenLimit: 1, +}); + +export const dynamicPorts = Object.freeze({ + min: 50000, + max: 59999, +}); + + // how many errors can occur before the session is closed, recovery attempts will be made until this limit is reached export const maxSessionErrors: number = 6; diff --git a/src/models/channel.ts b/src/models/channel.ts index 91889cc..f543ddd 100644 --- a/src/models/channel.ts +++ b/src/models/channel.ts @@ -12,6 +12,7 @@ import { type SessionId, type SessionInfo } from "#src/models/session.ts"; +import { Recorder } from "#src/models/recorder.ts"; import { getWorker, type RtcWorker } from "#src/services/rtc.ts"; const logger = new Logger("CHANNEL"); @@ -53,6 +54,7 @@ interface ChannelCreateOptions { key?: string; /** Whether to enable WebRTC functionality */ useWebRtc?: boolean; + useRecording?: boolean; } interface JoinResult { /** The channel instance */ @@ -87,6 +89,8 @@ export class Channel extends EventEmitter { public readonly sessions = new Map(); /** mediasoup Worker handling this channel */ private readonly _worker?: RtcWorker; + /** Manages the recording of this channel, undefined if the feature is disabled */ + private recorder?: Recorder; /** Timeout for auto-closing empty channels */ private _closeTimeout?: NodeJS.Timeout; @@ -102,7 +106,7 @@ export class Channel extends EventEmitter { issuer: string, options: ChannelCreateOptions = {} ): Promise { - const { key, useWebRtc = true } = options; + const { key, useWebRtc = true, useRecording = true } = options; const safeIssuer = `${remoteAddress}::${issuer}`; const oldChannel = Channel.recordsByIssuer.get(safeIssuer); if (oldChannel) { @@ -112,7 +116,7 @@ export class Channel extends EventEmitter { const channelOptions: ChannelCreateOptions & { worker?: Worker; router?: Router; - } = { key }; + } = { key, useRecording: useWebRtc && useRecording }; if (useWebRtc) { channelOptions.worker = await getWorker(); channelOptions.router = await channelOptions.worker.createRouter({ @@ -183,6 +187,8 @@ export class Channel extends EventEmitter { const now = new Date(); this.createDate = now.toISOString(); this.remoteAddress = remoteAddress; + this.recorder = config.recording.enabled && options.useRecording ? new Recorder(this) : undefined; + this.recorder?.todo(); this.key = key ? Buffer.from(key, "base64") : undefined; this.uuid = crypto.randomUUID(); this.name = `${remoteAddress}*${this.uuid.slice(-5)}`; diff --git a/src/models/recorder.ts b/src/models/recorder.ts new file mode 100644 index 0000000..36f5484 --- /dev/null +++ b/src/models/recorder.ts @@ -0,0 +1,18 @@ +import {EventEmitter} from "node:events"; +import type { Channel } from "./channel"; +import {Logger} from "#src/utils/utils.ts"; + +const logger = new Logger("RECORDER"); + +export class Recorder extends EventEmitter { + channel: Channel; + + constructor(channel: Channel) { + super(); + this.channel = channel; + } + + todo() { + logger.warn("TODO: Everything"); + } +} diff --git a/src/models/session.ts b/src/models/session.ts index 40c0884..4bea7a7 100644 --- a/src/models/session.ts +++ b/src/models/session.ts @@ -56,6 +56,9 @@ export enum SESSION_CLOSE_CODE { KICKED = "kicked", ERROR = "error" } +export interface SessionPermissions { + recording?: boolean; +} export interface TransportConfig { /** Transport identifier */ id: string; @@ -135,6 +138,9 @@ export class Session extends EventEmitter { camera: null, screen: null }; + public permissions: SessionPermissions = { + recording: false + }; /** Parent channel containing this session */ private readonly _channel: Channel; /** Recovery timeouts for failed consumers */ diff --git a/src/services/auth.ts b/src/services/auth.ts index 4c4c5aa..89c2357 100644 --- a/src/services/auth.ts +++ b/src/services/auth.ts @@ -3,7 +3,7 @@ import crypto from "node:crypto"; import * as config from "#src/config.ts"; import { Logger } from "#src/utils/utils.ts"; import { AuthenticationError } from "#src/utils/errors.ts"; -import type { SessionId } from "#src/models/session.ts"; +import type { SessionId, SessionPermissions } from "#src/models/session.ts"; import type { StringLike } from "#src/shared/types.ts"; /** @@ -43,6 +43,7 @@ interface PrivateJWTClaims { sfu_channel_uuid?: string; session_id?: SessionId; ice_servers?: object[]; + permissions: SessionPermissions, sessionIdsByChannel?: Record; /** If provided when requesting a channel, this key will be used instead of the global key to verify JWTs related to this channel */ key?: string; diff --git a/src/services/http.ts b/src/services/http.ts index c00d9b5..ebc8790 100644 --- a/src/services/http.ts +++ b/src/services/http.ts @@ -98,7 +98,8 @@ function setupRoutes(routeListener: RouteListener): void { } const channel = await Channel.create(remoteAddress, claims.iss, { key: claims.key, - useWebRtc: searchParams.get("webRTC") !== "false" + useWebRtc: searchParams.get("webRTC") !== "false", + useRecording: searchParams.get("recording") !== "false" }); res.setHeader("Content-Type", "application/json"); res.statusCode = 200; diff --git a/src/services/ws.ts b/src/services/ws.ts index 5ca1bda..021a323 100644 --- a/src/services/ws.ts +++ b/src/services/ws.ts @@ -112,7 +112,7 @@ function connect(webSocket: WebSocket, credentials: Credentials): Session { const { channelUUID, jwt } = credentials; let channel = channelUUID ? Channel.records.get(channelUUID) : undefined; const authResult = verify(jwt, channel?.key); - const { sfu_channel_uuid, session_id } = authResult; + const { sfu_channel_uuid, session_id, permissions } = authResult; if (!channelUUID && sfu_channel_uuid) { // Cases where the channelUUID is not provided in the credentials for backwards compatibility with version 1.1 and earlier. channel = Channel.records.get(sfu_channel_uuid); @@ -131,6 +131,7 @@ function connect(webSocket: WebSocket, credentials: Credentials): Session { webSocket.send(""); // client can start using ws after this message. const bus = new Bus(webSocket, { batchDelay: config.timeouts.busBatch }); const { session } = Channel.join(channel.uuid, session_id); + session.permissions = permissions; session.once("close", ({ code }: { code: string }) => { let wsCloseCode = WS_CLOSE_CODE.CLEAN; switch (code) { From f22ceba0acf94fc1ebb9b5f24713c96345434576 Mon Sep 17 00:00:00 2001 From: ThanhDodeurOdoo Date: Wed, 24 Sep 2025 14:34:45 +0200 Subject: [PATCH 02/12] fixup --- src/client.ts | 9 +++++++-- src/models/channel.ts | 4 ++-- src/models/session.ts | 9 ++++++++- src/services/auth.ts | 2 +- src/services/ws.ts | 6 ++++-- src/shared/types.ts | 5 +++++ 6 files changed, 27 insertions(+), 8 deletions(-) diff --git a/src/client.ts b/src/client.ts index f1bb5eb..c207e55 100644 --- a/src/client.ts +++ b/src/client.ts @@ -18,7 +18,7 @@ import { SERVER_REQUEST, WS_CLOSE_CODE } from "#src/shared/enums.ts"; -import type { JSONSerializable, StreamType, BusMessage } from "#src/shared/types"; +import type { JSONSerializable, StreamType, BusMessage, AvailableFeatures } from "#src/shared/types"; import type { TransportConfig, SessionId, SessionInfo } from "#src/models/session"; interface Consumers { @@ -141,6 +141,10 @@ const ACTIVE_STATES = new Set([ export class SfuClient extends EventTarget { /** Connection errors encountered */ public errors: Error[] = []; + public availableFeatures: AvailableFeatures = { + "rtc": false, + "recording": false, + }; /** Current client state */ private _state: SfuClientState = SfuClientState.DISCONNECTED; /** Communication bus */ @@ -445,7 +449,8 @@ export class SfuClient extends EventTarget { */ webSocket.addEventListener( "message", - () => { + (message) => { + this.availableFeatures = JSON.parse(message.data) as AvailableFeatures; resolve(new Bus(webSocket)); }, { once: true } diff --git a/src/models/channel.ts b/src/models/channel.ts index f543ddd..2933799 100644 --- a/src/models/channel.ts +++ b/src/models/channel.ts @@ -85,12 +85,12 @@ export class Channel extends EventEmitter { public readonly key?: Buffer; /** mediasoup Router for media routing */ public readonly router?: Router; + /** Manages the recording of this channel, undefined if the feature is disabled */ + public readonly recorder?: Recorder; /** Active sessions in this channel */ public readonly sessions = new Map(); /** mediasoup Worker handling this channel */ private readonly _worker?: RtcWorker; - /** Manages the recording of this channel, undefined if the feature is disabled */ - private recorder?: Recorder; /** Timeout for auto-closing empty channels */ private _closeTimeout?: NodeJS.Timeout; diff --git a/src/models/session.ts b/src/models/session.ts index 4bea7a7..0ddba7a 100644 --- a/src/models/session.ts +++ b/src/models/session.ts @@ -20,7 +20,7 @@ import { SERVER_REQUEST, STREAM_TYPE } from "#src/shared/enums.ts"; -import type { JSONSerializable, StreamType, BusMessage } from "#src/shared/types"; +import type {JSONSerializable, StreamType, BusMessage, AvailableFeatures } from "#src/shared/types"; import type { Bus } from "#src/shared/bus.ts"; import type { Channel } from "#src/models/channel.ts"; @@ -167,6 +167,13 @@ export class Session extends EventEmitter { this.setMaxListeners(config.CHANNEL_SIZE * 2); } + get availableFeatures(): AvailableFeatures { + return { + "rtc": Boolean(this._channel.router), + "recording": Boolean(this._channel.router && this._channel.recorder && this.permissions.recording) + } + } + get name(): string { return `${this._channel.name}:${this.id}@${this.remote}`; } diff --git a/src/services/auth.ts b/src/services/auth.ts index 89c2357..7efef4a 100644 --- a/src/services/auth.ts +++ b/src/services/auth.ts @@ -43,7 +43,7 @@ interface PrivateJWTClaims { sfu_channel_uuid?: string; session_id?: SessionId; ice_servers?: object[]; - permissions: SessionPermissions, + permissions?: SessionPermissions, sessionIdsByChannel?: Record; /** If provided when requesting a channel, this key will be used instead of the global key to verify JWTs related to this channel */ key?: string; diff --git a/src/services/ws.ts b/src/services/ws.ts index 021a323..54fd8cd 100644 --- a/src/services/ws.ts +++ b/src/services/ws.ts @@ -128,10 +128,12 @@ function connect(webSocket: WebSocket, credentials: Credentials): Session { if (!session_id) { throw new AuthenticationError("Malformed JWT payload"); } - webSocket.send(""); // client can start using ws after this message. const bus = new Bus(webSocket, { batchDelay: config.timeouts.busBatch }); const { session } = Channel.join(channel.uuid, session_id); - session.permissions = permissions; + if (permissions) { + Object.assign(session.permissions, permissions); + } + webSocket.send(JSON.stringify(session.availableFeatures)); // client can start using ws after this message. session.once("close", ({ code }: { code: string }) => { let wsCloseCode = WS_CLOSE_CODE.CLEAN; switch (code) { diff --git a/src/shared/types.ts b/src/shared/types.ts index 4210662..3f661ff 100644 --- a/src/shared/types.ts +++ b/src/shared/types.ts @@ -10,6 +10,11 @@ export type StreamType = "audio" | "camera" | "screen"; export type StringLike = Buffer | string; +export type AvailableFeatures = { + "rtc": boolean, + "recording": boolean, +} + import type { DownloadStates } from "#src/client.ts"; import type { SessionId, SessionInfo, TransportConfig } from "#src/models/session.ts"; From e5aaa79ac00b6bba87fbd164b748d2f727be4357 Mon Sep 17 00:00:00 2001 From: ThanhDodeurOdoo Date: Tue, 21 Oct 2025 10:09:57 +0200 Subject: [PATCH 03/12] [IMP] wip/poc --- src/client.ts | 17 +++++++++++++++++ src/config.ts | 2 +- src/models/channel.ts | 1 - src/models/recorder.ts | 16 ++++++++++++++-- src/models/session.ts | 39 +++++++++++++++++++++++++++++++++++++-- src/services/ws.ts | 4 +--- src/shared/enums.ts | 6 +++++- src/shared/types.ts | 2 ++ src/utils/utils.ts | 17 +++++++++++++++++ tests/network.test.ts | 12 ++++++++++++ 10 files changed, 106 insertions(+), 10 deletions(-) diff --git a/src/client.ts b/src/client.ts index c207e55..bb07ede 100644 --- a/src/client.ts +++ b/src/client.ts @@ -260,6 +260,23 @@ export class SfuClient extends EventTarget { await Promise.all(proms); return stats; } + async startRecording() { + return this._bus?.request( + { + name: CLIENT_REQUEST.START_RECORDING, + }, + { batch: true } + ); + } + + async stopRecording() { + return this._bus?.request( + { + name: CLIENT_REQUEST.STOP_RECORDING, + }, + { batch: true } + ); + } /** * Updates the server with the info of the session (isTalking, isCameraOn,...) so that it can broadcast it to the diff --git a/src/config.ts b/src/config.ts index 53d218e..3a7f614 100644 --- a/src/config.ts +++ b/src/config.ts @@ -67,7 +67,7 @@ export const PORT: number = Number(process.env.PORT) || 8070; /** * Whether the recording feature is enabled, true by default. */ -export const RECORDING: boolean = !FALSY_INPUT.has(process.env.LOG_TIMESTAMP!); +export const RECORDING: boolean = !FALSY_INPUT.has(process.env.RECORDING!); /** * The number of workers to spawn (up to core limits) to manage RTC servers. diff --git a/src/models/channel.ts b/src/models/channel.ts index 2933799..31973b5 100644 --- a/src/models/channel.ts +++ b/src/models/channel.ts @@ -188,7 +188,6 @@ export class Channel extends EventEmitter { this.createDate = now.toISOString(); this.remoteAddress = remoteAddress; this.recorder = config.recording.enabled && options.useRecording ? new Recorder(this) : undefined; - this.recorder?.todo(); this.key = key ? Buffer.from(key, "base64") : undefined; this.uuid = crypto.randomUUID(); this.name = `${remoteAddress}*${this.uuid.slice(-5)}`; diff --git a/src/models/recorder.ts b/src/models/recorder.ts index 36f5484..3da6f98 100644 --- a/src/models/recorder.ts +++ b/src/models/recorder.ts @@ -6,13 +6,25 @@ const logger = new Logger("RECORDER"); export class Recorder extends EventEmitter { channel: Channel; + state: "started" | "stopped" = "stopped"; + ffmpeg = null; constructor(channel: Channel) { super(); this.channel = channel; } - todo() { - logger.warn("TODO: Everything"); + async start() { + this.state = "started"; + logger.trace("TO IMPLEMENT"); + // TODO ffmpeg instance creation, start + return { state: this.state }; + } + + async stop() { + this.state = "stopped"; + logger.trace("TO IMPLEMENT"); + // TODO ffmpeg instance stop, cleanup, save,... + return { state: this.state }; } } diff --git a/src/models/session.ts b/src/models/session.ts index 0ddba7a..b50eb81 100644 --- a/src/models/session.ts +++ b/src/models/session.ts @@ -110,6 +110,7 @@ const logger = new Logger("SESSION"); * * @fires Session#stateChange - Emitted when session state changes * @fires Session#close - Emitted when session is closed + * @fires Session#producer - Emitted when a new producer is created */ export class Session extends EventEmitter { /** Communication bus for WebSocket messaging */ @@ -138,9 +139,9 @@ export class Session extends EventEmitter { camera: null, screen: null }; - public permissions: SessionPermissions = { + public readonly permissions: SessionPermissions = Object.seal({ recording: false - }; + }); /** Parent channel containing this session */ private readonly _channel: Channel; /** Recovery timeouts for failed consumers */ @@ -184,9 +185,26 @@ export class Session extends EventEmitter { set state(state: SESSION_STATE) { this._state = state; + /** + * @event Session#stateChange + * @type {{ state: SESSION_STATE }} + */ this.emit("stateChange", state); } + updatePermissions(permissions: SessionPermissions | undefined): void { + if (!permissions) { + return; + } + for (const key of Object.keys(this.permissions) as (keyof SessionPermissions)[]) { + const newVal = permissions[key]; + if (newVal === undefined) { + continue; + } + this.permissions[key] = Boolean(permissions[key]); + } + } + async getProducerBitRates(): Promise { const bitRates: ProducerBitRates = {}; const proms: Promise[] = []; @@ -643,8 +661,25 @@ export class Session extends EventEmitter { logger.debug(`[${this.name}] producing ${type}: ${codec?.mimeType}`); this._updateRemoteConsumers(); this._broadcastInfo(); + /** + * @event Session#producer + * @type {{ type: StreamType, producer: Producer }} + */ + this.emit("producer", { type, producer }); return { id: producer.id }; } + case CLIENT_REQUEST.START_RECORDING: { + if (this.permissions.recording && this._channel.recorder) { + return this._channel.recorder.start(); + } + return; + } + case CLIENT_REQUEST.STOP_RECORDING: { + if (this.permissions.recording && this._channel.recorder) { + return this._channel.recorder.stop(); + } + return; + } default: logger.warn(`[${this.name}] Unknown request type: ${name}`); throw new Error(`Unknown request type: ${name}`); diff --git a/src/services/ws.ts b/src/services/ws.ts index 54fd8cd..d6f6bea 100644 --- a/src/services/ws.ts +++ b/src/services/ws.ts @@ -130,9 +130,7 @@ function connect(webSocket: WebSocket, credentials: Credentials): Session { } const bus = new Bus(webSocket, { batchDelay: config.timeouts.busBatch }); const { session } = Channel.join(channel.uuid, session_id); - if (permissions) { - Object.assign(session.permissions, permissions); - } + session.updatePermissions(permissions); webSocket.send(JSON.stringify(session.availableFeatures)); // client can start using ws after this message. session.once("close", ({ code }: { code: string }) => { let wsCloseCode = WS_CLOSE_CODE.CLEAN; diff --git a/src/shared/enums.ts b/src/shared/enums.ts index a8703d6..e5a3a92 100644 --- a/src/shared/enums.ts +++ b/src/shared/enums.ts @@ -33,7 +33,11 @@ export enum CLIENT_REQUEST { /** Requests the server to connect the server-to-client transport */ CONNECT_STC_TRANSPORT = "CONNECT_STC_TRANSPORT", /** Requests the creation of a consumer that is used to upload a track to the server */ - INIT_PRODUCER = "INIT_PRODUCER" + INIT_PRODUCER = "INIT_PRODUCER", + /** Requests to start recording of the call */ + START_RECORDING = "START_RECORDING", + /** Requests to stop recording of the call */ + STOP_RECORDING = "STOP_RECORDING" } export enum CLIENT_MESSAGE { diff --git a/src/shared/types.ts b/src/shared/types.ts index 3f661ff..f59c891 100644 --- a/src/shared/types.ts +++ b/src/shared/types.ts @@ -54,6 +54,8 @@ export type BusMessage = name: typeof CLIENT_REQUEST.INIT_PRODUCER; payload: { type: StreamType; kind: MediaKind; rtpParameters: RtpParameters }; } + | { name: typeof CLIENT_REQUEST.START_RECORDING; payload?: never } + | { name: typeof CLIENT_REQUEST.STOP_RECORDING; payload?: never } | { name: typeof SERVER_MESSAGE.BROADCAST; payload: { senderId: SessionId; message: JSONSerializable }; diff --git a/src/utils/utils.ts b/src/utils/utils.ts index 39773cb..8c668af 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -12,6 +12,7 @@ const ASCII = { green: "\x1b[32m", yellow: "\x1b[33m", white: "\x1b[37m", + cyan: "\x1b[36m", default: "\x1b[0m" } } as const; @@ -48,6 +49,19 @@ export interface ParseBodyOptions { json?: boolean; } +function getCallChain(depth: number = 8): string { + const stack = new Error().stack?.split("\n").slice(2, depth + 2) ?? []; + return stack + .map(line => { + const match = line.trim().match(/^at\s+(.*?)\s+\(/); + return match ? match[1] : null; + }) + .slice(1, depth + 1) + .filter(Boolean) + .reverse() + .join(" > "); +} + export class Logger { private readonly _name: string; private readonly _colorize: (text: string, color?: string) => string; @@ -83,6 +97,9 @@ export class Logger { verbose(text: string): void { this._log(console.log, ":VERBOSE:", text, ASCII.color.white); } + trace(message: string, { depth = 8 }: { depth?: number } = {}): void { + this._log(console.log, ":TRACE:", `${getCallChain(depth)} ${message}`, ASCII.color.cyan); + } private _generateTimeStamp(): string { const now = new Date(); return now.toISOString() + " "; diff --git a/tests/network.test.ts b/tests/network.test.ts index 6a90558..080787f 100644 --- a/tests/network.test.ts +++ b/tests/network.test.ts @@ -284,4 +284,16 @@ describe("Full network", () => { expect(event1.detail.payload.message).toBe(message); expect(event2.detail.payload.message).toBe(message); }); + test("POC RECORDING", async () => { + const channelUUID = await network.getChannelUUID(); + const user1 = await network.connect(channelUUID, 1); + await once(user1.session, "stateChange"); + const sender = await network.connect(channelUUID, 3); + await once(sender.session, "stateChange"); + sender.session.updatePermissions({ recording: true }); + const startResult = await sender.sfuClient.startRecording() as { state: string }; + expect(startResult.state).toBe("started"); + const stopResult = await sender.sfuClient.stopRecording() as { state: string }; + expect(stopResult.state).toBe("stopped"); + }); }); From acd9ceb9ab2eaad69f6559749378093f51e24a8b Mon Sep 17 00:00:00 2001 From: ThanhDodeurOdoo Date: Thu, 23 Oct 2025 06:58:13 +0200 Subject: [PATCH 04/12] [IMP] rec addr --- src/models/channel.ts | 9 +++++---- src/models/recorder.ts | 5 ++++- src/services/http.ts | 25 ++++++++++++++++++++++++- 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/src/models/channel.ts b/src/models/channel.ts index 31973b5..5ecb72b 100644 --- a/src/models/channel.ts +++ b/src/models/channel.ts @@ -54,7 +54,7 @@ interface ChannelCreateOptions { key?: string; /** Whether to enable WebRTC functionality */ useWebRtc?: boolean; - useRecording?: boolean; + recordingAddress?: string | null; } interface JoinResult { /** The channel instance */ @@ -106,7 +106,7 @@ export class Channel extends EventEmitter { issuer: string, options: ChannelCreateOptions = {} ): Promise { - const { key, useWebRtc = true, useRecording = true } = options; + const { key, useWebRtc = true, recordingAddress } = options; const safeIssuer = `${remoteAddress}::${issuer}`; const oldChannel = Channel.recordsByIssuer.get(safeIssuer); if (oldChannel) { @@ -116,7 +116,7 @@ export class Channel extends EventEmitter { const channelOptions: ChannelCreateOptions & { worker?: Worker; router?: Router; - } = { key, useRecording: useWebRtc && useRecording }; + } = { key, recordingAddress: useWebRtc ? recordingAddress : null }; if (useWebRtc) { channelOptions.worker = await getWorker(); channelOptions.router = await channelOptions.worker.createRouter({ @@ -187,7 +187,7 @@ export class Channel extends EventEmitter { const now = new Date(); this.createDate = now.toISOString(); this.remoteAddress = remoteAddress; - this.recorder = config.recording.enabled && options.useRecording ? new Recorder(this) : undefined; + this.recorder = config.recording.enabled && options.recordingAddress ? new Recorder(this, options.recordingAddress) : undefined; this.key = key ? Buffer.from(key, "base64") : undefined; this.uuid = crypto.randomUUID(); this.name = `${remoteAddress}*${this.uuid.slice(-5)}`; @@ -300,6 +300,7 @@ export class Channel extends EventEmitter { * @fires Channel#close */ close(): void { + this.recorder?.stop(); for (const session of this.sessions.values()) { session.off("close", this._onSessionClose); session.close({ code: SESSION_CLOSE_CODE.CHANNEL_CLOSED }); diff --git a/src/models/recorder.ts b/src/models/recorder.ts index 3da6f98..d57bb52 100644 --- a/src/models/recorder.ts +++ b/src/models/recorder.ts @@ -8,10 +8,13 @@ export class Recorder extends EventEmitter { channel: Channel; state: "started" | "stopped" = "stopped"; ffmpeg = null; + /** Path to which the final recording will be uploaded to */ + recordingAddress: string; - constructor(channel: Channel) { + constructor(channel: Channel, recordingAddress: string) { super(); this.channel = channel; + this.recordingAddress = recordingAddress; } async start() { diff --git a/src/services/http.ts b/src/services/http.ts index ebc8790..bb20ef3 100644 --- a/src/services/http.ts +++ b/src/services/http.ts @@ -79,6 +79,29 @@ function setupRoutes(routeListener: RouteListener): void { return res.end(JSON.stringify(channelStats)); } }); + /** + * GET /v1/channel + * + * Creates (or reuses) a media channel for the authenticated client. + * + * ### Headers + * - `Authorization: Bearer ` — required. + * The JWT must include the `iss` (issuer) claim identifying the caller. + * + * ### Query Parameters + * - `webRTC` — optional, defaults to `"true"`. + * When set to `"false"`, disables WebRTC setup and creates a non-media channel. + * - `recordingAddress` — optional. + * If provided, enables recording and specifies the destination address + * for recorded media streams. This address should most likely include a secret token, + * so that it can be used publicly. For example http://example.com/recording/123?token=asdasdasdasd + * + * ### Responses + * - `200 OK` — returns `{ uuid: string, url: string }` + * - `401 Unauthorized` — missing or invalid Authorization header + * - `403 Forbidden` — missing `iss` claim + * - `500 Internal Server Error` — failed to create the channel + */ routeListener.get(`/v${API_VERSION}/channel`, { callback: async (req, res, { host, protocol, remoteAddress, searchParams }) => { try { @@ -99,7 +122,7 @@ function setupRoutes(routeListener: RouteListener): void { const channel = await Channel.create(remoteAddress, claims.iss, { key: claims.key, useWebRtc: searchParams.get("webRTC") !== "false", - useRecording: searchParams.get("recording") !== "false" + recordingAddress: searchParams.get("recordingAddress") }); res.setHeader("Content-Type", "application/json"); res.statusCode = 200; From 4abe114ca05922b3ebcc9804dcc800ae275a3f99 Mon Sep 17 00:00:00 2001 From: ThanhDodeurOdoo Date: Thu, 23 Oct 2025 14:16:41 +0200 Subject: [PATCH 05/12] [IMP] fixup --- src/models/channel.ts | 2 +- src/models/ffmpeg.ts | 8 ++++++ src/models/recorder.ts | 36 ++++++++++++++++++++------- src/server.ts | 6 ++--- src/services/{rtc.ts => resources.ts} | 19 +++++++++++++- tests/models.test.ts | 6 ++--- tests/rtc.test.ts | 8 +++--- tests/utils/network.ts | 6 ++--- 8 files changed, 67 insertions(+), 24 deletions(-) create mode 100644 src/models/ffmpeg.ts rename src/services/{rtc.ts => resources.ts} (83%) diff --git a/src/models/channel.ts b/src/models/channel.ts index 5ecb72b..e9ecc3a 100644 --- a/src/models/channel.ts +++ b/src/models/channel.ts @@ -13,7 +13,7 @@ import { type SessionInfo } from "#src/models/session.ts"; import { Recorder } from "#src/models/recorder.ts"; -import { getWorker, type RtcWorker } from "#src/services/rtc.ts"; +import { getWorker, type RtcWorker } from "#src/services/resources.ts"; const logger = new Logger("CHANNEL"); diff --git a/src/models/ffmpeg.ts b/src/models/ffmpeg.ts new file mode 100644 index 0000000..48189b9 --- /dev/null +++ b/src/models/ffmpeg.ts @@ -0,0 +1,8 @@ +import { EventEmitter } from "node:events"; + +export class FFMPEG extends EventEmitter { + + constructor() { + super(); + } +} diff --git a/src/models/recorder.ts b/src/models/recorder.ts index d57bb52..c80a33b 100644 --- a/src/models/recorder.ts +++ b/src/models/recorder.ts @@ -1,13 +1,19 @@ -import {EventEmitter} from "node:events"; +import { EventEmitter } from "node:events"; import type { Channel } from "./channel"; -import {Logger} from "#src/utils/utils.ts"; +import { getFolder } from "#src/services/resources.ts"; +import { Logger } from "#src/utils/utils.ts"; +export enum RECORDER_STATE { + STARTED = "started", + STOPPED = "stopped", +} const logger = new Logger("RECORDER"); export class Recorder extends EventEmitter { channel: Channel; - state: "started" | "stopped" = "stopped"; + state: RECORDER_STATE = RECORDER_STATE.STOPPED; ffmpeg = null; + destPath: string | undefined; /** Path to which the final recording will be uploaded to */ recordingAddress: string; @@ -18,16 +24,28 @@ export class Recorder extends EventEmitter { } async start() { - this.state = "started"; - logger.trace("TO IMPLEMENT"); - // TODO ffmpeg instance creation, start + if (this.state === RECORDER_STATE.STOPPED) { + const { path, sealFolder } = getFolder(); + this.destPath = path; + this.once("stopped", sealFolder); + this.state = RECORDER_STATE.STARTED; + logger.trace("TO IMPLEMENT"); + // TODO ffmpeg instance creation for recording to destPath with proper name, start, build timestamps object + } + return { state: this.state }; } async stop() { - this.state = "stopped"; - logger.trace("TO IMPLEMENT"); - // TODO ffmpeg instance stop, cleanup, save,... + if (this.state === RECORDER_STATE.STARTED) { + + logger.trace("TO IMPLEMENT"); + this.emit("stopped"); + // TODO ffmpeg instance stop, cleanup, + // only resolve promise and switch state when completely ready to start a new recording. + this.state = RECORDER_STATE.STOPPED; + } + return { state: this.state }; } } diff --git a/src/server.ts b/src/server.ts index 80ab97d..2d36a96 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,4 +1,4 @@ -import * as rtc from "#src/services/rtc.ts"; +import * as resources from "#src/services/resources.ts"; import * as http from "#src/services/http.ts"; import * as auth from "#src/services/auth.ts"; import { Logger } from "#src/utils/utils.ts"; @@ -8,7 +8,7 @@ const logger = new Logger("SERVER", { logLevel: "all" }); async function run(): Promise { auth.start(); - await rtc.start(); + await resources.start(); await http.start(); logger.info(`ready - PID: ${process.pid}`); } @@ -16,7 +16,7 @@ async function run(): Promise { function cleanup(): void { Channel.closeAll(); http.close(); - rtc.close(); + resources.close(); logger.info("cleanup complete"); } diff --git a/src/services/rtc.ts b/src/services/resources.ts similarity index 83% rename from src/services/rtc.ts rename to src/services/resources.ts index 2e546c7..9f7cff2 100644 --- a/src/services/rtc.ts +++ b/src/services/resources.ts @@ -10,7 +10,7 @@ export interface RtcWorker extends mediasoup.types.Worker { }; } -const logger = new Logger("RTC"); +const logger = new Logger("RESOURCES"); const workers = new Set(); export async function start(): Promise { @@ -76,3 +76,20 @@ export async function getWorker(): Promise { logger.debug(`worker ${leastUsedWorker!.pid} with ${lowestUsage} ru_maxrss was selected`); return leastUsedWorker; } + +export function getFolder() { + // create a temp folder at a path, returns the path and a function to seal the folder + return { + path: "", + sealFolder: () => { + // move the content into a permanent folder location so it can easily be retrieved for processing later + // or directly forward for transcription + }, + } +} + +export function getPort() { +} + +export function releasePort(port: number) { +} diff --git a/tests/models.test.ts b/tests/models.test.ts index d84f49b..0b00d9d 100644 --- a/tests/models.test.ts +++ b/tests/models.test.ts @@ -1,17 +1,17 @@ import { describe, beforeEach, afterEach, expect, jest } from "@jest/globals"; -import * as rtc from "#src/services/rtc"; +import * as resources from "#src/services/resources"; import { Channel } from "#src/models/channel"; import { timeouts, CHANNEL_SIZE } from "#src/config"; import { OvercrowdedError } from "#src/utils/errors"; describe("Models", () => { beforeEach(async () => { - await rtc.start(); + await resources.start(); }); afterEach(() => { Channel.closeAll(); - rtc.close(); + resources.close(); }); test("Create channel and session", async () => { const channel = await Channel.create("testRemote", "testIssuer"); diff --git a/tests/rtc.test.ts b/tests/rtc.test.ts index 08c188d..37caa6f 100644 --- a/tests/rtc.test.ts +++ b/tests/rtc.test.ts @@ -1,19 +1,19 @@ import { afterEach, beforeEach, describe, expect } from "@jest/globals"; -import * as rtc from "#src/services/rtc"; +import * as resources from "#src/services/resources"; import * as config from "#src/config"; describe("rtc service", () => { beforeEach(async () => { - await rtc.start(); + await resources.start(); }); afterEach(() => { - rtc.close(); + resources.close(); }); test("worker load should be evenly distributed", async () => { const usedWorkers = new Set(); for (let i = 0; i < config.NUM_WORKERS; ++i) { - const worker = await rtc.getWorker(); + const worker = await resources.getWorker(); const router = await worker.createRouter({}); const webRtcServer = await worker.createWebRtcServer(config.rtc.rtcServerOptions); const promises = []; diff --git a/tests/utils/network.ts b/tests/utils/network.ts index dace247..9ac1d16 100644 --- a/tests/utils/network.ts +++ b/tests/utils/network.ts @@ -5,7 +5,7 @@ import * as fakeParameters from "mediasoup-client/lib/test/fakeParameters"; import * as auth from "#src/services/auth"; import * as http from "#src/services/http"; -import * as rtc from "#src/services/rtc"; +import * as resources from "#src/services/resources"; import { SfuClient, SfuClientState } from "#src/client"; import { Channel } from "#src/models/channel"; import type { Session } from "#src/models/session"; @@ -69,7 +69,7 @@ export class LocalNetwork { this.port = port; // Start all services in correct order - await rtc.start(); + await resources.start(); await http.start({ httpInterface: hostname, port }); await auth.start(HMAC_B64_KEY); } @@ -217,7 +217,7 @@ export class LocalNetwork { // Stop all services auth.close(); http.close(); - rtc.close(); + resources.close(); // Clear network info this.hostname = undefined; From e855f75313a2d5d1e5e72f442c79c14f63a3a9e6 Mon Sep 17 00:00:00 2001 From: ThanhDodeurOdoo Date: Wed, 29 Oct 2025 14:38:18 +0100 Subject: [PATCH 06/12] [IMP] wip --- src/config.ts | 5 +++-- src/models/recorder.ts | 15 ++++++++++++--- src/services/resources.ts | 29 +++++++++++++++++++++++------ src/utils/errors.ts | 4 ++++ 4 files changed, 42 insertions(+), 11 deletions(-) diff --git a/src/config.ts b/src/config.ts index 3a7f614..b7d209e 100644 --- a/src/config.ts +++ b/src/config.ts @@ -65,9 +65,9 @@ export const HTTP_INTERFACE: string = process.env.HTTP_INTERFACE || "0.0.0.0"; export const PORT: number = Number(process.env.PORT) || 8070; /** - * Whether the recording feature is enabled, true by default. + * Whether the recording feature is enabled, false by default. */ -export const RECORDING: boolean = !FALSY_INPUT.has(process.env.RECORDING!); +export const RECORDING: boolean = Boolean(process.env.RECORDING); /** * The number of workers to spawn (up to core limits) to manage RTC servers. @@ -215,6 +215,7 @@ export const recording = Object.freeze({ screenLimit: 1, }); +// TODO: This should probably be env variable, and at least documented so that deployment can open these ports. export const dynamicPorts = Object.freeze({ min: 50000, max: 59999, diff --git a/src/models/recorder.ts b/src/models/recorder.ts index c80a33b..71eadb3 100644 --- a/src/models/recorder.ts +++ b/src/models/recorder.ts @@ -32,20 +32,29 @@ export class Recorder extends EventEmitter { logger.trace("TO IMPLEMENT"); // TODO ffmpeg instance creation for recording to destPath with proper name, start, build timestamps object } - + this._record(); return { state: this.state }; } async stop() { if (this.state === RECORDER_STATE.STARTED) { - logger.trace("TO IMPLEMENT"); this.emit("stopped"); // TODO ffmpeg instance stop, cleanup, // only resolve promise and switch state when completely ready to start a new recording. this.state = RECORDER_STATE.STOPPED; } - return { state: this.state }; } + + /** + * @param video whether we want to record videos or not (will always record audio) + */ + _record(video: boolean = false) { + console.trace(`TO IMPLEMENT: recording channel ${this.channel.name}, video: ${video}`); + // iterate all producers on all sessions of the channel, create a ffmpeg for each, + // save them on a map by session id+type. + // check if recording for that session id+type is already in progress + // add listener to the channel for producer creation (and closure). + } } diff --git a/src/services/resources.ts b/src/services/resources.ts index 9f7cff2..46561af 100644 --- a/src/services/resources.ts +++ b/src/services/resources.ts @@ -3,6 +3,10 @@ import type { WebRtcServerOptions } from "mediasoup/node/lib/types"; import * as config from "#src/config.ts"; import { Logger } from "#src/utils/utils.ts"; +import { PortLimitReachedError } from "#src/utils/errors.ts"; +import os from "node:os"; + +const availablePorts: Set = new Set(); export interface RtcWorker extends mediasoup.types.Worker { appData: { @@ -12,6 +16,7 @@ export interface RtcWorker extends mediasoup.types.Worker { const logger = new Logger("RESOURCES"); const workers = new Set(); +const tempDir = os.tmpdir() + "/ongoing_recordings"; export async function start(): Promise { logger.info("starting..."); @@ -22,6 +27,10 @@ export async function start(): Promise { logger.info( `transport(RTC) layer at ${config.PUBLIC_IP}:${config.RTC_MIN_PORT}-${config.RTC_MAX_PORT}` ); + for (let i = config.dynamicPorts.min; i <= config.dynamicPorts.max; i++) { + availablePorts.add(i); + } + logger.info(`${availablePorts.size} dynamic ports available [${config.dynamicPorts.min}-${config.dynamicPorts.max}]`); } export function close(): void { @@ -78,18 +87,26 @@ export async function getWorker(): Promise { } export function getFolder() { - // create a temp folder at a path, returns the path and a function to seal the folder + const tempName = `${Date.now()}`; + const path = `${tempDir}/${tempName}`; + // TODO we may want to track these temp folders to remove them periodically (although os.tempDir() has already such a mechanism) return { - path: "", - sealFolder: () => { - // move the content into a permanent folder location so it can easily be retrieved for processing later - // or directly forward for transcription + path, + sealFolder: (name: string = tempName) => { + // TODO move whatever is in path to + console.log(`${config.recording.directory}/${name}`); }, } } -export function getPort() { +export function getPort(): number { + const port = availablePorts.values().next().value; + if (!port) { + throw new PortLimitReachedError(); + } + return port; } export function releasePort(port: number) { + availablePorts.add(port); } diff --git a/src/utils/errors.ts b/src/utils/errors.ts index 5eb7855..62ee4f9 100644 --- a/src/utils/errors.ts +++ b/src/utils/errors.ts @@ -5,3 +5,7 @@ export class AuthenticationError extends Error { export class OvercrowdedError extends Error { name = "OvercrowdedError"; } + +export class PortLimitReachedError extends Error { + name = "PortLimitReachedError"; +} From 886f009091a1d6371b315c6743f2b2483111092e Mon Sep 17 00:00:00 2001 From: ThanhDodeurOdoo Date: Thu, 30 Oct 2025 11:07:15 +0100 Subject: [PATCH 07/12] [IMP] wip --- src/models/recorder.ts | 10 ++++---- src/services/resources.ts | 49 +++++++++++++++++++++++++-------------- 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/src/models/recorder.ts b/src/models/recorder.ts index 71eadb3..ed98c75 100644 --- a/src/models/recorder.ts +++ b/src/models/recorder.ts @@ -25,9 +25,11 @@ export class Recorder extends EventEmitter { async start() { if (this.state === RECORDER_STATE.STOPPED) { - const { path, sealFolder } = getFolder(); - this.destPath = path; - this.once("stopped", sealFolder); + const folder = getFolder(); + this.destPath = folder.path; + this.once("stopped", ({ name }) => { + folder.seal(name); + }); this.state = RECORDER_STATE.STARTED; logger.trace("TO IMPLEMENT"); // TODO ffmpeg instance creation for recording to destPath with proper name, start, build timestamps object @@ -39,7 +41,7 @@ export class Recorder extends EventEmitter { async stop() { if (this.state === RECORDER_STATE.STARTED) { logger.trace("TO IMPLEMENT"); - this.emit("stopped"); + this.emit("stopped", { name: "test" }); // TODO ffmpeg instance stop, cleanup, // only resolve promise and switch state when completely ready to start a new recording. this.state = RECORDER_STATE.STOPPED; diff --git a/src/services/resources.ts b/src/services/resources.ts index 46561af..63fc62a 100644 --- a/src/services/resources.ts +++ b/src/services/resources.ts @@ -7,6 +7,7 @@ import { PortLimitReachedError } from "#src/utils/errors.ts"; import os from "node:os"; const availablePorts: Set = new Set(); +let unique = 1; export interface RtcWorker extends mediasoup.types.Worker { appData: { @@ -14,6 +15,8 @@ export interface RtcWorker extends mediasoup.types.Worker { }; } +// TODO maybe write some docstring, file used to manage resources such as folders, workers, ports + const logger = new Logger("RESOURCES"); const workers = new Set(); const tempDir = os.tmpdir() + "/ongoing_recordings"; @@ -86,27 +89,39 @@ export async function getWorker(): Promise { return leastUsedWorker; } -export function getFolder() { - const tempName = `${Date.now()}`; - const path = `${tempDir}/${tempName}`; - // TODO we may want to track these temp folders to remove them periodically (although os.tempDir() has already such a mechanism) - return { - path, - sealFolder: (name: string = tempName) => { - // TODO move whatever is in path to - console.log(`${config.recording.directory}/${name}`); - }, +class Folder { + path: string; + + constructor(path: string) { + this.path = path; + } + + seal(name: string) { + console.trace(`TO IMPLEMENT, MOVING TO ${config.recording.directory}/${name}`); } } -export function getPort(): number { - const port = availablePorts.values().next().value; - if (!port) { - throw new PortLimitReachedError(); +export function getFolder(): Folder { + return new Folder(`${tempDir}/${Date.now()}-${unique++}`); +} + +class DynamicPort { + number: number; + + constructor(number: number) { + availablePorts.delete(number); + this.number = number; + } + + release() { + availablePorts.add(this.number); } - return port; } -export function releasePort(port: number) { - availablePorts.add(port); +export function getPort(): DynamicPort { + const number = availablePorts.values().next().value; + if (!number) { + throw new PortLimitReachedError(); + } + return new DynamicPort(number); } From 358ce983c0b601e3a0415c015ac828544525de29 Mon Sep 17 00:00:00 2001 From: ThanhDodeurOdoo Date: Tue, 4 Nov 2025 08:50:58 +0100 Subject: [PATCH 08/12] [WIP] discuss: folder move --- src/client.ts | 6 ++++++ src/models/recorder.ts | 14 ++++++-------- src/services/resources.ts | 11 ++++++++--- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/src/client.ts b/src/client.ts index bb07ede..94fedb5 100644 --- a/src/client.ts +++ b/src/client.ts @@ -261,6 +261,9 @@ export class SfuClient extends EventTarget { return stats; } async startRecording() { + if (this.state !== SfuClientState.CONNECTED) { + throw new Error("InvalidState"); + } return this._bus?.request( { name: CLIENT_REQUEST.START_RECORDING, @@ -270,6 +273,9 @@ export class SfuClient extends EventTarget { } async stopRecording() { + if (this.state !== SfuClientState.CONNECTED) { + throw new Error("InvalidState"); + } return this._bus?.request( { name: CLIENT_REQUEST.STOP_RECORDING, diff --git a/src/models/recorder.ts b/src/models/recorder.ts index ed98c75..c2a0c5e 100644 --- a/src/models/recorder.ts +++ b/src/models/recorder.ts @@ -1,6 +1,7 @@ import { EventEmitter } from "node:events"; import type { Channel } from "./channel"; import { getFolder } from "#src/services/resources.ts"; +import type { Folder } from "#src/services/resources.ts"; import { Logger } from "#src/utils/utils.ts"; export enum RECORDER_STATE { @@ -12,8 +13,8 @@ const logger = new Logger("RECORDER"); export class Recorder extends EventEmitter { channel: Channel; state: RECORDER_STATE = RECORDER_STATE.STOPPED; + folder: Folder | undefined; ffmpeg = null; - destPath: string | undefined; /** Path to which the final recording will be uploaded to */ recordingAddress: string; @@ -25,14 +26,10 @@ export class Recorder extends EventEmitter { async start() { if (this.state === RECORDER_STATE.STOPPED) { - const folder = getFolder(); - this.destPath = folder.path; - this.once("stopped", ({ name }) => { - folder.seal(name); - }); + this.folder = getFolder(); this.state = RECORDER_STATE.STARTED; logger.trace("TO IMPLEMENT"); - // TODO ffmpeg instance creation for recording to destPath with proper name, start, build timestamps object + // TODO ffmpeg instance creation for recording to folder.path with proper name, start, build timestamps object } this._record(); return { state: this.state }; @@ -41,7 +38,8 @@ export class Recorder extends EventEmitter { async stop() { if (this.state === RECORDER_STATE.STARTED) { logger.trace("TO IMPLEMENT"); - this.emit("stopped", { name: "test" }); + await this.folder!.seal("test-name"); + this.folder = undefined; // TODO ffmpeg instance stop, cleanup, // only resolve promise and switch state when completely ready to start a new recording. this.state = RECORDER_STATE.STOPPED; diff --git a/src/services/resources.ts b/src/services/resources.ts index 63fc62a..899d405 100644 --- a/src/services/resources.ts +++ b/src/services/resources.ts @@ -5,6 +5,8 @@ import * as config from "#src/config.ts"; import { Logger } from "#src/utils/utils.ts"; import { PortLimitReachedError } from "#src/utils/errors.ts"; import os from "node:os"; +import fs from "node:fs/promises"; +import path from "node:path"; const availablePorts: Set = new Set(); let unique = 1; @@ -89,15 +91,18 @@ export async function getWorker(): Promise { return leastUsedWorker; } -class Folder { +export class Folder { path: string; constructor(path: string) { this.path = path; } - seal(name: string) { - console.trace(`TO IMPLEMENT, MOVING TO ${config.recording.directory}/${name}`); + async seal(name: string) { + const destinationPath = path.join(config.recording.directory, name); + await fs.rename(this.path, destinationPath); + this.path = destinationPath; + logger.verbose(`Moved folder from ${this.path} to ${destinationPath}`); } } From 27d7693356019bf975090c76b096d013fea70d74 Mon Sep 17 00:00:00 2001 From: ThanhDodeurOdoo Date: Tue, 4 Nov 2025 14:05:31 +0100 Subject: [PATCH 09/12] [WIP] recording: state broadcasting, updated tests --- package-lock.json | 17 +++++++++++++++++ package.json | 1 + src/client.ts | 33 ++++++++++++++++++++++++++------- src/config.ts | 14 +++++++------- src/models/channel.ts | 29 ++++++++++++++++++++++++++++- src/models/recorder.ts | 37 +++++++++++++++++++++++++++---------- src/models/session.ts | 28 +++++++++++++++++----------- src/services/ws.ts | 2 +- src/shared/enums.ts | 4 +++- src/shared/types.ts | 11 ++++++++--- tests/network.test.ts | 28 ++++++++++------------------ tests/utils/network.ts | 26 ++++++++++++++++++++------ 12 files changed, 165 insertions(+), 65 deletions(-) diff --git a/package-lock.json b/package-lock.json index 152cfca..f2c5b56 100644 --- a/package-lock.json +++ b/package-lock.json @@ -23,6 +23,7 @@ "@typescript-eslint/eslint-plugin": "8.32.1", "@typescript-eslint/parser": "8.32.1", "eslint": "8.57.1", + "eslint-config-prettier": "^10.1.8", "eslint-plugin-import": "^2.25.3", "eslint-plugin-jest": "28.11.0", "eslint-plugin-node": "^11.1.0", @@ -2982,6 +2983,22 @@ "url": "https://opencollective.com/eslint" } }, + "node_modules/eslint-config-prettier": { + "version": "10.1.8", + "resolved": "https://registry.npmjs.org/eslint-config-prettier/-/eslint-config-prettier-10.1.8.tgz", + "integrity": "sha512-82GZUjRS0p/jganf6q1rEO25VSoHH0hKPCTrgillPjdI/3bgBhAE1QzHrHTizjpRvy6pGAvKjDJtk2pF9NDq8w==", + "dev": true, + "license": "MIT", + "bin": { + "eslint-config-prettier": "bin/cli.js" + }, + "funding": { + "url": "https://opencollective.com/eslint-config-prettier" + }, + "peerDependencies": { + "eslint": ">=7.0.0" + } + }, "node_modules/eslint-import-resolver-node": { "version": "0.3.9", "resolved": "https://registry.npmjs.org/eslint-import-resolver-node/-/eslint-import-resolver-node-0.3.9.tgz", diff --git a/package.json b/package.json index e05078b..9deb2c6 100644 --- a/package.json +++ b/package.json @@ -37,6 +37,7 @@ "@typescript-eslint/eslint-plugin": "8.32.1", "@typescript-eslint/parser": "8.32.1", "eslint": "8.57.1", + "eslint-config-prettier": "^10.1.8", "eslint-plugin-import": "^2.25.3", "eslint-plugin-jest": "28.11.0", "eslint-plugin-node": "^11.1.0", diff --git a/src/client.ts b/src/client.ts index 94fedb5..cdd986b 100644 --- a/src/client.ts +++ b/src/client.ts @@ -18,7 +18,13 @@ import { SERVER_REQUEST, WS_CLOSE_CODE } from "#src/shared/enums.ts"; -import type { JSONSerializable, StreamType, BusMessage, AvailableFeatures } from "#src/shared/types"; +import type { + JSONSerializable, + StreamType, + BusMessage, + AvailableFeatures, + StartupData +} from "#src/shared/types"; import type { TransportConfig, SessionId, SessionInfo } from "#src/models/session"; interface Consumers { @@ -55,11 +61,13 @@ export enum CLIENT_UPDATE { /** A session has left the channel */ DISCONNECT = "disconnect", /** Session info has changed */ - INFO_CHANGE = "info_change" + INFO_CHANGE = "info_change", + CHANNEL_INFO_CHANGE = "channel_info_change" } type ClientUpdatePayload = | { senderId: SessionId; message: JSONSerializable } | { sessionId: SessionId } + | { isRecording: boolean } | Record | { type: StreamType; @@ -142,9 +150,10 @@ export class SfuClient extends EventTarget { /** Connection errors encountered */ public errors: Error[] = []; public availableFeatures: AvailableFeatures = { - "rtc": false, - "recording": false, + rtc: false, + recording: false }; + public isRecording: boolean = false; /** Current client state */ private _state: SfuClientState = SfuClientState.DISCONNECTED; /** Communication bus */ @@ -266,7 +275,7 @@ export class SfuClient extends EventTarget { } return this._bus?.request( { - name: CLIENT_REQUEST.START_RECORDING, + name: CLIENT_REQUEST.START_RECORDING }, { batch: true } ); @@ -278,7 +287,7 @@ export class SfuClient extends EventTarget { } return this._bus?.request( { - name: CLIENT_REQUEST.STOP_RECORDING, + name: CLIENT_REQUEST.STOP_RECORDING }, { batch: true } ); @@ -473,7 +482,13 @@ export class SfuClient extends EventTarget { webSocket.addEventListener( "message", (message) => { - this.availableFeatures = JSON.parse(message.data) as AvailableFeatures; + if (message.data) { + const { availableFeatures, isRecording } = JSON.parse( + message.data + ) as StartupData; + this.availableFeatures = availableFeatures; + this.isRecording = isRecording; + } resolve(new Bus(webSocket)); }, { once: true } @@ -604,6 +619,10 @@ export class SfuClient extends EventTarget { case SERVER_MESSAGE.INFO_CHANGE: this._updateClient(CLIENT_UPDATE.INFO_CHANGE, payload); break; + case SERVER_MESSAGE.CHANNEL_INFO_CHANGE: + this.isRecording = payload.isRecording; + this._updateClient(CLIENT_UPDATE.CHANNEL_INFO_CHANGE, payload); + break; } } diff --git a/src/config.ts b/src/config.ts index b7d209e..7376b1b 100644 --- a/src/config.ts +++ b/src/config.ts @@ -11,6 +11,7 @@ import type { ProducerOptions } from "mediasoup-client/lib/Producer"; const FALSY_INPUT = new Set(["disable", "false", "none", "no", "0"]); type LogLevel = "none" | "error" | "warn" | "info" | "debug" | "verbose"; type WorkerLogLevel = "none" | "error" | "warn" | "debug"; +const testingMode = Boolean(process.env.JEST_WORKER_ID); // ------------------------------------------------------------ // ------------------ ENV VARIABLES ----------------------- @@ -22,7 +23,7 @@ type WorkerLogLevel = "none" | "error" | "warn" | "debug"; * e.g: AUTH_KEY=u6bsUQEWrHdKIuYplirRnbBmLbrKV5PxKG7DtA71mng= */ export const AUTH_KEY: string = process.env.AUTH_KEY!; -if (!AUTH_KEY && !process.env.JEST_WORKER_ID) { +if (!AUTH_KEY && !testingMode) { throw new Error( "AUTH_KEY env variable is required, it is not possible to authenticate requests without it" ); @@ -34,7 +35,7 @@ if (!AUTH_KEY && !process.env.JEST_WORKER_ID) { * e.g: PUBLIC_IP=190.165.1.70 */ export const PUBLIC_IP: string = process.env.PUBLIC_IP!; -if (!PUBLIC_IP && !process.env.JEST_WORKER_ID) { +if (!PUBLIC_IP && !testingMode) { throw new Error( "PUBLIC_IP env variable is required, clients cannot establish webRTC connections without it" ); @@ -67,7 +68,7 @@ export const PORT: number = Number(process.env.PORT) || 8070; /** * Whether the recording feature is enabled, false by default. */ -export const RECORDING: boolean = Boolean(process.env.RECORDING); +export const RECORDING: boolean = Boolean(process.env.RECORDING) || testingMode; /** * The number of workers to spawn (up to core limits) to manage RTC servers. @@ -199,7 +200,7 @@ export const timeouts: TimeoutConfig = Object.freeze({ // how long before a channel is closed after the last session leaves channel: 60 * 60_000, // how long to wait to gather messages before sending through the bus - busBatch: process.env.JEST_WORKER_ID ? 10 : 300 + busBatch: testingMode ? 10 : 300 }); export const recording = Object.freeze({ @@ -212,16 +213,15 @@ export const recording = Object.freeze({ audioCodec: "aac", audioLimit: 20, cameraLimit: 4, // how many camera can be merged into one recording - screenLimit: 1, + screenLimit: 1 }); // TODO: This should probably be env variable, and at least documented so that deployment can open these ports. export const dynamicPorts = Object.freeze({ min: 50000, - max: 59999, + max: 59999 }); - // how many errors can occur before the session is closed, recovery attempts will be made until this limit is reached export const maxSessionErrors: number = 6; diff --git a/src/models/channel.ts b/src/models/channel.ts index e9ecc3a..0a89517 100644 --- a/src/models/channel.ts +++ b/src/models/channel.ts @@ -14,6 +14,7 @@ import { } from "#src/models/session.ts"; import { Recorder } from "#src/models/recorder.ts"; import { getWorker, type RtcWorker } from "#src/services/resources.ts"; +import { SERVER_MESSAGE } from "#src/shared/enums.ts"; const logger = new Logger("CHANNEL"); @@ -187,7 +188,11 @@ export class Channel extends EventEmitter { const now = new Date(); this.createDate = now.toISOString(); this.remoteAddress = remoteAddress; - this.recorder = config.recording.enabled && options.recordingAddress ? new Recorder(this, options.recordingAddress) : undefined; + this.recorder = + config.recording.enabled && options.recordingAddress + ? new Recorder(this, options.recordingAddress) + : undefined; + this.recorder?.on("stateChange", () => this._broadcastState()); this.key = key ? Buffer.from(key, "base64") : undefined; this.uuid = crypto.randomUUID(); this.name = `${remoteAddress}*${this.uuid.slice(-5)}`; @@ -315,6 +320,28 @@ export class Channel extends EventEmitter { this.emit("close", this.uuid); } + /** + * Broadcast the state of this channel to all its participants + */ + private _broadcastState() { + for (const session of this.sessions.values()) { + // TODO maybe the following should be on session and some can be made in common with the startupData getter. + if (!session.bus) { + logger.warn(`tried to broadcast state to session ${session.id}, but had no Bus`); + continue; + } + session.bus.send( + { + name: SERVER_MESSAGE.CHANNEL_INFO_CHANGE, + payload: { + isRecording: Boolean(this.recorder?.isRecording) + } + }, + { batch: true } + ); + } + } + /** * @param event - Close event with session ID * @fires Channel#sessionLeave diff --git a/src/models/recorder.ts b/src/models/recorder.ts index c2a0c5e..6f4afa2 100644 --- a/src/models/recorder.ts +++ b/src/models/recorder.ts @@ -1,22 +1,22 @@ import { EventEmitter } from "node:events"; -import type { Channel } from "./channel"; -import { getFolder } from "#src/services/resources.ts"; -import type { Folder } from "#src/services/resources.ts"; +import { getFolder, type Folder } from "#src/services/resources.ts"; import { Logger } from "#src/utils/utils.ts"; +import type { Channel } from "./channel"; + export enum RECORDER_STATE { STARTED = "started", - STOPPED = "stopped", + STOPPED = "stopped" } const logger = new Logger("RECORDER"); export class Recorder extends EventEmitter { channel: Channel; - state: RECORDER_STATE = RECORDER_STATE.STOPPED; folder: Folder | undefined; ffmpeg = null; /** Path to which the final recording will be uploaded to */ recordingAddress: string; + private _state: RECORDER_STATE = RECORDER_STATE.STOPPED; constructor(channel: Channel, recordingAddress: string) { super(); @@ -26,10 +26,10 @@ export class Recorder extends EventEmitter { async start() { if (this.state === RECORDER_STATE.STOPPED) { - this.folder = getFolder(); + this.folder = getFolder(); this.state = RECORDER_STATE.STARTED; - logger.trace("TO IMPLEMENT"); - // TODO ffmpeg instance creation for recording to folder.path with proper name, start, build timestamps object + logger.trace("TO IMPLEMENT"); + // TODO ffmpeg instance creation for recording to folder.path with proper name, start, build timestamps object } this._record(); return { state: this.state }; @@ -38,7 +38,11 @@ export class Recorder extends EventEmitter { async stop() { if (this.state === RECORDER_STATE.STARTED) { logger.trace("TO IMPLEMENT"); - await this.folder!.seal("test-name"); + try { + await this.folder!.seal("test-name"); + } catch { + logger.verbose("failed to save the recording"); // TODO maybe warn and give more info + } this.folder = undefined; // TODO ffmpeg instance stop, cleanup, // only resolve promise and switch state when completely ready to start a new recording. @@ -47,11 +51,24 @@ export class Recorder extends EventEmitter { return { state: this.state }; } + get isRecording(): boolean { + return this.state === RECORDER_STATE.STARTED; + } + + get state(): RECORDER_STATE { + return this._state; + } + + set state(state: RECORDER_STATE) { + this._state = state; + this.emit("stateChange", state); + } + /** * @param video whether we want to record videos or not (will always record audio) */ _record(video: boolean = false) { - console.trace(`TO IMPLEMENT: recording channel ${this.channel.name}, video: ${video}`); + logger.trace(`TO IMPLEMENT: recording channel ${this.channel.name}, video: ${video}`); // iterate all producers on all sessions of the channel, create a ffmpeg for each, // save them on a map by session id+type. // check if recording for that session id+type is already in progress diff --git a/src/models/session.ts b/src/models/session.ts index b50eb81..739775b 100644 --- a/src/models/session.ts +++ b/src/models/session.ts @@ -1,14 +1,14 @@ import { EventEmitter } from "node:events"; import type { - IceParameters, - IceCandidate, - DtlsParameters, - SctpParameters, Consumer, + DtlsParameters, + IceCandidate, + IceParameters, Producer, - WebRtcTransport, - RtpCapabilities + RtpCapabilities, + SctpParameters, + WebRtcTransport } from "mediasoup/node/lib/types"; import * as config from "#src/config.ts"; @@ -20,9 +20,10 @@ import { SERVER_REQUEST, STREAM_TYPE } from "#src/shared/enums.ts"; -import type {JSONSerializable, StreamType, BusMessage, AvailableFeatures } from "#src/shared/types"; +import type { BusMessage, JSONSerializable, StartupData, StreamType } from "#src/shared/types"; import type { Bus } from "#src/shared/bus.ts"; import type { Channel } from "#src/models/channel.ts"; +import { RECORDER_STATE } from "#src/models/recorder.ts"; export type SessionId = number | string; export type SessionInfo = { @@ -168,11 +169,16 @@ export class Session extends EventEmitter { this.setMaxListeners(config.CHANNEL_SIZE * 2); } - get availableFeatures(): AvailableFeatures { + get startupData(): StartupData { return { - "rtc": Boolean(this._channel.router), - "recording": Boolean(this._channel.router && this._channel.recorder && this.permissions.recording) - } + availableFeatures: { + rtc: Boolean(this._channel.router), + recording: Boolean( + this._channel.router && this._channel.recorder && this.permissions.recording + ) + }, + isRecording: this._channel.recorder?.state === RECORDER_STATE.STARTED + }; } get name(): string { diff --git a/src/services/ws.ts b/src/services/ws.ts index d6f6bea..620fca6 100644 --- a/src/services/ws.ts +++ b/src/services/ws.ts @@ -131,7 +131,7 @@ function connect(webSocket: WebSocket, credentials: Credentials): Session { const bus = new Bus(webSocket, { batchDelay: config.timeouts.busBatch }); const { session } = Channel.join(channel.uuid, session_id); session.updatePermissions(permissions); - webSocket.send(JSON.stringify(session.availableFeatures)); // client can start using ws after this message. + webSocket.send(JSON.stringify(session.startupData)); // client can start using ws after this message. session.once("close", ({ code }: { code: string }) => { let wsCloseCode = WS_CLOSE_CODE.CLEAN; switch (code) { diff --git a/src/shared/enums.ts b/src/shared/enums.ts index e5a3a92..3d9ea18 100644 --- a/src/shared/enums.ts +++ b/src/shared/enums.ts @@ -24,7 +24,9 @@ export enum SERVER_MESSAGE { /** Signals the clients that one of the session in their channel has left */ SESSION_LEAVE = "SESSION_LEAVE", /** Signals the clients that the info (talking, mute,...) of one of the session in their channel has changed */ - INFO_CHANGE = "S_INFO_CHANGE" + INFO_CHANGE = "S_INFO_CHANGE", + /** Signals the clients that the info of the channel (isRecording,...) has changed */ + CHANNEL_INFO_CHANGE = "C_INFO_CHANGE" } export enum CLIENT_REQUEST { diff --git a/src/shared/types.ts b/src/shared/types.ts index f59c891..0453813 100644 --- a/src/shared/types.ts +++ b/src/shared/types.ts @@ -10,10 +10,14 @@ export type StreamType = "audio" | "camera" | "screen"; export type StringLike = Buffer | string; +export type StartupData = { + availableFeatures: AvailableFeatures; + isRecording: boolean; +}; export type AvailableFeatures = { - "rtc": boolean, - "recording": boolean, -} + rtc: boolean; + recording: boolean; +}; import type { DownloadStates } from "#src/client.ts"; import type { SessionId, SessionInfo, TransportConfig } from "#src/models/session.ts"; @@ -62,6 +66,7 @@ export type BusMessage = } | { name: typeof SERVER_MESSAGE.SESSION_LEAVE; payload: { sessionId: SessionId } } | { name: typeof SERVER_MESSAGE.INFO_CHANGE; payload: Record } + | { name: typeof SERVER_MESSAGE.CHANNEL_INFO_CHANGE; payload: { isRecording: boolean } } | { name: typeof SERVER_REQUEST.INIT_CONSUMER; payload: { diff --git a/tests/network.test.ts b/tests/network.test.ts index 080787f..fd21080 100644 --- a/tests/network.test.ts +++ b/tests/network.test.ts @@ -141,11 +141,9 @@ describe("Full network", () => { test("A client can forward a track to other clients", async () => { const channelUUID = await network.getChannelUUID(); const user1 = await network.connect(channelUUID, 1); - await once(user1.session, "stateChange"); const user2 = await network.connect(channelUUID, 2); - await once(user2.session, "stateChange"); const sender = await network.connect(channelUUID, 3); - await once(sender.session, "stateChange"); + await Promise.all([user1.isConnected, user2.isConnected, sender.isConnected]); const track = new FakeMediaStreamTrack({ kind: "audio" }); await sender.sfuClient.updateUpload(STREAM_TYPE.AUDIO, track); const prom1 = once(user1.sfuClient, "update"); @@ -161,9 +159,8 @@ describe("Full network", () => { test("Recovery attempts are made if the production fails, a failure does not close the connection", async () => { const channelUUID = await network.getChannelUUID(); const user = await network.connect(channelUUID, 1); - await once(user.session, "stateChange"); const sender = await network.connect(channelUUID, 3); - await once(sender.session, "stateChange"); + await Promise.all([user.isConnected, sender.isConnected]); const track = new FakeMediaStreamTrack({ kind: "audio" }); // closing the transport so the `updateUpload` should fail. // @ts-expect-error accessing private property for testing purposes @@ -175,9 +172,8 @@ describe("Full network", () => { test("Recovery attempts are made if the consumption fails, a failure does not close the connection", async () => { const channelUUID = await network.getChannelUUID(); const user = await network.connect(channelUUID, 1); - await once(user.session, "stateChange"); const sender = await network.connect(channelUUID, 3); - await once(sender.session, "stateChange"); + await Promise.all([user.isConnected, sender.isConnected]); const track = new FakeMediaStreamTrack({ kind: "audio" }); // closing the transport so the consumption should fail. // @ts-expect-error accessing private property for testing purposes @@ -191,9 +187,8 @@ describe("Full network", () => { test("The client can obtain download and upload statistics", async () => { const channelUUID = await network.getChannelUUID(); const user1 = await network.connect(channelUUID, 1); - await once(user1.session, "stateChange"); const sender = await network.connect(channelUUID, 3); - await once(sender.session, "stateChange"); + await Promise.all([user1.isConnected, sender.isConnected]); const track = new FakeMediaStreamTrack({ kind: "audio" }); await sender.sfuClient.updateUpload(STREAM_TYPE.AUDIO, track); await once(user1.sfuClient, "update"); @@ -206,9 +201,8 @@ describe("Full network", () => { test("The client can update the state of their downloads", async () => { const channelUUID = await network.getChannelUUID(); const user1 = await network.connect(channelUUID, 1234); - await once(user1.session, "stateChange"); const sender = await network.connect(channelUUID, 123); - await once(sender.session, "stateChange"); + await Promise.all([user1.isConnected, sender.isConnected]); const track = new FakeMediaStreamTrack({ kind: "audio" }); await sender.sfuClient.updateUpload(STREAM_TYPE.AUDIO, track); await once(user1.sfuClient, "update"); @@ -228,9 +222,8 @@ describe("Full network", () => { test("The client can update the state of their upload", async () => { const channelUUID = await network.getChannelUUID(); const user1 = await network.connect(channelUUID, 1234); - await once(user1.session, "stateChange"); const sender = await network.connect(channelUUID, 123); - await once(sender.session, "stateChange"); + await Promise.all([user1.isConnected, sender.isConnected]); const track = new FakeMediaStreamTrack({ kind: "video" }); await sender.sfuClient.updateUpload(STREAM_TYPE.CAMERA, track); await once(user1.sfuClient, "update"); @@ -287,13 +280,12 @@ describe("Full network", () => { test("POC RECORDING", async () => { const channelUUID = await network.getChannelUUID(); const user1 = await network.connect(channelUUID, 1); - await once(user1.session, "stateChange"); const sender = await network.connect(channelUUID, 3); - await once(sender.session, "stateChange"); - sender.session.updatePermissions({ recording: true }); - const startResult = await sender.sfuClient.startRecording() as { state: string }; + await Promise.all([user1.isConnected, sender.isConnected]); + expect(sender.sfuClient.availableFeatures.recording).toBe(true); + const startResult = (await sender.sfuClient.startRecording()) as { state: string }; expect(startResult.state).toBe("started"); - const stopResult = await sender.sfuClient.stopRecording() as { state: string }; + const stopResult = (await sender.sfuClient.stopRecording()) as { state: string }; expect(stopResult.state).toBe("stopped"); }); }); diff --git a/tests/utils/network.ts b/tests/utils/network.ts index 9ac1d16..33d6cfc 100644 --- a/tests/utils/network.ts +++ b/tests/utils/network.ts @@ -37,6 +37,7 @@ interface ConnectionResult { session: Session; /** Client-side SFU client instance */ sfuClient: SfuClient; + isConnected: Promise; } /** @@ -71,7 +72,7 @@ export class LocalNetwork { // Start all services in correct order await resources.start(); await http.start({ httpInterface: hostname, port }); - await auth.start(HMAC_B64_KEY); + auth.start(HMAC_B64_KEY); } /** @@ -90,9 +91,9 @@ export class LocalNetwork { iss: `http://${this.hostname}:${this.port}/`, key }); - + const TEST_RECORDING_ADDRESS = "http://localhost:8080"; // TODO maybe to change and use that later const response = await fetch( - `http://${this.hostname}:${this.port}/v${http.API_VERSION}/channel?webRTC=${useWebRtc}`, + `http://${this.hostname}:${this.port}/v${http.API_VERSION}/channel?webRTC=${useWebRtc}&recordingAddress=${TEST_RECORDING_ADDRESS}`, { method: "GET", headers: { @@ -167,17 +168,30 @@ export class LocalNetwork { break; } }; - sfuClient.addEventListener("stateChange", handleStateChange as EventListener); }); + const isConnected = new Promise((resolve, reject) => { + const connectedHandler = (event: CustomEvent) => { + const { state } = event.detail; + if (state === SfuClientState.CONNECTED) { + sfuClient.removeEventListener("stateChange", connectedHandler as EventListener); + resolve(true); + } + }; + sfuClient.addEventListener("stateChange", connectedHandler as EventListener); + }); + // Start connection sfuClient.connect( `ws://${this.hostname}:${this.port}`, this.makeJwt( { sfu_channel_uuid: channelUUID, - session_id: sessionId + session_id: sessionId, + permissions: { + recording: true + } }, key ), @@ -198,7 +212,7 @@ export class LocalNetwork { throw new Error(`Session ${sessionId} not found in channel ${channelUUID}`); } - return { session, sfuClient }; + return { session, sfuClient, isConnected }; } /** From a183da5efcb781b207034be06c9c342c6f5f53e5 Mon Sep 17 00:00:00 2001 From: ThanhDodeurOdoo Date: Wed, 5 Nov 2025 10:39:54 +0100 Subject: [PATCH 10/12] [WIP] recording: ts version bump, bundle, simpler rec api --- package-lock.json | 196 +++++++++++++++++++++++++---------------- package.json | 10 +-- rollup.config.js | 3 - src/client.ts | 4 +- src/models/channel.ts | 2 + src/models/recorder.ts | 4 +- src/models/session.ts | 1 + src/services/auth.ts | 2 +- src/shared/types.ts | 2 +- tests/network.test.ts | 8 +- tests/utils/network.ts | 1 + tsconfig.json | 4 - tsconfig_bundle.json | 4 +- 13 files changed, 141 insertions(+), 100 deletions(-) diff --git a/package-lock.json b/package-lock.json index f2c5b56..89355c4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,12 +16,12 @@ "@jest/globals": "^29.6.2", "@rollup/plugin-commonjs": "^25.0.7", "@rollup/plugin-node-resolve": "^13.0.4", - "@rollup/plugin-typescript": "^10.0.1", + "@rollup/plugin-typescript": "^12.1.2", "@types/jest": "^29.5.0", - "@types/node": "^20.5.0", + "@types/node": "^22.13.14", "@types/ws": "^8.18.1", - "@typescript-eslint/eslint-plugin": "8.32.1", - "@typescript-eslint/parser": "8.32.1", + "@typescript-eslint/eslint-plugin": "^8.46.3", + "@typescript-eslint/parser": "^8.46.3", "eslint": "8.57.1", "eslint-config-prettier": "^10.1.8", "eslint-plugin-import": "^2.25.3", @@ -36,7 +36,7 @@ "rollup": "^2.79.1", "rollup-plugin-license": "3.2.0", "ts-jest": "^29.3.4", - "typescript": "~5.4.3" + "typescript": "~5.9.3" }, "engines": { "node": ">=22.16.0" @@ -1344,20 +1344,20 @@ "dev": true }, "node_modules/@rollup/plugin-typescript": { - "version": "10.0.1", - "resolved": "https://registry.npmjs.org/@rollup/plugin-typescript/-/plugin-typescript-10.0.1.tgz", - "integrity": "sha512-wBykxRLlX7EzL8BmUqMqk5zpx2onnmRMSw/l9M1sVfkJvdwfxogZQVNUM9gVMJbjRLDR5H6U0OMOrlDGmIV45A==", + "version": "12.3.0", + "resolved": "https://registry.npmjs.org/@rollup/plugin-typescript/-/plugin-typescript-12.3.0.tgz", + "integrity": "sha512-7DP0/p7y3t67+NabT9f8oTBFE6gGkto4SA6Np2oudYmZE/m1dt8RB0SjL1msMxFpLo631qjRCcBlAbq1ml/Big==", "dev": true, "license": "MIT", "dependencies": { - "@rollup/pluginutils": "^5.0.1", + "@rollup/pluginutils": "^5.1.0", "resolve": "^1.22.1" }, "engines": { "node": ">=14.0.0" }, "peerDependencies": { - "rollup": "^2.14.0||^3.0.0", + "rollup": "^2.14.0||^3.0.0||^4.0.0", "tslib": "*", "typescript": ">=3.7.0" }, @@ -1535,12 +1535,13 @@ "dev": true }, "node_modules/@types/node": { - "version": "20.10.4", - "resolved": "https://registry.npmjs.org/@types/node/-/node-20.10.4.tgz", - "integrity": "sha512-D08YG6rr8X90YB56tSIuBaddy/UXAA9RKJoFvrsnogAum/0pmjkgi4+2nx96A330FmioegBWmEYQ+syqCFaveg==", + "version": "22.19.0", + "resolved": "https://registry.npmjs.org/@types/node/-/node-22.19.0.tgz", + "integrity": "sha512-xpr/lmLPQEj+TUnHmR+Ab91/glhJvsqcjB+yY0Ix9GO70H6Lb4FHH5GeqdOE5btAx7eIMwuHkp4H2MSkLcqWbA==", "dev": true, + "license": "MIT", "dependencies": { - "undici-types": "~5.26.4" + "undici-types": "~6.21.0" } }, "node_modules/@types/node-fetch": { @@ -1601,17 +1602,17 @@ "dev": true }, "node_modules/@typescript-eslint/eslint-plugin": { - "version": "8.32.1", - "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-8.32.1.tgz", - "integrity": "sha512-6u6Plg9nP/J1GRpe/vcjjabo6Uc5YQPAMxsgQyGC/I0RuukiG1wIe3+Vtg3IrSCVJDmqK3j8adrtzXSENRtFgg==", + "version": "8.46.3", + "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-8.46.3.tgz", + "integrity": "sha512-sbaQ27XBUopBkRiuY/P9sWGOWUW4rl8fDoHIUmLpZd8uldsTyB4/Zg6bWTegPoTLnKj9Hqgn3QD6cjPNB32Odw==", "dev": true, "license": "MIT", "dependencies": { "@eslint-community/regexpp": "^4.10.0", - "@typescript-eslint/scope-manager": "8.32.1", - "@typescript-eslint/type-utils": "8.32.1", - "@typescript-eslint/utils": "8.32.1", - "@typescript-eslint/visitor-keys": "8.32.1", + "@typescript-eslint/scope-manager": "8.46.3", + "@typescript-eslint/type-utils": "8.46.3", + "@typescript-eslint/utils": "8.46.3", + "@typescript-eslint/visitor-keys": "8.46.3", "graphemer": "^1.4.0", "ignore": "^7.0.0", "natural-compare": "^1.4.0", @@ -1625,9 +1626,9 @@ "url": "https://opencollective.com/typescript-eslint" }, "peerDependencies": { - "@typescript-eslint/parser": "^8.0.0 || ^8.0.0-alpha.0", + "@typescript-eslint/parser": "^8.46.3", "eslint": "^8.57.0 || ^9.0.0", - "typescript": ">=4.8.4 <5.9.0" + "typescript": ">=4.8.4 <6.0.0" } }, "node_modules/@typescript-eslint/eslint-plugin/node_modules/ignore": { @@ -1641,16 +1642,16 @@ } }, "node_modules/@typescript-eslint/parser": { - "version": "8.32.1", - "resolved": "https://registry.npmjs.org/@typescript-eslint/parser/-/parser-8.32.1.tgz", - "integrity": "sha512-LKMrmwCPoLhM45Z00O1ulb6jwyVr2kr3XJp+G+tSEZcbauNnScewcQwtJqXDhXeYPDEjZ8C1SjXm015CirEmGg==", + "version": "8.46.3", + "resolved": "https://registry.npmjs.org/@typescript-eslint/parser/-/parser-8.46.3.tgz", + "integrity": "sha512-6m1I5RmHBGTnUGS113G04DMu3CpSdxCAU/UvtjNWL4Nuf3MW9tQhiJqRlHzChIkhy6kZSAQmc+I1bcGjE3yNKg==", "dev": true, "license": "MIT", "dependencies": { - "@typescript-eslint/scope-manager": "8.32.1", - "@typescript-eslint/types": "8.32.1", - "@typescript-eslint/typescript-estree": "8.32.1", - "@typescript-eslint/visitor-keys": "8.32.1", + "@typescript-eslint/scope-manager": "8.46.3", + "@typescript-eslint/types": "8.46.3", + "@typescript-eslint/typescript-estree": "8.46.3", + "@typescript-eslint/visitor-keys": "8.46.3", "debug": "^4.3.4" }, "engines": { @@ -1662,18 +1663,40 @@ }, "peerDependencies": { "eslint": "^8.57.0 || ^9.0.0", - "typescript": ">=4.8.4 <5.9.0" + "typescript": ">=4.8.4 <6.0.0" + } + }, + "node_modules/@typescript-eslint/project-service": { + "version": "8.46.3", + "resolved": "https://registry.npmjs.org/@typescript-eslint/project-service/-/project-service-8.46.3.tgz", + "integrity": "sha512-Fz8yFXsp2wDFeUElO88S9n4w1I4CWDTXDqDr9gYvZgUpwXQqmZBr9+NTTql5R3J7+hrJZPdpiWaB9VNhAKYLuQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "@typescript-eslint/tsconfig-utils": "^8.46.3", + "@typescript-eslint/types": "^8.46.3", + "debug": "^4.3.4" + }, + "engines": { + "node": "^18.18.0 || ^20.9.0 || >=21.1.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/typescript-eslint" + }, + "peerDependencies": { + "typescript": ">=4.8.4 <6.0.0" } }, "node_modules/@typescript-eslint/scope-manager": { - "version": "8.32.1", - "resolved": "https://registry.npmjs.org/@typescript-eslint/scope-manager/-/scope-manager-8.32.1.tgz", - "integrity": "sha512-7IsIaIDeZn7kffk7qXC3o6Z4UblZJKV3UBpkvRNpr5NSyLji7tvTcvmnMNYuYLyh26mN8W723xpo3i4MlD33vA==", + "version": "8.46.3", + "resolved": "https://registry.npmjs.org/@typescript-eslint/scope-manager/-/scope-manager-8.46.3.tgz", + "integrity": "sha512-FCi7Y1zgrmxp3DfWfr+3m9ansUUFoy8dkEdeQSgA9gbm8DaHYvZCdkFRQrtKiedFf3Ha6VmoqoAaP68+i+22kg==", "dev": true, "license": "MIT", "dependencies": { - "@typescript-eslint/types": "8.32.1", - "@typescript-eslint/visitor-keys": "8.32.1" + "@typescript-eslint/types": "8.46.3", + "@typescript-eslint/visitor-keys": "8.46.3" }, "engines": { "node": "^18.18.0 || ^20.9.0 || >=21.1.0" @@ -1683,15 +1706,33 @@ "url": "https://opencollective.com/typescript-eslint" } }, + "node_modules/@typescript-eslint/tsconfig-utils": { + "version": "8.46.3", + "resolved": "https://registry.npmjs.org/@typescript-eslint/tsconfig-utils/-/tsconfig-utils-8.46.3.tgz", + "integrity": "sha512-GLupljMniHNIROP0zE7nCcybptolcH8QZfXOpCfhQDAdwJ/ZTlcaBOYebSOZotpti/3HrHSw7D3PZm75gYFsOA==", + "dev": true, + "license": "MIT", + "engines": { + "node": "^18.18.0 || ^20.9.0 || >=21.1.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/typescript-eslint" + }, + "peerDependencies": { + "typescript": ">=4.8.4 <6.0.0" + } + }, "node_modules/@typescript-eslint/type-utils": { - "version": "8.32.1", - "resolved": "https://registry.npmjs.org/@typescript-eslint/type-utils/-/type-utils-8.32.1.tgz", - "integrity": "sha512-mv9YpQGA8iIsl5KyUPi+FGLm7+bA4fgXaeRcFKRDRwDMu4iwrSHeDPipwueNXhdIIZltwCJv+NkxftECbIZWfA==", + "version": "8.46.3", + "resolved": "https://registry.npmjs.org/@typescript-eslint/type-utils/-/type-utils-8.46.3.tgz", + "integrity": "sha512-ZPCADbr+qfz3aiTTYNNkCbUt+cjNwI/5McyANNrFBpVxPt7GqpEYz5ZfdwuFyGUnJ9FdDXbGODUu6iRCI6XRXw==", "dev": true, "license": "MIT", "dependencies": { - "@typescript-eslint/typescript-estree": "8.32.1", - "@typescript-eslint/utils": "8.32.1", + "@typescript-eslint/types": "8.46.3", + "@typescript-eslint/typescript-estree": "8.46.3", + "@typescript-eslint/utils": "8.46.3", "debug": "^4.3.4", "ts-api-utils": "^2.1.0" }, @@ -1704,13 +1745,13 @@ }, "peerDependencies": { "eslint": "^8.57.0 || ^9.0.0", - "typescript": ">=4.8.4 <5.9.0" + "typescript": ">=4.8.4 <6.0.0" } }, "node_modules/@typescript-eslint/types": { - "version": "8.32.1", - "resolved": "https://registry.npmjs.org/@typescript-eslint/types/-/types-8.32.1.tgz", - "integrity": "sha512-YmybwXUJcgGqgAp6bEsgpPXEg6dcCyPyCSr0CAAueacR/CCBi25G3V8gGQ2kRzQRBNol7VQknxMs9HvVa9Rvfg==", + "version": "8.46.3", + "resolved": "https://registry.npmjs.org/@typescript-eslint/types/-/types-8.46.3.tgz", + "integrity": "sha512-G7Ok9WN/ggW7e/tOf8TQYMaxgID3Iujn231hfi0Pc7ZheztIJVpO44ekY00b7akqc6nZcvregk0Jpah3kep6hA==", "dev": true, "license": "MIT", "engines": { @@ -1722,14 +1763,16 @@ } }, "node_modules/@typescript-eslint/typescript-estree": { - "version": "8.32.1", - "resolved": "https://registry.npmjs.org/@typescript-eslint/typescript-estree/-/typescript-estree-8.32.1.tgz", - "integrity": "sha512-Y3AP9EIfYwBb4kWGb+simvPaqQoT5oJuzzj9m0i6FCY6SPvlomY2Ei4UEMm7+FXtlNJbor80ximyslzaQF6xhg==", + "version": "8.46.3", + "resolved": "https://registry.npmjs.org/@typescript-eslint/typescript-estree/-/typescript-estree-8.46.3.tgz", + "integrity": "sha512-f/NvtRjOm80BtNM5OQtlaBdM5BRFUv7gf381j9wygDNL+qOYSNOgtQ/DCndiYi80iIOv76QqaTmp4fa9hwI0OA==", "dev": true, "license": "MIT", "dependencies": { - "@typescript-eslint/types": "8.32.1", - "@typescript-eslint/visitor-keys": "8.32.1", + "@typescript-eslint/project-service": "8.46.3", + "@typescript-eslint/tsconfig-utils": "8.46.3", + "@typescript-eslint/types": "8.46.3", + "@typescript-eslint/visitor-keys": "8.46.3", "debug": "^4.3.4", "fast-glob": "^3.3.2", "is-glob": "^4.0.3", @@ -1745,7 +1788,7 @@ "url": "https://opencollective.com/typescript-eslint" }, "peerDependencies": { - "typescript": ">=4.8.4 <5.9.0" + "typescript": ">=4.8.4 <6.0.0" } }, "node_modules/@typescript-eslint/typescript-estree/node_modules/brace-expansion": { @@ -1775,9 +1818,9 @@ } }, "node_modules/@typescript-eslint/typescript-estree/node_modules/semver": { - "version": "7.7.2", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.7.2.tgz", - "integrity": "sha512-RF0Fw+rO5AMf9MAyaRXI4AV0Ulj5lMHqVxxdSgiVbixSCXoEmmX/jk0CuJw4+3SqroYO9VoUh+HcuJivvtJemA==", + "version": "7.7.3", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.7.3.tgz", + "integrity": "sha512-SdsKMrI9TdgjdweUSR9MweHA4EJ8YxHn8DFaDisvhVlUOe4BF1tLD7GAj0lIqWVl+dPb/rExr0Btby5loQm20Q==", "dev": true, "license": "ISC", "bin": { @@ -1788,16 +1831,16 @@ } }, "node_modules/@typescript-eslint/utils": { - "version": "8.32.1", - "resolved": "https://registry.npmjs.org/@typescript-eslint/utils/-/utils-8.32.1.tgz", - "integrity": "sha512-DsSFNIgLSrc89gpq1LJB7Hm1YpuhK086DRDJSNrewcGvYloWW1vZLHBTIvarKZDcAORIy/uWNx8Gad+4oMpkSA==", + "version": "8.46.3", + "resolved": "https://registry.npmjs.org/@typescript-eslint/utils/-/utils-8.46.3.tgz", + "integrity": "sha512-VXw7qmdkucEx9WkmR3ld/u6VhRyKeiF1uxWwCy/iuNfokjJ7VhsgLSOTjsol8BunSw190zABzpwdNsze2Kpo4g==", "dev": true, "license": "MIT", "dependencies": { "@eslint-community/eslint-utils": "^4.7.0", - "@typescript-eslint/scope-manager": "8.32.1", - "@typescript-eslint/types": "8.32.1", - "@typescript-eslint/typescript-estree": "8.32.1" + "@typescript-eslint/scope-manager": "8.46.3", + "@typescript-eslint/types": "8.46.3", + "@typescript-eslint/typescript-estree": "8.46.3" }, "engines": { "node": "^18.18.0 || ^20.9.0 || >=21.1.0" @@ -1808,18 +1851,18 @@ }, "peerDependencies": { "eslint": "^8.57.0 || ^9.0.0", - "typescript": ">=4.8.4 <5.9.0" + "typescript": ">=4.8.4 <6.0.0" } }, "node_modules/@typescript-eslint/visitor-keys": { - "version": "8.32.1", - "resolved": "https://registry.npmjs.org/@typescript-eslint/visitor-keys/-/visitor-keys-8.32.1.tgz", - "integrity": "sha512-ar0tjQfObzhSaW3C3QNmTc5ofj0hDoNQ5XWrCy6zDyabdr0TWhCkClp+rywGNj/odAFBVzzJrK4tEq5M4Hmu4w==", + "version": "8.46.3", + "resolved": "https://registry.npmjs.org/@typescript-eslint/visitor-keys/-/visitor-keys-8.46.3.tgz", + "integrity": "sha512-uk574k8IU0rOF/AjniX8qbLSGURJVUCeM5e4MIMKBFFi8weeiLrG1fyQejyLXQpRZbU/1BuQasleV/RfHC3hHg==", "dev": true, "license": "MIT", "dependencies": { - "@typescript-eslint/types": "8.32.1", - "eslint-visitor-keys": "^4.2.0" + "@typescript-eslint/types": "8.46.3", + "eslint-visitor-keys": "^4.2.1" }, "engines": { "node": "^18.18.0 || ^20.9.0 || >=21.1.0" @@ -1830,9 +1873,9 @@ } }, "node_modules/@typescript-eslint/visitor-keys/node_modules/eslint-visitor-keys": { - "version": "4.2.0", - "resolved": "https://registry.npmjs.org/eslint-visitor-keys/-/eslint-visitor-keys-4.2.0.tgz", - "integrity": "sha512-UyLnSehNt62FFhSwjZlHmeokpRK59rcz29j+F1/aDgbkbRTk7wIc9XzdoasMUbRNKDM0qQt/+BJ4BrpFeABemw==", + "version": "4.2.1", + "resolved": "https://registry.npmjs.org/eslint-visitor-keys/-/eslint-visitor-keys-4.2.1.tgz", + "integrity": "sha512-Uhdk5sfqcee/9H/rCOJikYz67o0a2Tw2hGRPOG2Y1R2dg7brRe1uG0yaNQDHu+TO/uQPF/5eCapvYSmHUjt7JQ==", "dev": true, "license": "Apache-2.0", "engines": { @@ -7318,9 +7361,9 @@ } }, "node_modules/typescript": { - "version": "5.4.5", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.4.5.tgz", - "integrity": "sha512-vcI4UpRgg81oIRUFwR0WSIHKt11nJ7SAVlYNIu+QpqeyXP+gpQJy/Z4+F0aGxSE4MqwjyXvW/TzgkLAx2AGHwQ==", + "version": "5.9.3", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", + "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "dev": true, "license": "Apache-2.0", "bin": { @@ -7420,10 +7463,11 @@ } }, "node_modules/undici-types": { - "version": "5.26.5", - "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz", - "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==", - "dev": true + "version": "6.21.0", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.21.0.tgz", + "integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==", + "dev": true, + "license": "MIT" }, "node_modules/update-browserslist-db": { "version": "1.0.13", diff --git a/package.json b/package.json index 9deb2c6..92e333a 100644 --- a/package.json +++ b/package.json @@ -30,12 +30,12 @@ "@jest/globals": "^29.6.2", "@rollup/plugin-commonjs": "^25.0.7", "@rollup/plugin-node-resolve": "^13.0.4", - "@rollup/plugin-typescript": "^10.0.1", + "@rollup/plugin-typescript": "^12.1.2", "@types/jest": "^29.5.0", - "@types/node": "^20.5.0", + "@types/node": "^22.13.14", "@types/ws": "^8.18.1", - "@typescript-eslint/eslint-plugin": "8.32.1", - "@typescript-eslint/parser": "8.32.1", + "@typescript-eslint/eslint-plugin": "^8.46.3", + "@typescript-eslint/parser": "^8.46.3", "eslint": "8.57.1", "eslint-config-prettier": "^10.1.8", "eslint-plugin-import": "^2.25.3", @@ -50,6 +50,6 @@ "rollup": "^2.79.1", "rollup-plugin-license": "3.2.0", "ts-jest": "^29.3.4", - "typescript": "~5.4.3" + "typescript": "~5.9.3" } } diff --git a/rollup.config.js b/rollup.config.js index dc669db..f421cf5 100644 --- a/rollup.config.js +++ b/rollup.config.js @@ -35,9 +35,6 @@ export default { plugins: [ typescript({ tsconfig: "./tsconfig_bundle.json", - declaration: false, - declarationMap: false, - sourceMap: false, }), resolve({ browser: true, diff --git a/src/client.ts b/src/client.ts index cdd986b..63aec13 100644 --- a/src/client.ts +++ b/src/client.ts @@ -271,7 +271,7 @@ export class SfuClient extends EventTarget { } async startRecording() { if (this.state !== SfuClientState.CONNECTED) { - throw new Error("InvalidState"); + return; } return this._bus?.request( { @@ -283,7 +283,7 @@ export class SfuClient extends EventTarget { async stopRecording() { if (this.state !== SfuClientState.CONNECTED) { - throw new Error("InvalidState"); + return; } return this._bus?.request( { diff --git a/src/models/channel.ts b/src/models/channel.ts index 0a89517..89335c1 100644 --- a/src/models/channel.ts +++ b/src/models/channel.ts @@ -130,6 +130,8 @@ export class Channel extends EventEmitter { logger.info( `created channel ${channel.uuid} (${key ? "unique" : "global"} key) for ${safeIssuer}` ); + logger.verbose(`rtc feature: ${Boolean(channel.router)}`); + logger.verbose(`recording feature: ${Boolean(channel.recorder)}`); const onWorkerDeath = () => { logger.warn(`worker died, closing channel ${channel.uuid}`); channel.close(); diff --git a/src/models/recorder.ts b/src/models/recorder.ts index 6f4afa2..d02d186 100644 --- a/src/models/recorder.ts +++ b/src/models/recorder.ts @@ -32,7 +32,7 @@ export class Recorder extends EventEmitter { // TODO ffmpeg instance creation for recording to folder.path with proper name, start, build timestamps object } this._record(); - return { state: this.state }; + return this.isRecording; } async stop() { @@ -48,7 +48,7 @@ export class Recorder extends EventEmitter { // only resolve promise and switch state when completely ready to start a new recording. this.state = RECORDER_STATE.STOPPED; } - return { state: this.state }; + return this.isRecording; } get isRecording(): boolean { diff --git a/src/models/session.ts b/src/models/session.ts index 739775b..077cbd2 100644 --- a/src/models/session.ts +++ b/src/models/session.ts @@ -208,6 +208,7 @@ export class Session extends EventEmitter { continue; } this.permissions[key] = Boolean(permissions[key]); + logger.verbose(`Permissions updated: ${key} = ${this.permissions[key]}`); } } diff --git a/src/services/auth.ts b/src/services/auth.ts index 7efef4a..9d4b0b8 100644 --- a/src/services/auth.ts +++ b/src/services/auth.ts @@ -43,7 +43,7 @@ interface PrivateJWTClaims { sfu_channel_uuid?: string; session_id?: SessionId; ice_servers?: object[]; - permissions?: SessionPermissions, + permissions?: SessionPermissions; sessionIdsByChannel?: Record; /** If provided when requesting a channel, this key will be used instead of the global key to verify JWTs related to this channel */ key?: string; diff --git a/src/shared/types.ts b/src/shared/types.ts index 0453813..0c653da 100644 --- a/src/shared/types.ts +++ b/src/shared/types.ts @@ -30,7 +30,7 @@ import type { RtpParameters // eslint-disable-next-line node/no-unpublished-import } from "mediasoup-client/lib/types"; -import type { CLIENT_MESSAGE, CLIENT_REQUEST, SERVER_MESSAGE, SERVER_REQUEST } from "./enums.ts"; +import type { CLIENT_MESSAGE, CLIENT_REQUEST, SERVER_MESSAGE, SERVER_REQUEST } from "./enums"; export type BusMessage = | { name: typeof CLIENT_MESSAGE.BROADCAST; payload: JSONSerializable } diff --git a/tests/network.test.ts b/tests/network.test.ts index fd21080..a43ef56 100644 --- a/tests/network.test.ts +++ b/tests/network.test.ts @@ -283,9 +283,9 @@ describe("Full network", () => { const sender = await network.connect(channelUUID, 3); await Promise.all([user1.isConnected, sender.isConnected]); expect(sender.sfuClient.availableFeatures.recording).toBe(true); - const startResult = (await sender.sfuClient.startRecording()) as { state: string }; - expect(startResult.state).toBe("started"); - const stopResult = (await sender.sfuClient.stopRecording()) as { state: string }; - expect(stopResult.state).toBe("stopped"); + const startResult = (await sender.sfuClient.startRecording()) as boolean; + expect(startResult).toBe(true); + const stopResult = (await sender.sfuClient.stopRecording()) as boolean; + expect(stopResult).toBe(false); }); }); diff --git a/tests/utils/network.ts b/tests/utils/network.ts index 33d6cfc..943f445 100644 --- a/tests/utils/network.ts +++ b/tests/utils/network.ts @@ -37,6 +37,7 @@ interface ConnectionResult { session: Session; /** Client-side SFU client instance */ sfuClient: SfuClient; + /** Promise resolving to true when client is connected */ isConnected: Promise; } diff --git a/tsconfig.json b/tsconfig.json index 9140ec1..43c116e 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -10,13 +10,11 @@ "lib": ["ES2022", "DOM"], "module": "ESNext", "allowImportingTsExtensions": true, - "noEmit": true, "baseUrl": ".", "paths": { "#src/*": ["./src/*"], "#tests/*": ["./tests/*"] }, - "outDir": "dist", "strict": true, "forceConsistentCasingInFileNames": true, "noUnusedLocals": true, @@ -30,8 +28,6 @@ ], "esModuleInterop": true, "resolveJsonModule": true, - "declaration": true, - "declarationDir": "dist/types", "preserveConstEnums": false, } } diff --git a/tsconfig_bundle.json b/tsconfig_bundle.json index aca1cdc..ba7b048 100644 --- a/tsconfig_bundle.json +++ b/tsconfig_bundle.json @@ -5,8 +5,8 @@ "module": "es6", "declaration": false, "sourceMap": false, - "noEmit": false, - "allowImportingTsExtensions": false + "allowImportingTsExtensions": true, + "noEmit": true }, "include": ["src/client.ts", "src/shared/**/*"], "exclude": ["tests/**/*", "src/server.ts", "src/services/**/*", "src/models/**/*"] From 010f54e032aa77ed9a1b7a898b09cced1d16ddab Mon Sep 17 00:00:00 2001 From: ThanhDodeurOdoo Date: Wed, 5 Nov 2025 11:31:37 +0100 Subject: [PATCH 11/12] [IMP] discuss: wip recording see: https://github.com/odoo/sfu/pull/27 --- src/client.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/client.ts b/src/client.ts index 63aec13..eb2b876 100644 --- a/src/client.ts +++ b/src/client.ts @@ -269,28 +269,28 @@ export class SfuClient extends EventTarget { await Promise.all(proms); return stats; } - async startRecording() { + async startRecording(): Promise { if (this.state !== SfuClientState.CONNECTED) { - return; + throw new Error("Cannot start recording when not connected"); } return this._bus?.request( { name: CLIENT_REQUEST.START_RECORDING }, { batch: true } - ); + ) as Promise; } - async stopRecording() { + async stopRecording(): Promise { if (this.state !== SfuClientState.CONNECTED) { - return; + throw new Error("Cannot stop recording when not connected"); } return this._bus?.request( { name: CLIENT_REQUEST.STOP_RECORDING }, { batch: true } - ); + ) as Promise; } /** From 01bdd3a9b32459e08c708c2e6a64f939d5c48f4c Mon Sep 17 00:00:00 2001 From: ThanhDodeurOdoo Date: Wed, 5 Nov 2025 12:31:29 +0100 Subject: [PATCH 12/12] [IMP] discuss: better bus types --- src/client.ts | 18 +++++++++++------- src/models/session.ts | 14 +++++++++----- src/shared/bus-types.ts | 21 +++++++++++++++++++++ src/shared/bus.ts | 29 +++++++++++++++++------------ tests/bus.test.ts | 7 ++++--- 5 files changed, 62 insertions(+), 27 deletions(-) create mode 100644 src/shared/bus-types.ts diff --git a/src/client.ts b/src/client.ts index eb2b876..33a193a 100644 --- a/src/client.ts +++ b/src/client.ts @@ -25,6 +25,7 @@ import type { AvailableFeatures, StartupData } from "#src/shared/types"; +import type { RequestMessage } from "#src/shared/bus-types"; import type { TransportConfig, SessionId, SessionInfo } from "#src/models/session"; interface Consumers { @@ -273,24 +274,24 @@ export class SfuClient extends EventTarget { if (this.state !== SfuClientState.CONNECTED) { throw new Error("Cannot start recording when not connected"); } - return this._bus?.request( + return this._bus!.request( { name: CLIENT_REQUEST.START_RECORDING }, { batch: true } - ) as Promise; + ); } async stopRecording(): Promise { if (this.state !== SfuClientState.CONNECTED) { throw new Error("Cannot stop recording when not connected"); } - return this._bus?.request( + return this._bus!.request( { name: CLIENT_REQUEST.STOP_RECORDING }, { batch: true } - ) as Promise; + ); } /** @@ -531,10 +532,10 @@ export class SfuClient extends EventTarget { }); transport.on("produce", async ({ kind, rtpParameters, appData }, callback, errback) => { try { - const result = (await this._bus!.request({ + const result = await this._bus!.request({ name: CLIENT_REQUEST.INIT_PRODUCER, payload: { type: appData.type as StreamType, kind, rtpParameters } - })) as { id: string }; + }); callback({ id: result.id }); } catch (error) { errback(error as Error); @@ -626,7 +627,10 @@ export class SfuClient extends EventTarget { } } - private async _handleRequest({ name, payload }: BusMessage): Promise { + private async _handleRequest({ + name, + payload + }: RequestMessage): Promise { switch (name) { case SERVER_REQUEST.INIT_CONSUMER: { const { id, kind, producerId, rtpParameters, sessionId, type, active } = payload; diff --git a/src/models/session.ts b/src/models/session.ts index 077cbd2..2d8ba20 100644 --- a/src/models/session.ts +++ b/src/models/session.ts @@ -21,6 +21,7 @@ import { STREAM_TYPE } from "#src/shared/enums.ts"; import type { BusMessage, JSONSerializable, StartupData, StreamType } from "#src/shared/types"; +import type { RequestMessage } from "#src/shared/bus-types"; import type { Bus } from "#src/shared/bus.ts"; import type { Channel } from "#src/models/channel.ts"; import { RECORDER_STATE } from "#src/models/recorder.ts"; @@ -361,15 +362,15 @@ export class Session extends EventEmitter { this._ctsTransport?.close(); this._stcTransport?.close(); }); - this._clientCapabilities = (await this.bus!.request({ + this._clientCapabilities = await this.bus!.request({ name: SERVER_REQUEST.INIT_TRANSPORTS, payload: { capabilities: this._channel.router!.rtpCapabilities, - stcConfig: this._createTransportConfig(this._stcTransport), - ctsConfig: this._createTransportConfig(this._ctsTransport), + stcConfig: this._createTransportConfig(this._stcTransport!), + ctsConfig: this._createTransportConfig(this._ctsTransport!), producerOptionsByKind: config.rtc.producerOptionsByKind } - })) as RtpCapabilities; + }); await Promise.all([ this._ctsTransport.setMaxIncomingBitrate(config.MAX_BITRATE_IN), this._stcTransport.setMaxOutgoingBitrate(config.MAX_BITRATE_OUT) @@ -628,7 +629,10 @@ export class Session extends EventEmitter { } } - private async _handleRequest({ name, payload }: BusMessage): Promise { + private async _handleRequest({ + name, + payload + }: RequestMessage): Promise { switch (name) { case CLIENT_REQUEST.CONNECT_STC_TRANSPORT: { const { dtlsParameters } = payload; diff --git a/src/shared/bus-types.ts b/src/shared/bus-types.ts new file mode 100644 index 0000000..dc69233 --- /dev/null +++ b/src/shared/bus-types.ts @@ -0,0 +1,21 @@ +// eslint-disable-next-line node/no-unpublished-import +import type { RtpCapabilities } from "mediasoup-client/lib/types"; +import type { CLIENT_REQUEST, SERVER_REQUEST } from "./enums"; +import type { BusMessage } from "./types"; + +export interface RequestMap { + [CLIENT_REQUEST.CONNECT_CTS_TRANSPORT]: void; + [CLIENT_REQUEST.CONNECT_STC_TRANSPORT]: void; + [CLIENT_REQUEST.INIT_PRODUCER]: { id: string }; + [CLIENT_REQUEST.START_RECORDING]: boolean; + [CLIENT_REQUEST.STOP_RECORDING]: boolean; + [SERVER_REQUEST.INIT_CONSUMER]: void; + [SERVER_REQUEST.INIT_TRANSPORTS]: RtpCapabilities; + [SERVER_REQUEST.PING]: void; +} + +export type RequestName = keyof RequestMap; + +export type RequestMessage = Extract; + +export type ResponseFrom = RequestMap[T]; diff --git a/src/shared/bus.ts b/src/shared/bus.ts index 5171485..4edf54e 100644 --- a/src/shared/bus.ts +++ b/src/shared/bus.ts @@ -1,9 +1,11 @@ import type { WebSocket as NodeWebSocket } from "ws"; import type { JSONSerializable, BusMessage } from "./types"; +import type { RequestMessage, RequestName, ResponseFrom } from "./bus-types"; + export interface Payload { /** The actual message content */ - message: BusMessage; + message: BusMessage | JSONSerializable; /** Request ID if this message expects a response */ needResponse?: string; /** Response ID if this message is responding to a request */ @@ -46,11 +48,9 @@ export class Bus { /** Global ID counter for Bus instances */ private static _idCount = 0; /** Message handler for incoming messages */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any public onMessage?: (message: BusMessage) => void; /** Request handler for incoming requests */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - public onRequest?: (request: BusMessage) => Promise; + public onRequest?: (request: RequestMessage) => Promise; /** Unique bus instance identifier */ public readonly id: number = Bus._idCount++; /** Request counter for generating unique request IDs */ @@ -96,8 +96,10 @@ export class Bus { /** * Sends a request and waits for a response */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - request(message: BusMessage, options: RequestOptions = {}): Promise { + request( + message: RequestMessage, + options: RequestOptions = {} + ): Promise> { const { timeout = 5000, batch } = options; const requestId = this._getNextRequestId(); return new Promise((resolve, reject) => { @@ -105,7 +107,11 @@ export class Bus { reject(new Error("bus request timed out")); this._pendingRequests.delete(requestId); }, timeout); - this._pendingRequests.set(requestId, { resolve, reject, timeout: timeoutId }); + this._pendingRequests.set(requestId, { + resolve, + reject, + timeout: timeoutId + }); this._sendPayload(message, { needResponse: requestId, batch }); }); } @@ -138,8 +144,7 @@ export class Bus { } private _sendPayload( - // eslint-disable-next-line @typescript-eslint/no-explicit-any - message: BusMessage, + message: BusMessage | JSONSerializable, options: { needResponse?: string; responseTo?: string; @@ -212,11 +217,11 @@ export class Bus { } } else if (needResponse) { // This is a request that expects a response - const response = await this.onRequest?.(message); - this._sendPayload(response!, { responseTo: needResponse }); + const response = await this.onRequest?.(message as RequestMessage); + this._sendPayload(response ?? {}, { responseTo: needResponse }); } else { // This is a plain message - this.onMessage?.(message); + this.onMessage?.(message as BusMessage); } } } diff --git a/tests/bus.test.ts b/tests/bus.test.ts index 2d980cc..c0cc5a7 100644 --- a/tests/bus.test.ts +++ b/tests/bus.test.ts @@ -4,6 +4,7 @@ import { expect, describe, jest } from "@jest/globals"; import { Bus } from "#src/shared/bus"; import type { JSONSerializable, BusMessage } from "#src/shared/types"; +import { RequestMessage } from "#src/shared/bus-types.ts"; class MockTargetWebSocket extends EventTarget { send(message: JSONSerializable) { @@ -74,14 +75,14 @@ describe("Bus API", () => { return "pong"; } }; - const response = await aliceBus.request("ping" as unknown as BusMessage); + const response = await aliceBus.request("ping" as unknown as RequestMessage); expect(response).toBe("pong"); }); test("promises are rejected when the bus is closed", async () => { const { aliceSocket } = mockSocketPair(); const aliceBus = new Bus(aliceSocket as unknown as WebSocket); let rejected = false; - const promise = aliceBus.request("ping" as unknown as BusMessage); + const promise = aliceBus.request("ping" as unknown as RequestMessage); aliceBus.close(); try { await promise; @@ -96,7 +97,7 @@ describe("Bus API", () => { const { aliceSocket } = mockSocketPair(); const aliceBus = new Bus(aliceSocket as unknown as WebSocket); const timeout = 500; - const promise = aliceBus.request("hello" as unknown as BusMessage, { timeout }); + const promise = aliceBus.request("hello" as unknown as RequestMessage, { timeout }); jest.advanceTimersByTime(timeout); await expect(promise).rejects.toThrow(); jest.useRealTimers();