Skip to content

Commit

Permalink
Add readrateInitialBurst option (#140)
Browse files Browse the repository at this point in the history
* Add `sync` property to disable/enable syncing

* Expose `noSleep` property

* Add `readrateInitialBurst` property

* Add logging for sync enable/disable

* Add some stats to sleep logger

* Rearrange stuff to remove reinit hack

* More reordering

* Simplification
  • Loading branch information
longnguyen2004 authored Jan 27, 2025
1 parent f679531 commit 6735f81
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 11 deletions.
58 changes: 47 additions & 11 deletions src/media/BaseMediaStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export class BaseMediaStream extends Writable {
private _noSleep: boolean;
private _startTime?: number;
private _startPts?: number;
private _sync = true;

public syncStream?: BaseMediaStream;
constructor(type: string, noSleep = false) {
Expand All @@ -23,6 +24,25 @@ export class BaseMediaStream extends Writable {
this._loggerSleep = new Log(`stream:${type}:sleep`);
this._noSleep = noSleep;
}

get sync(): boolean {
return this._sync;
}
set sync(val: boolean) {
this._sync = val;
if (val)
this._loggerSync.debug("Sync enabled");
else
this._loggerSync.debug("Sync disabled");
}
get noSleep(): boolean {
return this._noSleep;
}
set noSleep(val: boolean) {
this._noSleep = val;
if (!val)
this._startPts = this._startTime = undefined;
}
get pts(): number | undefined {
return this._pts;
}
Expand All @@ -38,7 +58,7 @@ export class BaseMediaStream extends Writable {
{
let i = 0;
while (
this.syncStream &&
this.sync && this.syncStream &&
!this.syncStream.writableEnded &&
this.syncStream.pts !== undefined &&
this._pts !== undefined &&
Expand All @@ -60,23 +80,22 @@ export class BaseMediaStream extends Writable {
throw new Error("Not implemented");
}
async _write(frame: Packet, _: BufferEncoding, callback: (error?: Error | null) => void) {
if (this._startTime === undefined)
this._startTime = performance.now();
const start_write = performance.now();
await this._waitForOtherStream();

const { data, ptshi, pts, durationhi, duration, time_base_num, time_base_den } = frame;
// biome-ignore lint/style/noNonNullAssertion: this will never happen with our media stream
const frametime = combineLoHi(durationhi!, duration!) / time_base_den! * time_base_num! * 1000;

const start = performance.now();
const start_sendFrame = performance.now();
await this._sendFrame(Buffer.from(data), frametime);
const end = performance.now();
const end_sendFrame = performance.now();

// biome-ignore lint/style/noNonNullAssertion: this will never happen with our media stream
this._pts = combineLoHi(ptshi!, pts!) / time_base_den! * time_base_num! * 1000;
if (this._startPts === undefined)
this._startPts = this._pts;
this.emit("pts", this._pts);

const sendTime = end - start;
const sendTime = end_sendFrame - start_sendFrame;
const ratio = sendTime / frametime;
this._loggerSend.debug({
stats: {
Expand All @@ -94,13 +113,30 @@ export class BaseMediaStream extends Writable {
frametime
}, `Frame takes too long to send (${(ratio * 100).toFixed(2)}% frametime)`)
}
const now = performance.now();
const sleep = Math.max(0, this._pts - this._startPts + frametime - (now - this._startTime));
this._loggerSleep.debug(`Sleeping for ${sleep}ms`);

const end_write = performance.now();
this._startTime ??= start_write;
this._startPts ??= this._pts;
if (this._noSleep)
{
callback(null);
}
else
{
const sleep = Math.max(
0, this._pts - this._startPts + frametime - (end_write - this._startTime)
);
this._loggerSleep.debug({
stats: {
pts: this._pts,
startPts: this._startPts,
time: end_write,
startTime: this._startTime,
frametime
}
}, `Sleeping for ${sleep}ms`);
setTimeout(sleep).then(() => callback(null));
}
}
_destroy(error: Error | null, callback: (error?: Error | null) => void): void {
super._destroy(error, callback);
Expand Down
27 changes: 27 additions & 0 deletions src/media/newApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ export type PlayStreamOptions = {
*/
frameRate: number,

/**
* Same as ffmpeg's `readrate_initial_burst` command line flag
* See https://ffmpeg.org/ffmpeg.html#:~:text=%2Dreadrate_initial_burst
*/
readrateInitialBurst: number | undefined,

/**
* Enable RTCP Sender Report for synchronization
*/
Expand Down Expand Up @@ -319,6 +325,7 @@ export async function playStream(
width: video.width,
height: video.height,
frameRate: video.framerate_num / video.framerate_den,
readrateInitialBurst: undefined,
rtcpSenderReportEnabled: true,
forceChacha20Encryption: false
} satisfies PlayStreamOptions;
Expand All @@ -345,6 +352,11 @@ export async function playStream(
: defaultOptions.frameRate
),

readrateInitialBurst:
isFiniteNonZero(opts.readrateInitialBurst) && opts.readrateInitialBurst > 0
? opts.readrateInitialBurst
: defaultOptions.readrateInitialBurst,

rtcpSenderReportEnabled:
opts.rtcpSenderReportEnabled ?? defaultOptions.rtcpSenderReportEnabled,

Expand Down Expand Up @@ -389,6 +401,21 @@ export async function playStream(
audio.stream.pipe(aStream);
vStream.syncStream = aStream;
aStream.syncStream = vStream;

const burstTime = mergedOptions.readrateInitialBurst;
if (typeof burstTime === "number")
{
vStream.sync = aStream.sync = false;
vStream.noSleep = aStream.noSleep = true;
const stopBurst = (pts: number) => {
if (pts < burstTime * 1000)
return;
vStream.sync = aStream.sync = true;
vStream.noSleep = aStream.noSleep = false;
vStream.off("pts", stopBurst);
}
vStream.on("pts", stopBurst);
}
}
return new Promise<void>((resolve) => {
vStream.once("finish", () => {
Expand Down

0 comments on commit 6735f81

Please sign in to comment.