Skip to content

Commit b371f93

Browse files
[IMP] add recording feature
1 parent 2b5a531 commit b371f93

File tree

4 files changed

+242
-1
lines changed

4 files changed

+242
-1
lines changed

src/models/channel.js

+4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { getAllowedCodecs, Logger } from "#src/utils/utils.js";
55
import { AuthenticationError, OvercrowdedError } from "#src/utils/errors.js";
66
import { Session, SESSION_CLOSE_CODE } from "#src/models/session.js";
77
import { getWorker } from "#src/services/rtc.js";
8+
import { Recorder } from "#src/models/recorder.js";
89

910
const logger = new Logger("CHANNEL");
1011

@@ -39,6 +40,7 @@ export class Channel extends EventEmitter {
3940
name;
4041
/** @type {WithImplicitCoercion<string>} base 64 buffer key */
4142
key;
43+
recorder;
4244
/** @type {import("mediasoup").types.Router}*/
4345
router;
4446
/** @type {Map<number, Session>} */
@@ -130,6 +132,7 @@ export class Channel extends EventEmitter {
130132
this.name = `${remoteAddress}*${this.uuid.slice(-5)}`;
131133
this.router = router;
132134
this._worker = worker;
135+
this.recorder = new Recorder(this);
133136
this._onSessionClose = this._onSessionClose.bind(this);
134137
}
135138

@@ -249,6 +252,7 @@ export class Channel extends EventEmitter {
249252
}
250253
clearTimeout(this._closeTimeout);
251254
this.sessions.clear();
255+
this.recorder.stop();
252256
Channel.records.delete(this.uuid);
253257
/**
254258
* @event Channel#close

src/models/recorder.js

+186
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
import child_process from "node:child_process";
2+
import os from "node:os";
3+
import path from "node:path";
4+
import fs from "node:fs";
5+
6+
import { EventEmitter } from "node:events"; // TODO remove if unnecessary
7+
import { Logger } from "#src/utils/utils.js";
8+
import { STREAM_TYPE } from "#src/shared/enums.js";
9+
10+
const logger = new Logger("RECORDER");
11+
const temp = os.tmpdir();
12+
const FORMAT = "mp4"; // TODO config
13+
const VIDEO_LIMIT = 4; // TODO config (and other name?)
14+
15+
/**
16+
* @typedef {Object} RTPTransports
17+
* @property {Array<import("mediasoup").types.Transport>} audio
18+
* @property {Array<import("mediasoup").types.Transport>} camera
19+
* @property {Array<import("mediasoup").types.Transport>} screen
20+
*/
21+
22+
/**
23+
* Wraps the FFMPEG process
24+
* TODO move in own file
25+
*/
26+
class FFMPEG extends EventEmitter {
27+
/** @type {child_process.ChildProcess} */
28+
_process;
29+
/** @type {string} */
30+
_filePath;
31+
32+
get _args() {
33+
return [
34+
"-loglevel",
35+
"debug", // TODO warning in prod
36+
"-protocol_whitelist",
37+
"pipe,udp,rtp",
38+
"-fflags",
39+
"+genpts",
40+
"-f",
41+
"sdp",
42+
"-i",
43+
"pipe:0",
44+
"-movflags",
45+
"frag_keyframe+empty_moov+default_base_moof", // fragmented
46+
"-c:v",
47+
"libx264", // vid codec
48+
"-c:a",
49+
"aac", // audio codec
50+
"-f",
51+
FORMAT,
52+
this._filePath,
53+
];
54+
}
55+
56+
/**
57+
* @param {string} filePath
58+
*/
59+
constructor(filePath) {
60+
super();
61+
this._filePath = filePath;
62+
}
63+
64+
/**
65+
* @param {String[]} [sdp]
66+
*/
67+
async spawn(sdp) {
68+
this._process = child_process.spawn("ffmpeg", this._args, {
69+
stdio: ["pipe", "pipe", process.stderr],
70+
});
71+
72+
if (!this._process.stdin.writable) {
73+
throw new Error("FFMPEG stdin not writable.");
74+
}
75+
this._process.stdin.write(sdp);
76+
this._process.stdin.end();
77+
78+
this._process.stdout.on("data", (chunk) => {
79+
this.emit("data", chunk); // Emit data chunks as they become available
80+
});
81+
82+
this._process.on("close", (code) => {
83+
if (code === 0) {
84+
this.emit("success");
85+
}
86+
});
87+
88+
logger.debug(
89+
`FFMPEG process (pid:${this._process.pid}) spawned, outputting to ${this._filePath}`
90+
);
91+
}
92+
93+
kill() {
94+
this._process?.kill("SIGINT");
95+
}
96+
}
97+
98+
export class Recorder extends EventEmitter {
99+
static records = new Map();
100+
101+
/** @type {string} */
102+
uuid = crypto.randomUUID();
103+
/** @type {import("#src/models/channel").Channel} */
104+
channel;
105+
/** @type {string} */
106+
state;
107+
ffmpeg;
108+
/** @type {RTPTransports} */
109+
_rtpTransports;
110+
/** @type {string} */
111+
filePath;
112+
/**
113+
* @param {import("#src/models/channel").Channel} channel
114+
*/
115+
constructor(channel) {
116+
super();
117+
this.channel = channel;
118+
this.filePath = path.join(temp, `${this.uuid}.${FORMAT}`);
119+
Recorder.records.set(this.uuid, this);
120+
}
121+
122+
/** @returns {number} */
123+
get videoCount() {
124+
return this._rtpTransports.camera.length + this._rtpTransports.screen.length;
125+
}
126+
127+
/**
128+
* @param {Array} ids
129+
* @returns {string} filePath
130+
*/
131+
start(ids) {
132+
// maybe internal state and check if already recording (recording = has ffmpeg child process).
133+
this.stop();
134+
for (const id of ids) {
135+
const session = this.channel.sessions.get(id);
136+
const audioRtp = this._createRtp(
137+
session.producers[STREAM_TYPE.AUDIO],
138+
STREAM_TYPE.AUDIO
139+
);
140+
audioRtp && this._rtpTransports.audio.push(audioRtp);
141+
for (const type in [STREAM_TYPE.CAMERA, STREAM_TYPE.SCREEN]) {
142+
if (this.videoCount < VIDEO_LIMIT) {
143+
const rtp = this._createRtp(session.producers[type], type);
144+
rtp && this._rtpTransports[type].push(rtp);
145+
}
146+
}
147+
}
148+
this.ffmpeg = new FFMPEG(this.filePath);
149+
this.ffmpeg.spawn(); // args should be base on the rtp transports
150+
this.ffmpeg.once("success", () => {
151+
this.emit("download-ready", this.filePath);
152+
});
153+
return this.filePath;
154+
}
155+
pause() {
156+
// TODO maybe shouldn't be able to pause
157+
}
158+
stop() {
159+
// TODO
160+
// cleanup all rtp transports
161+
// stop ffmpeg process
162+
Recorder.records.delete(this.uuid);
163+
}
164+
165+
/**
166+
* @param {http.ServerResponse} res
167+
*/
168+
pipeToResponse(res) {
169+
// TODO check if this can be executed, otherwise end request?
170+
const fileStream = fs.createReadStream(this._filePath);
171+
res.writeHead(200, {
172+
"Content-Type": `video/${FORMAT}`,
173+
"Content-Disposition": "inline",
174+
});
175+
fileStream.pipe(res); // Pipe the file stream to the response
176+
}
177+
178+
/**
179+
* @param {import("mediasoup").types.Producer} producer
180+
* @param {STREAM_TYPE[keyof STREAM_TYPE]} type
181+
* @return {Promise<void>} probably just create transport with right ports and return that,
182+
*/
183+
async _createRtp(producer, type) {
184+
// TODO
185+
}
186+
}

src/services/http.js

+46-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import * as config from "#src/config.js";
66
import { Logger, parseBody, extractRequestInfo } from "#src/utils/utils.js";
77
import { SESSION_CLOSE_CODE } from "#src/models/session.js";
88
import { Channel } from "#src/models/channel.js";
9+
import { Recorder } from "#src/models/recorder.js";
910

1011
/**
1112
* @typedef {function} routeCallback
@@ -15,6 +16,7 @@ import { Channel } from "#src/models/channel.js";
1516
* @param {string} param2.remoteAddress
1617
* @param {string} param2.protocol
1718
* @param {string} param2.host
19+
* @param {Object} param2.match name/value mapping of route variables
1820
* @param {URLSearchParams} param2.searchParams
1921
* @return {http.ServerResponse}
2022
*/
@@ -77,6 +79,19 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf
7779
return res.end();
7880
},
7981
});
82+
routeListener.get(`/v${API_VERSION}/recording/<token>`, {
83+
callback: async (req, res, { remoteAddress, match }) => {
84+
try {
85+
const { token } = match;
86+
logger.info(`[${remoteAddress}]: requested recording ${token}`);
87+
Recorder.records.get(token)?.pipeToResponse(res);
88+
// res not ended as we are streaming
89+
} catch (error) {
90+
logger.error(`[${remoteAddress}] failed to obtain recording: ${error.message}`);
91+
return res.end();
92+
}
93+
},
94+
});
8095
routeListener.post(`/v${API_VERSION}/disconnect`, {
8196
callback: async (req, res, { remoteAddress }) => {
8297
try {
@@ -183,7 +198,8 @@ class RouteListener {
183198
break;
184199
}
185200
for (const [pattern, options] of registeredRoutes) {
186-
if (pathname === pattern) {
201+
const match = this._extractPattern(pathname, pattern);
202+
if (match) {
187203
if (options?.cors) {
188204
res.setHeader("Access-Control-Allow-Origin", options.cors);
189205
res.setHeader("Access-Control-Allow-Methods", options.methods);
@@ -195,6 +211,7 @@ class RouteListener {
195211
host,
196212
protocol,
197213
remoteAddress,
214+
match,
198215
searchParams,
199216
});
200217
} catch (error) {
@@ -212,4 +229,32 @@ class RouteListener {
212229
}
213230
return res.end();
214231
}
232+
233+
/**
234+
* Matches a pathname against a pattern with named parameters.
235+
* @param {string} pathname - The URL path requested, e.g., "/channel/6/person/42/"
236+
* @param {string} pattern - The pattern to match, e.g., "/channel/<channelId>/session/<sessionId>"
237+
* @returns {object|undefined} - Returns undefined if no match. If matched, returns an object mapping keys to values,
238+
* the object is empty if matching a pattern with no variables.
239+
* eg: { channelId: "6", sessionId: "42" } | {} | undefined
240+
*/
241+
_extractPattern(pathname, pattern) {
242+
pathname = pathname.replace(/\/+$/, "");
243+
pattern = pattern.replace(/\/+$/, "");
244+
const paramNames = [];
245+
const regexPattern = pattern.replace(/<([^>]+)>/g, (_, paramName) => {
246+
paramNames.push(paramName);
247+
return "([^/]+)";
248+
});
249+
const regex = new RegExp(`^${regexPattern}$`);
250+
const match = pathname.match(regex);
251+
if (!match) {
252+
return;
253+
}
254+
const params = {};
255+
paramNames.forEach((name, index) => {
256+
params[name] = match[index + 1];
257+
});
258+
return params;
259+
}
215260
}

src/shared/enums.js

+6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ export const WS_CLOSE_CODE = {
1212
CHANNEL_FULL: 4109,
1313
};
1414

15+
export const STREAM_TYPE = {
16+
AUDIO: "audio",
17+
CAMERA: "camera",
18+
SCREEN: "screen",
19+
};
20+
1521
export const SERVER_REQUEST = {
1622
/** Requests the creation of a consumer that is used to forward a track to the client */
1723
INIT_CONSUMER: "INIT_CONSUMER",

0 commit comments

Comments
 (0)