From 6735f816fff9a247d62c010e74e89d202a84a2c6 Mon Sep 17 00:00:00 2001 From: Long Nguyen Date: Tue, 28 Jan 2025 00:24:50 +0700 Subject: [PATCH] Add `readrateInitialBurst` option (#140) * Add `sync` property to disable/enable syncing * Expose `noSleep` property * Add `readrateInitialBurst` property * Add logging for sync enable/disable * Add some stats to sleep logger * Rearrange stuff to remove reinit hack * More reordering * Simplification --- src/media/BaseMediaStream.ts | 58 +++++++++++++++++++++++++++++------- src/media/newApi.ts | 27 +++++++++++++++++ 2 files changed, 74 insertions(+), 11 deletions(-) diff --git a/src/media/BaseMediaStream.ts b/src/media/BaseMediaStream.ts index 9e981f5..d06e90e 100644 --- a/src/media/BaseMediaStream.ts +++ b/src/media/BaseMediaStream.ts @@ -14,6 +14,7 @@ export class BaseMediaStream extends Writable { private _noSleep: boolean; private _startTime?: number; private _startPts?: number; + private _sync = true; public syncStream?: BaseMediaStream; constructor(type: string, noSleep = false) { @@ -23,6 +24,25 @@ export class BaseMediaStream extends Writable { this._loggerSleep = new Log(`stream:${type}:sleep`); this._noSleep = noSleep; } + + get sync(): boolean { + return this._sync; + } + set sync(val: boolean) { + this._sync = val; + if (val) + this._loggerSync.debug("Sync enabled"); + else + this._loggerSync.debug("Sync disabled"); + } + get noSleep(): boolean { + return this._noSleep; + } + set noSleep(val: boolean) { + this._noSleep = val; + if (!val) + this._startPts = this._startTime = undefined; + } get pts(): number | undefined { return this._pts; } @@ -38,7 +58,7 @@ export class BaseMediaStream extends Writable { { let i = 0; while ( - this.syncStream && + this.sync && this.syncStream && !this.syncStream.writableEnded && this.syncStream.pts !== undefined && this._pts !== undefined && @@ -60,23 +80,22 @@ export class BaseMediaStream extends Writable { throw new Error("Not implemented"); } async _write(frame: Packet, _: BufferEncoding, callback: (error?: Error | null) => void) { - if (this._startTime === undefined) - this._startTime = performance.now(); + const start_write = performance.now(); await this._waitForOtherStream(); const { data, ptshi, pts, durationhi, duration, time_base_num, time_base_den } = frame; // biome-ignore lint/style/noNonNullAssertion: this will never happen with our media stream const frametime = combineLoHi(durationhi!, duration!) / time_base_den! * time_base_num! * 1000; - const start = performance.now(); + const start_sendFrame = performance.now(); await this._sendFrame(Buffer.from(data), frametime); - const end = performance.now(); + const end_sendFrame = performance.now(); + // biome-ignore lint/style/noNonNullAssertion: this will never happen with our media stream this._pts = combineLoHi(ptshi!, pts!) / time_base_den! * time_base_num! * 1000; - if (this._startPts === undefined) - this._startPts = this._pts; + this.emit("pts", this._pts); - const sendTime = end - start; + const sendTime = end_sendFrame - start_sendFrame; const ratio = sendTime / frametime; this._loggerSend.debug({ stats: { @@ -94,13 +113,30 @@ export class BaseMediaStream extends Writable { frametime }, `Frame takes too long to send (${(ratio * 100).toFixed(2)}% frametime)`) } - const now = performance.now(); - const sleep = Math.max(0, this._pts - this._startPts + frametime - (now - this._startTime)); - this._loggerSleep.debug(`Sleeping for ${sleep}ms`); + + const end_write = performance.now(); + this._startTime ??= start_write; + this._startPts ??= this._pts; if (this._noSleep) + { callback(null); + } else + { + const sleep = Math.max( + 0, this._pts - this._startPts + frametime - (end_write - this._startTime) + ); + this._loggerSleep.debug({ + stats: { + pts: this._pts, + startPts: this._startPts, + time: end_write, + startTime: this._startTime, + frametime + } + }, `Sleeping for ${sleep}ms`); setTimeout(sleep).then(() => callback(null)); + } } _destroy(error: Error | null, callback: (error?: Error | null) => void): void { super._destroy(error, callback); diff --git a/src/media/newApi.ts b/src/media/newApi.ts index c1d2a8b..1ce6f47 100644 --- a/src/media/newApi.ts +++ b/src/media/newApi.ts @@ -286,6 +286,12 @@ export type PlayStreamOptions = { */ frameRate: number, + /** + * Same as ffmpeg's `readrate_initial_burst` command line flag + * See https://ffmpeg.org/ffmpeg.html#:~:text=%2Dreadrate_initial_burst + */ + readrateInitialBurst: number | undefined, + /** * Enable RTCP Sender Report for synchronization */ @@ -319,6 +325,7 @@ export async function playStream( width: video.width, height: video.height, frameRate: video.framerate_num / video.framerate_den, + readrateInitialBurst: undefined, rtcpSenderReportEnabled: true, forceChacha20Encryption: false } satisfies PlayStreamOptions; @@ -345,6 +352,11 @@ export async function playStream( : defaultOptions.frameRate ), + readrateInitialBurst: + isFiniteNonZero(opts.readrateInitialBurst) && opts.readrateInitialBurst > 0 + ? opts.readrateInitialBurst + : defaultOptions.readrateInitialBurst, + rtcpSenderReportEnabled: opts.rtcpSenderReportEnabled ?? defaultOptions.rtcpSenderReportEnabled, @@ -389,6 +401,21 @@ export async function playStream( audio.stream.pipe(aStream); vStream.syncStream = aStream; aStream.syncStream = vStream; + + const burstTime = mergedOptions.readrateInitialBurst; + if (typeof burstTime === "number") + { + vStream.sync = aStream.sync = false; + vStream.noSleep = aStream.noSleep = true; + const stopBurst = (pts: number) => { + if (pts < burstTime * 1000) + return; + vStream.sync = aStream.sync = true; + vStream.noSleep = aStream.noSleep = false; + vStream.off("pts", stopBurst); + } + vStream.on("pts", stopBurst); + } } return new Promise((resolve) => { vStream.once("finish", () => {