Skip to content

Commit 6ad0f49

Browse files
[IMP] add recording feature
1 parent 2b5a531 commit 6ad0f49

File tree

4 files changed

+243
-1
lines changed

4 files changed

+243
-1
lines changed

src/models/channel.js

+4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { getAllowedCodecs, Logger } from "#src/utils/utils.js";
55
import { AuthenticationError, OvercrowdedError } from "#src/utils/errors.js";
66
import { Session, SESSION_CLOSE_CODE } from "#src/models/session.js";
77
import { getWorker } from "#src/services/rtc.js";
8+
import { Recorder } from "#src/models/recorder.js";
89

910
const logger = new Logger("CHANNEL");
1011

@@ -39,6 +40,7 @@ export class Channel extends EventEmitter {
3940
name;
4041
/** @type {WithImplicitCoercion<string>} base 64 buffer key */
4142
key;
43+
recorder;
4244
/** @type {import("mediasoup").types.Router}*/
4345
router;
4446
/** @type {Map<number, Session>} */
@@ -130,6 +132,7 @@ export class Channel extends EventEmitter {
130132
this.name = `${remoteAddress}*${this.uuid.slice(-5)}`;
131133
this.router = router;
132134
this._worker = worker;
135+
this.recorder = new Recorder(this);
133136
this._onSessionClose = this._onSessionClose.bind(this);
134137
}
135138

@@ -249,6 +252,7 @@ export class Channel extends EventEmitter {
249252
}
250253
clearTimeout(this._closeTimeout);
251254
this.sessions.clear();
255+
this.recorder.stop();
252256
Channel.records.delete(this.uuid);
253257
/**
254258
* @event Channel#close

src/models/recorder.js

+187
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
import child_process from "node:child_process";
2+
import os from "node:os";
3+
import path from "node:path";
4+
import fs from "node:fs";
5+
6+
import { EventEmitter } from "node:events"; // TODO remove if unnecessary
7+
import { Logger } from "#src/utils/utils.js";
8+
import { STREAM_TYPE } from "#src/shared/enums.js";
9+
10+
const logger = new Logger("RECORDER");
11+
const temp = os.tmpdir();
12+
const fileType = "mp4"; // TODO config
13+
const VIDEO_LIMIT = 4; // TODO config (and other name?)
14+
15+
/**
16+
* @typedef {Object} RTPTransports
17+
* @property {Array<import("mediasoup").types.Transport>} audio
18+
* @property {Array<import("mediasoup").types.Transport>} camera
19+
* @property {Array<import("mediasoup").types.Transport>} screen
20+
*/
21+
22+
/**
23+
* Wraps the FFMPEG process
24+
* TODO move in own file
25+
*/
26+
class FFMPEG extends EventEmitter {
27+
/** @type {child_process.ChildProcess} */
28+
_process;
29+
/** @type {string} */
30+
_filePath;
31+
32+
get _args() {
33+
return [
34+
"-loglevel",
35+
"debug", // TODO warning in prod
36+
"-protocol_whitelist",
37+
"pipe,udp,rtp",
38+
"-fflags",
39+
"+genpts",
40+
"-f",
41+
"sdp",
42+
"-i",
43+
"pipe:0",
44+
"-movflags",
45+
"frag_keyframe+empty_moov+default_base_moof", // fragmented
46+
"-c:v",
47+
"libx264", // vid codec
48+
"-c:a",
49+
"aac", // audio codec
50+
"-f",
51+
fileType,
52+
this._filePath,
53+
];
54+
}
55+
56+
/**
57+
* @param {string} filePath
58+
*/
59+
constructor(filePath) {
60+
super();
61+
this._filePath = filePath;
62+
}
63+
64+
/**
65+
* @param {String[]} [sdp]
66+
*/
67+
async spawn(sdp) {
68+
this._process = child_process.spawn("ffmpeg", this._args, {
69+
stdio: ["pipe", "pipe", process.stderr],
70+
});
71+
72+
if (!this._process.stdin.writable) {
73+
throw new Error("FFMPEG stdin not writable.");
74+
}
75+
this._process.stdin.write(sdp);
76+
this._process.stdin.end();
77+
78+
this._process.stdout.on("data", (chunk) => {
79+
this.emit("data", chunk); // Emit data chunks as they become available
80+
// may need to ues this to pipe to request if file stream does not work
81+
});
82+
83+
this._process.on("close", (code) => {
84+
if (code === 0) {
85+
this.emit("success");
86+
}
87+
});
88+
89+
logger.debug(
90+
`FFMPEG process (pid:${this._process.pid}) spawned, outputting to ${this._filePath}`
91+
);
92+
}
93+
94+
kill() {
95+
this._process?.kill("SIGINT");
96+
}
97+
}
98+
99+
export class Recorder extends EventEmitter {
100+
static records = new Map();
101+
102+
/** @type {string} */
103+
uuid = crypto.randomUUID();
104+
/** @type {import("#src/models/channel").Channel} */
105+
channel;
106+
/** @type {string} */
107+
state;
108+
ffmpeg;
109+
/** @type {RTPTransports} */
110+
_rtpTransports;
111+
/** @type {string} */
112+
filePath;
113+
/**
114+
* @param {import("#src/models/channel").Channel} channel
115+
*/
116+
constructor(channel) {
117+
super();
118+
this.channel = channel;
119+
this.filePath = path.join(temp, `${this.uuid}.${fileType}`);
120+
Recorder.records.set(this.uuid, this);
121+
}
122+
123+
/** @returns {number} */
124+
get videoCount() {
125+
return this._rtpTransports.camera.length + this._rtpTransports.screen.length;
126+
}
127+
128+
/**
129+
* @param {Array} ids
130+
* @returns {string} filePath
131+
*/
132+
start(ids) {
133+
// maybe internal state and check if already recording (recording = has ffmpeg child process).
134+
this.stop();
135+
for (const id of ids) {
136+
const session = this.channel.sessions.get(id);
137+
const audioRtp = this._createRtp(
138+
session.producers[STREAM_TYPE.AUDIO],
139+
STREAM_TYPE.AUDIO
140+
);
141+
audioRtp && this._rtpTransports.audio.push(audioRtp);
142+
for (const type in [STREAM_TYPE.CAMERA, STREAM_TYPE.SCREEN]) {
143+
if (this.videoCount < VIDEO_LIMIT) {
144+
const rtp = this._createRtp(session.producers[type], type);
145+
rtp && this._rtpTransports[type].push(rtp);
146+
}
147+
}
148+
}
149+
this.ffmpeg = new FFMPEG(this.filePath);
150+
this.ffmpeg.spawn(); // args should be base on the rtp transports
151+
this.ffmpeg.once("success", () => {
152+
this.emit("download-ready", this.filePath);
153+
});
154+
return this.filePath;
155+
}
156+
pause() {
157+
// TODO maybe shouldn't be able to pause
158+
}
159+
stop() {
160+
// TODO
161+
// cleanup all rtp transports
162+
// stop ffmpeg process
163+
Recorder.records.delete(this.uuid);
164+
}
165+
166+
/**
167+
* @param {http.ServerResponse} res
168+
*/
169+
pipeToResponse(res) {
170+
// TODO check if this can be executed, otherwise end request, or throw error
171+
const fileStream = fs.createReadStream(this._filePath); // may need to be explicitly closed?
172+
res.writeHead(200, {
173+
"Content-Type": `video/${fileType}`,
174+
"Content-Disposition": "inline",
175+
});
176+
fileStream.pipe(res); // Pipe the file stream to the response
177+
}
178+
179+
/**
180+
* @param {import("mediasoup").types.Producer} producer
181+
* @param {STREAM_TYPE[keyof STREAM_TYPE]} type
182+
* @return {Promise<void>} probably just create transport with right ports and return that,
183+
*/
184+
async _createRtp(producer, type) {
185+
// TODO
186+
}
187+
}

src/services/http.js

+46-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import * as config from "#src/config.js";
66
import { Logger, parseBody, extractRequestInfo } from "#src/utils/utils.js";
77
import { SESSION_CLOSE_CODE } from "#src/models/session.js";
88
import { Channel } from "#src/models/channel.js";
9+
import { Recorder } from "#src/models/recorder.js";
910

1011
/**
1112
* @typedef {function} routeCallback
@@ -15,6 +16,7 @@ import { Channel } from "#src/models/channel.js";
1516
* @param {string} param2.remoteAddress
1617
* @param {string} param2.protocol
1718
* @param {string} param2.host
19+
* @param {Object} param2.match name/value mapping of route variables
1820
* @param {URLSearchParams} param2.searchParams
1921
* @return {http.ServerResponse}
2022
*/
@@ -77,6 +79,19 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf
7779
return res.end();
7880
},
7981
});
82+
routeListener.get(`/v${API_VERSION}/recording/<token>`, {
83+
callback: async (req, res, { remoteAddress, match }) => {
84+
try {
85+
const { token } = match;
86+
logger.info(`[${remoteAddress}]: requested recording ${token}`);
87+
Recorder.records.get(token)?.pipeToResponse(res);
88+
// res not ended as we are streaming
89+
} catch (error) {
90+
logger.error(`[${remoteAddress}] failed to obtain recording: ${error.message}`);
91+
return res.end();
92+
}
93+
},
94+
});
8095
routeListener.post(`/v${API_VERSION}/disconnect`, {
8196
callback: async (req, res, { remoteAddress }) => {
8297
try {
@@ -183,7 +198,8 @@ class RouteListener {
183198
break;
184199
}
185200
for (const [pattern, options] of registeredRoutes) {
186-
if (pathname === pattern) {
201+
const match = this._extractPattern(pathname, pattern);
202+
if (match) {
187203
if (options?.cors) {
188204
res.setHeader("Access-Control-Allow-Origin", options.cors);
189205
res.setHeader("Access-Control-Allow-Methods", options.methods);
@@ -195,6 +211,7 @@ class RouteListener {
195211
host,
196212
protocol,
197213
remoteAddress,
214+
match,
198215
searchParams,
199216
});
200217
} catch (error) {
@@ -212,4 +229,32 @@ class RouteListener {
212229
}
213230
return res.end();
214231
}
232+
233+
/**
234+
* Matches a pathname against a pattern with named parameters.
235+
* @param {string} pathname - The URL path requested, e.g., "/channel/6/person/42/"
236+
* @param {string} pattern - The pattern to match, e.g., "/channel/<channelId>/session/<sessionId>"
237+
* @returns {object|undefined} - Returns undefined if no match. If matched, returns an object mapping keys to values,
238+
* the object is empty if matching a pattern with no variables.
239+
* eg: { channelId: "6", sessionId: "42" } | {} | undefined
240+
*/
241+
_extractPattern(pathname, pattern) {
242+
pathname = pathname.replace(/\/+$/, "");
243+
pattern = pattern.replace(/\/+$/, "");
244+
const paramNames = [];
245+
const regexPattern = pattern.replace(/<([^>]+)>/g, (_, paramName) => {
246+
paramNames.push(paramName);
247+
return "([^/]+)";
248+
});
249+
const regex = new RegExp(`^${regexPattern}$`);
250+
const match = pathname.match(regex);
251+
if (!match) {
252+
return;
253+
}
254+
const params = {};
255+
paramNames.forEach((name, index) => {
256+
params[name] = match[index + 1];
257+
});
258+
return params;
259+
}
215260
}

src/shared/enums.js

+6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ export const WS_CLOSE_CODE = {
1212
CHANNEL_FULL: 4109,
1313
};
1414

15+
export const STREAM_TYPE = {
16+
AUDIO: "audio",
17+
CAMERA: "camera",
18+
SCREEN: "screen",
19+
};
20+
1521
export const SERVER_REQUEST = {
1622
/** Requests the creation of a consumer that is used to forward a track to the client */
1723
INIT_CONSUMER: "INIT_CONSUMER",

0 commit comments

Comments
 (0)