diff --git a/.vscode/settings.json b/.vscode/settings.json index a98a69e1f..16a0e2d14 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -58,10 +58,13 @@ "tinyh", "transferables", "tsbuildinfo", + "unconfigured", "undici", "versioncode", "webadb", "webcodecs", + "webglcontextlost", + "webglcontextrestored", "webm", "websockify", "webusb", diff --git a/libraries/adb-credential-web/src/storage/password.ts b/libraries/adb-credential-web/src/storage/password.ts index 2b4af54a2..e41d9f754 100644 --- a/libraries/adb-credential-web/src/storage/password.ts +++ b/libraries/adb-credential-web/src/storage/password.ts @@ -1,5 +1,5 @@ import type { MaybeError } from "@yume-chan/adb"; -import { encodeUtf8 } from "@yume-chan/adb"; +import { encodeUtf8, toLocalUint8Array } from "@yume-chan/adb"; import type { MaybePromiseLike } from "@yume-chan/async"; import { buffer, @@ -81,7 +81,7 @@ export class TangoPasswordProtectedStorage implements TangoKeyStorage { } async save( - privateKey: Uint8Array, + privateKey: Uint8Array, name: string | undefined, ): Promise { const password = await this.#requestPassword("save", name); @@ -93,7 +93,7 @@ export class TangoPasswordProtectedStorage implements TangoKeyStorage { const encrypted = await crypto.subtle.encrypt( { name: "AES-GCM", iv }, aesKey, - privateKey, + toLocalUint8Array(privateKey), ); const bundle = Bundle.serialize({ diff --git a/libraries/adb-credential-web/src/storage/prf/storage.ts b/libraries/adb-credential-web/src/storage/prf/storage.ts index 8e5422b64..a3efeffe5 100644 --- a/libraries/adb-credential-web/src/storage/prf/storage.ts +++ b/libraries/adb-credential-web/src/storage/prf/storage.ts @@ -1,4 +1,4 @@ -import type { MaybeError } from "@yume-chan/adb"; +import { toLocalUint8Array, type MaybeError } from "@yume-chan/adb"; import { buffer, struct, @@ -85,7 +85,7 @@ export class TangoPrfStorage implements TangoKeyStorage { } async save( - privateKey: Uint8Array, + privateKey: Uint8Array, name: string | undefined, ): Promise { const prfInput = new Uint8Array(PrfInputLength); @@ -122,7 +122,7 @@ export class TangoPrfStorage implements TangoKeyStorage { const encrypted = await crypto.subtle.encrypt( { name: "AES-GCM", iv }, aesKey, - privateKey, + toLocalUint8Array(privateKey), ); const bundle = Bundle.serialize({ diff --git a/libraries/adb/src/commands/subprocess/none/pty.ts b/libraries/adb/src/commands/subprocess/none/pty.ts index 2aac4e7c0..b9b5b2868 100644 --- a/libraries/adb/src/commands/subprocess/none/pty.ts +++ b/libraries/adb/src/commands/subprocess/none/pty.ts @@ -7,9 +7,9 @@ import type { import { MaybeConsumable } from "@yume-chan/stream-extra"; import type { AdbSocket } from "../../../adb.js"; -import type { AdbPtyProcess } from "../pty.js"; +import type { AdbPty } from "../pty.js"; -export class AdbNoneProtocolPtyProcess implements AdbPtyProcess { +export class AdbNoneProtocolPty implements AdbPty { readonly #socket: AdbSocket; readonly #writer: WritableStreamDefaultWriter>; diff --git a/libraries/adb/src/commands/subprocess/none/service.ts b/libraries/adb/src/commands/subprocess/none/service.ts index a339089a8..e8d2d432c 100644 --- a/libraries/adb/src/commands/subprocess/none/service.ts +++ b/libraries/adb/src/commands/subprocess/none/service.ts @@ -1,7 +1,7 @@ import type { Adb } from "../../../adb.js"; import { AdbNoneProtocolProcessImpl } from "./process.js"; -import { AdbNoneProtocolPtyProcess } from "./pty.js"; +import { AdbNoneProtocolPty } from "./pty.js"; import { adbNoneProtocolSpawner } from "./spawner.js"; export class AdbNoneProtocolSubprocessService { @@ -42,7 +42,7 @@ export class AdbNoneProtocolSubprocessService { async pty( command?: string | readonly string[], - ): Promise { + ): Promise { let service = "shell:"; if (typeof command === "string") { @@ -52,8 +52,6 @@ export class AdbNoneProtocolSubprocessService { service += command.join(" "); } - return new AdbNoneProtocolPtyProcess( - await this.#adb.createSocket(service), - ); + return new AdbNoneProtocolPty(await this.#adb.createSocket(service)); } } diff --git a/libraries/adb/src/commands/subprocess/pty.ts b/libraries/adb/src/commands/subprocess/pty.ts index c297f4670..cf6ef01b5 100644 --- a/libraries/adb/src/commands/subprocess/pty.ts +++ b/libraries/adb/src/commands/subprocess/pty.ts @@ -5,7 +5,7 @@ import type { WritableStream, } from "@yume-chan/stream-extra"; -export interface AdbPtyProcess { +export interface AdbPty { get input(): WritableStream>; get output(): ReadableStream; get exited(): Promise; diff --git a/libraries/adb/src/commands/subprocess/shell/pty.ts b/libraries/adb/src/commands/subprocess/shell/pty.ts index 43ddaed3b..f076d5886 100644 --- a/libraries/adb/src/commands/subprocess/shell/pty.ts +++ b/libraries/adb/src/commands/subprocess/shell/pty.ts @@ -13,11 +13,11 @@ import { import { encodeUtf8 } from "@yume-chan/struct"; import type { AdbSocket } from "../../../adb.js"; -import type { AdbPtyProcess } from "../pty.js"; +import type { AdbPty } from "../pty.js"; import { AdbShellProtocolId, AdbShellProtocolPacket } from "./shared.js"; -export class AdbShellProtocolPtyProcess implements AdbPtyProcess { +export class AdbShellProtocolPty implements AdbPty { readonly #socket: AdbSocket; readonly #writer: WritableStreamDefaultWriter>; diff --git a/libraries/adb/src/commands/subprocess/shell/service.ts b/libraries/adb/src/commands/subprocess/shell/service.ts index 8e7209bdf..4aee8556a 100644 --- a/libraries/adb/src/commands/subprocess/shell/service.ts +++ b/libraries/adb/src/commands/subprocess/shell/service.ts @@ -2,7 +2,7 @@ import type { Adb } from "../../../adb.js"; import { AdbFeature } from "../../../features.js"; import { AdbShellProtocolProcessImpl } from "./process.js"; -import { AdbShellProtocolPtyProcess } from "./pty.js"; +import { AdbShellProtocolPty } from "./pty.js"; import { adbShellProtocolSpawner } from "./spawner.js"; export class AdbShellProtocolSubprocessService { @@ -39,7 +39,7 @@ export class AdbShellProtocolSubprocessService { async pty(options?: { command?: string | readonly string[] | undefined; terminalType?: string; - }): Promise { + }): Promise { const { command, terminalType } = options ?? {}; let service = "shell,v2,pty"; @@ -59,8 +59,6 @@ export class AdbShellProtocolSubprocessService { service += command.join(" "); } - return new AdbShellProtocolPtyProcess( - await this.#adb.createSocket(service), - ); + return new AdbShellProtocolPty(await this.#adb.createSocket(service)); } } diff --git a/libraries/adb/src/daemon/dispatcher.ts b/libraries/adb/src/daemon/dispatcher.ts index 26898be80..6dfbd5725 100644 --- a/libraries/adb/src/daemon/dispatcher.ts +++ b/libraries/adb/src/daemon/dispatcher.ts @@ -146,13 +146,20 @@ export class AdbPacketDispatcher implements Closeable { this.#handleOkay(packet); break; case AdbCommand.Open: - await this.#handleOpen(packet); + // Don't await + // The handler may take a long time to accept the socket, + // don't block other sockets' packet processing. + this.#handleOpen(packet).catch((e) => { + // Propagate fatal errors to consumer + controller.error(e); + }); break; case AdbCommand.Write: - // Don't await - let each socket handle its own backpressure - // without blocking other sockets' packet processing. - // Fatal errors are propagated via WritableStream's controller. + // Don't await + // The socket might be stalled because of backpressure, + // don't block other sockets' packet processing. this.#handleWrite(packet).catch((e) => { + // Propagate fatal errors to consumer controller.error(e); }); break; diff --git a/libraries/android-bin/src/bug-report.ts b/libraries/android-bin/src/bug-report.ts index efcb31f0c..6bde2a4d6 100644 --- a/libraries/android-bin/src/bug-report.ts +++ b/libraries/android-bin/src/bug-report.ts @@ -258,10 +258,10 @@ export class BugReport extends AdbServiceBase { */ bugReportZStream(): ReadableStream { return new PushReadableStream(async (controller) => { - const process = await this.adb.subprocess.shellProtocol!.spawn([ - "bugreportz", - "-s", - ]); + const process = await this.adb.subprocess.shellProtocol!.spawn( + ["bugreportz", "-s"], + controller.abortSignal, + ); process.stdout .pipeTo( new WritableStream({ diff --git a/libraries/media-codec/src/index.ts b/libraries/media-codec/src/index.ts index 5a161e5b7..fb7e7c4bc 100644 --- a/libraries/media-codec/src/index.ts +++ b/libraries/media-codec/src/index.ts @@ -1,4 +1,5 @@ export * from "./av1.js"; +export * from "./format.js"; export * as H264 from "./h264.js"; export * as H265 from "./h265.js"; export * from "./nalu.js"; diff --git a/libraries/scrcpy-decoder-tinyh264/src/decoder.ts b/libraries/scrcpy-decoder-tinyh264/src/decoder.ts index cf604dbed..bce3641d0 100644 --- a/libraries/scrcpy-decoder-tinyh264/src/decoder.ts +++ b/libraries/scrcpy-decoder-tinyh264/src/decoder.ts @@ -1,9 +1,6 @@ import { PromiseResolver } from "@yume-chan/async"; import { H264 } from "@yume-chan/media-codec"; -import type { - ScrcpyMediaStreamConfigurationPacket, - ScrcpyMediaStreamPacket, -} from "@yume-chan/scrcpy"; +import type { ScrcpyMediaStreamConfigurationPacket } from "@yume-chan/scrcpy"; import { AndroidAvcLevel, AndroidAvcProfile, @@ -17,13 +14,16 @@ import type { ScrcpyVideoDecoder, ScrcpyVideoDecoderCapability, } from "./types.js"; -import { createCanvas, glIsSupported } from "./utils/index.js"; -import { PauseControllerImpl } from "./utils/pause.js"; -import { PerformanceCounterImpl } from "./utils/performance.js"; +import { + createCanvas, + glIsSupported, + PauseController, + PerformanceCounter, +} from "./utils/index.js"; import type { TinyH264Wrapper } from "./wrapper.js"; import { createTinyH264Wrapper } from "./wrapper.js"; -const noop = () => { +export const noop = () => { // no-op }; @@ -41,6 +41,15 @@ export class TinyH264Decoder implements ScrcpyVideoDecoder { return this.#canvas; } + #pause = new PauseController(); + get paused() { + return this.#pause.paused; + } + + get writable() { + return this.#pause.writable; + } + #size = new ScrcpyVideoSizeImpl(); get width() { return this.#size.width; @@ -52,25 +61,32 @@ export class TinyH264Decoder implements ScrcpyVideoDecoder { return this.#size.sizeChanged; } - #counter = new PerformanceCounterImpl(); - get framesDrawn() { - return this.#counter.framesDrawn; + #counter = new PerformanceCounter(); + /** + * Gets the number of frames that have been drawn on the renderer. + */ + get framesRendered() { + return this.#counter.framesRendered; } + /** + * Gets the number of frames that's visible to the user. + * + * Multiple frames might be rendered during one vertical sync interval, + * but only the last of them is represented to the user. + * This costs some performance but reduces latency by 1 frame. + * + * Might be `0` if the renderer is in a nested Web Worker on Chrome due to a Chrome bug. + * https://issues.chromium.org/issues/41483010 + */ get framesPresented() { return this.#counter.framesPresented; } - get framesSkipped() { - return this.#counter.framesSkipped; - } - - #pause: PauseControllerImpl; - get paused() { - return this.#pause.paused; - } - - #writable: WritableStream; - get writable() { - return this.#writable; + /** + * Gets the number of frames that wasn't drawn on the renderer + * because the renderer can't keep up + */ + get framesSkippedRendering() { + return this.#counter.framesSkippedRendering; } #renderer: YuvCanvas | undefined; @@ -95,29 +111,31 @@ export class TinyH264Decoder implements ScrcpyVideoDecoder { }), }); - this.#pause = new PauseControllerImpl( - this.#configure, - async (packet) => { - if (!this.#decoder) { - throw new Error("Decoder not configured"); - } - - // TinyH264 decoder doesn't support associating metadata - // with each frame's input/output - // so skipping frames when resuming from pause is not supported - - const decoder = await this.#decoder; - - // `packet.data` might be from a `BufferCombiner` so we have to copy it using `slice` - decoder.feed(packet.data.slice().buffer); - }, - ); - - this.#writable = new WritableStream({ - write: this.#pause.write, - // Nothing can be disposed when the stream is aborted/closed - // No new frames will arrive, but some frames might still be decoding and/or rendering - }); + void this.#pause.readable + .pipeTo( + new WritableStream({ + write: async (packet) => { + if (packet.type === "configuration") { + await this.#configure(packet); + return; + } + + if (!this.#decoder) { + throw new Error("Decoder not configured"); + } + + // TinyH264 decoder doesn't support associating metadata + // with each frame's input/output + // so skipping frames when resuming from pause is not supported + + const decoder = await this.#decoder; + + // `packet.data` might be from a `BufferCombiner` so we have to copy it using `slice` + decoder.feed(packet.data.slice().buffer); + }, + }), + ) + .catch(noop); } #configure = async ({ @@ -164,6 +182,9 @@ export class TinyH264Decoder implements ScrcpyVideoDecoder { const uPlaneOffset = encodedWidth * encodedHeight; const vPlaneOffset = uPlaneOffset + chromaWidth * chromaHeight; decoder.onPictureReady(({ data }) => { + // TinyH264 doesn't pass any frame metadata to `onPictureReady` + // so frames marked as skipped (by pause controller) can't be skipped + const array = new Uint8Array(data); const frame = YuvBuffer.frame( format, @@ -184,7 +205,7 @@ export class TinyH264Decoder implements ScrcpyVideoDecoder { // Can't know if yuv-canvas is dropping frames or not this.#renderer!.drawFrame(frame); - this.#counter.increaseFramesDrawn(); + this.#counter.increaseFramesRendered(); }); decoder.feed(data.slice().buffer); @@ -199,8 +220,8 @@ export class TinyH264Decoder implements ScrcpyVideoDecoder { this.#pause.pause(); } - resume(): Promise { - return this.#pause.resume(); + resume(): undefined { + this.#pause.resume(); } trackDocumentVisibility(document: Document): () => undefined { diff --git a/libraries/scrcpy-decoder-tinyh264/src/types.ts b/libraries/scrcpy-decoder-tinyh264/src/types.ts index a51598e76..dfb4aad05 100644 --- a/libraries/scrcpy-decoder-tinyh264/src/types.ts +++ b/libraries/scrcpy-decoder-tinyh264/src/types.ts @@ -1,3 +1,4 @@ +import type { MaybePromiseLike } from "@yume-chan/async"; import type { Disposable } from "@yume-chan/event"; import type { ScrcpyMediaStreamPacket, @@ -13,11 +14,15 @@ export interface ScrcpyVideoDecoderCapability { export interface ScrcpyVideoDecoderPerformanceCounter { /** - * Gets the number of frames that have been drawn on the renderer + * Gets the number of frames that have been drawn on the renderer. */ - readonly framesDrawn: number; + readonly framesRendered: number; /** - * Gets the number of frames that's visible to the user + * Gets the number of frames that's visible to the user. + * + * Multiple frames might be rendered during one vertical sync interval, + * but only the last of them is represented to the user. + * This costs some performance but reduces latency by 1 frame. * * Might be `0` if the renderer is in a nested Web Worker on Chrome due to a Chrome bug. * https://issues.chromium.org/issues/41483010 @@ -27,7 +32,7 @@ export interface ScrcpyVideoDecoderPerformanceCounter { * Gets the number of frames that wasn't drawn on the renderer * because the renderer can't keep up */ - readonly framesSkipped: number; + readonly framesSkippedRendering: number; } export interface ScrcpyVideoDecoderPauseController { @@ -44,7 +49,7 @@ export interface ScrcpyVideoDecoderPauseController { /** * Resume the decoder if it was paused. */ - resume(): Promise; + resume(): MaybePromiseLike; /** * Pause the decoder when the document becomes invisible, diff --git a/libraries/scrcpy-decoder-tinyh264/src/utils/pause.ts b/libraries/scrcpy-decoder-tinyh264/src/utils/pause.ts index d661da461..9440adbc6 100644 --- a/libraries/scrcpy-decoder-tinyh264/src/utils/pause.ts +++ b/libraries/scrcpy-decoder-tinyh264/src/utils/pause.ts @@ -1,28 +1,26 @@ -import type { MaybePromiseLike } from "@yume-chan/async"; import { PromiseResolver } from "@yume-chan/async"; import type { ScrcpyMediaStreamConfigurationPacket, ScrcpyMediaStreamDataPacket, ScrcpyMediaStreamPacket, } from "@yume-chan/scrcpy"; +import type { TransformStreamDefaultController } from "@yume-chan/stream-extra"; +import { TransformStream } from "@yume-chan/stream-extra"; import type { ScrcpyVideoDecoderPauseController } from "../types.js"; -export class PauseControllerImpl implements ScrcpyVideoDecoderPauseController { +export class PauseController + extends TransformStream + implements ScrcpyVideoDecoderPauseController +{ + #controller: TransformStreamDefaultController; + #paused = false; #pausedExplicitly = false; get paused() { return this.#paused; } - #onConfiguration: ( - packet: ScrcpyMediaStreamConfigurationPacket, - ) => MaybePromiseLike; - #onFrame: ( - packet: ScrcpyMediaStreamDataPacket, - skipRendering: boolean, - ) => MaybePromiseLike; - /** * Store incoming configuration change when paused, * to recreate the decoder on resume @@ -49,58 +47,59 @@ export class PauseControllerImpl implements ScrcpyVideoDecoderPauseController { #disposed = false; - constructor( - onConfiguration: ( - packet: ScrcpyMediaStreamConfigurationPacket, - ) => MaybePromiseLike, - onFrame: ( - packet: ScrcpyMediaStreamDataPacket, - skipRendering: boolean, - ) => MaybePromiseLike, - ) { - this.#onConfiguration = onConfiguration; - this.#onFrame = onFrame; - } - - write = async (packet: ScrcpyMediaStreamPacket): Promise => { - if (this.#disposed) { - throw new Error("Attempt to write to a closed decoder"); - } - - if (this.#paused) { - switch (packet.type) { - case "configuration": - this.#pendingConfiguration = packet; - this.#pendingFrames.length = 0; - break; - case "data": - if (packet.keyframe) { - this.#pendingFrames.length = 0; + constructor() { + let controller!: TransformStreamDefaultController; + + super({ + start: (controller_) => { + controller = controller_; + }, + transform: async (packet, controller) => { + if (this.#disposed) { + return; + } + + if (this.#paused) { + switch (packet.type) { + case "configuration": + this.#pendingConfiguration = packet; + this.#pendingFrames.length = 0; + break; + case "data": + if (packet.keyframe) { + this.#pendingFrames.length = 0; + } + // Generally there won't be too many non-key frames + // (because that's bad for video quality), + // Also all frames are required for proper decoding + this.#pendingFrames.push(packet); + break; } - // Generally there won't be too many non-key frames - // (because that's bad for video quality), - // Also all frames are required for proper decoding - this.#pendingFrames.push(packet); - break; - } - return; - } - - await this.#resuming; - - if (this.#disposed) { - return; - } - - switch (packet.type) { - case "configuration": - await this.#onConfiguration(packet); - break; - case "data": - await this.#onFrame(packet, false); - break; - } - }; + return; + } + + await this.#resuming; + + if (this.#disposed) { + return; + } + + switch (packet.type) { + case "configuration": + controller.enqueue(packet); + break; + case "data": + controller.enqueue({ + ...packet, + skipRendering: false, + }); + break; + } + }, + }); + + this.#controller = controller; + } #pauseInternal(explicitly: boolean): void { if (this.#disposed) { @@ -117,7 +116,7 @@ export class PauseControllerImpl implements ScrcpyVideoDecoderPauseController { this.#pauseInternal(true); } - async #resumeInternal(explicitly: boolean): Promise { + #resumeInternal(explicitly: boolean): undefined { if (this.#disposed) { throw new Error("Attempt to resume a closed decoder"); } @@ -139,7 +138,7 @@ export class PauseControllerImpl implements ScrcpyVideoDecoderPauseController { this.#pausedExplicitly = false; if (this.#pendingConfiguration) { - await this.#onConfiguration(this.#pendingConfiguration); + this.#controller.enqueue(this.#pendingConfiguration); this.#pendingConfiguration = undefined; if (this.#disposed) { @@ -147,20 +146,17 @@ export class PauseControllerImpl implements ScrcpyVideoDecoderPauseController { } } - for ( - let i = 0, length = this.#pendingFrames.length; - i < length; - i += 1 - ) { - const frame = this.#pendingFrames[i]!; + // `#pendingFrames` won't change during iteration + // because it can only change when `#paused` is `true`, + // but the code above already sets that to `false` + for (const [index, frame] of this.#pendingFrames.entries()) { // All pending frames except the last one don't need to be rendered // because they are decoded in quick succession by the decoder // and won't be visible - await this.#onFrame(frame, i !== length - 1); - - if (this.#disposed) { - return; - } + this.#controller.enqueue({ + ...frame, + skipRendering: index !== this.#pendingFrames.length - 1, + }); } this.#pendingFrames.length = 0; @@ -169,8 +165,8 @@ export class PauseControllerImpl implements ScrcpyVideoDecoderPauseController { this.#resuming = undefined; } - resume(): Promise { - return this.#resumeInternal(true); + resume(): undefined { + this.#resumeInternal(true); } #disposeVisibilityTracker: (() => undefined) | undefined; @@ -219,6 +215,7 @@ export class PauseControllerImpl implements ScrcpyVideoDecoderPauseController { } this.#disposed = true; + this.#controller.terminate(); this.#pendingConfiguration = undefined; this.#pendingFrames.length = 0; @@ -226,3 +223,10 @@ export class PauseControllerImpl implements ScrcpyVideoDecoderPauseController { this.#disposeVisibilityTracker?.(); } } + +export namespace PauseController { + export type Input = ScrcpyMediaStreamPacket; + export type Output = + | ScrcpyMediaStreamConfigurationPacket + | (ScrcpyMediaStreamDataPacket & { skipRendering: boolean }); +} diff --git a/libraries/scrcpy-decoder-tinyh264/src/utils/performance.ts b/libraries/scrcpy-decoder-tinyh264/src/utils/performance.ts index 1c1323b1a..cf897439f 100644 --- a/libraries/scrcpy-decoder-tinyh264/src/utils/performance.ts +++ b/libraries/scrcpy-decoder-tinyh264/src/utils/performance.ts @@ -1,8 +1,8 @@ import type { ScrcpyVideoDecoderPerformanceCounter } from "../types.js"; -export class PerformanceCounterImpl implements ScrcpyVideoDecoderPerformanceCounter { +export class PerformanceCounter implements ScrcpyVideoDecoderPerformanceCounter { #framesDrawn = 0; - get framesDrawn() { + get framesRendered() { return this.#framesDrawn; } @@ -12,7 +12,7 @@ export class PerformanceCounterImpl implements ScrcpyVideoDecoderPerformanceCoun } #framesSkipped = 0; - get framesSkipped() { + get framesSkippedRendering() { return this.#framesSkipped; } @@ -51,7 +51,7 @@ export class PerformanceCounterImpl implements ScrcpyVideoDecoderPerformanceCoun this.#framesSkipped += 1; } - increaseFramesDrawn() { + increaseFramesRendered() { this.#framesDrawn += 1; } diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/codec/av1.ts b/libraries/scrcpy-decoder-webcodecs/src/video/codec/av1.ts index a4458f57d..916b6d179 100644 --- a/libraries/scrcpy-decoder-webcodecs/src/video/codec/av1.ts +++ b/libraries/scrcpy-decoder-webcodecs/src/video/codec/av1.ts @@ -1,59 +1,45 @@ import { Av1 } from "@yume-chan/media-codec"; -import type { ScrcpyMediaStreamPacket } from "@yume-chan/scrcpy"; - -import type { CodecDecoder, CodecDecoderOptions } from "./type.js"; - -export class Av1Codec implements CodecDecoder { - #decoder: VideoDecoder; - #updateSize: (width: number, height: number) => void; - #options: CodecDecoderOptions | undefined; - - constructor( - decoder: VideoDecoder, - updateSize: (width: number, height: number) => void, - options?: CodecDecoderOptions, - ) { - this.#decoder = decoder; - this.#updateSize = updateSize; - this.#options = options; - } - - #configure(data: Uint8Array) { - const parser = new Av1(data); - const sequenceHeader = parser.searchSequenceHeaderObu(); - - if (!sequenceHeader) { - return; - } - - const width = sequenceHeader.max_frame_width_minus_1 + 1; - const height = sequenceHeader.max_frame_height_minus_1 + 1; - this.#updateSize(width, height); - - this.#decoder.configure({ - codec: Av1.toCodecString(sequenceHeader), - hardwareAcceleration: - this.#options?.hardwareAcceleration ?? "no-preference", - optimizeForLatency: true, +import { TransformStream } from "@yume-chan/stream-extra"; + +import { convertFrameType } from "../utils/frame-type.js"; + +import type { CodecTransformStream } from "./type.js"; + +export class Av1TransformStream + extends TransformStream< + CodecTransformStream.Input, + CodecTransformStream.Output + > + implements CodecTransformStream +{ + constructor() { + super({ + transform: (packet, controller) => { + if (packet.type === "configuration") { + // AV1 decoder doesn't need configuration packets + return; + } + + const parser = new Av1(packet.data); + const sequenceHeader = parser.searchSequenceHeaderObu(); + + if (sequenceHeader) { + const width = sequenceHeader.max_frame_width_minus_1 + 1; + const height = sequenceHeader.max_frame_height_minus_1 + 1; + + controller.enqueue({ + codec: Av1.toCodecString(sequenceHeader), + codedWidth: width, + codedHeight: height, + }); + } + + controller.enqueue({ + type: convertFrameType(packet.keyframe), + timestamp: packet.timestamp, + data: packet.data, + }); + }, }); } - - decode(packet: ScrcpyMediaStreamPacket): undefined { - if (packet.type === "configuration") { - return; - } - - this.#configure(packet.data); - this.#decoder.decode( - new EncodedVideoChunk({ - // Treat `undefined` as `key`, otherwise it won't decode. - type: packet.keyframe === false ? "delta" : "key", - // HACK: `timestamp` is only used as a marker to skip paused frames, - // so it's fine as long as we can differentiate `0` from non-zeros. - // Hope `packet.pts` won't be too large to lose precision. - timestamp: packet.pts !== undefined ? Number(packet.pts) : 1, - data: packet.data, - }), - ); - } } diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/codec/h264.ts b/libraries/scrcpy-decoder-webcodecs/src/video/codec/h264.ts index 00a6eb45d..946936823 100644 --- a/libraries/scrcpy-decoder-webcodecs/src/video/codec/h264.ts +++ b/libraries/scrcpy-decoder-webcodecs/src/video/codec/h264.ts @@ -1,38 +1,15 @@ import { H264 } from "@yume-chan/media-codec"; -import { H26xDecoder } from "./h26x.js"; -import type { CodecDecoderOptions } from "./type.js"; +import { H26xTransformStream } from "./h26x.js"; -export class H264Decoder extends H26xDecoder { - #decoder: VideoDecoder; - #updateSize: (width: number, height: number) => void; - #options: CodecDecoderOptions | undefined; - - constructor( - decoder: VideoDecoder, - updateSize: (width: number, height: number) => void, - options?: CodecDecoderOptions, - ) { - super(decoder); - - this.#decoder = decoder; - this.#updateSize = updateSize; - this.#options = options; - } - - override configure(data: Uint8Array): void { +export class H264TransformStream extends H26xTransformStream { + override configure(data: Uint8Array): H26xTransformStream.Config { const configuration = H264.parseConfiguration(data); - this.#updateSize( - configuration.croppedWidth, - configuration.croppedHeight, - ); - - this.#decoder.configure({ + return { codec: H264.toCodecString(configuration), - hardwareAcceleration: - this.#options?.hardwareAcceleration ?? "no-preference", - optimizeForLatency: true, - }); + codedHeight: configuration.croppedHeight, + codedWidth: configuration.croppedWidth, + }; } } diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/codec/h265.ts b/libraries/scrcpy-decoder-webcodecs/src/video/codec/h265.ts index d7d91458d..bd29c27b6 100644 --- a/libraries/scrcpy-decoder-webcodecs/src/video/codec/h265.ts +++ b/libraries/scrcpy-decoder-webcodecs/src/video/codec/h265.ts @@ -1,43 +1,18 @@ import { H265 } from "@yume-chan/media-codec"; -import { H26xDecoder } from "./h26x.js"; -import type { CodecDecoderOptions } from "./type.js"; +import { H26xTransformStream } from "./h26x.js"; -export class H265Decoder extends H26xDecoder { - #decoder: VideoDecoder; - #updateSize: (width: number, height: number) => void; - #options: CodecDecoderOptions | undefined; - - constructor( - decoder: VideoDecoder, - updateSize: (width: number, height: number) => void, - options?: CodecDecoderOptions, - ) { - super(decoder); - - this.#decoder = decoder; - this.#updateSize = updateSize; - this.#options = options; - } - - override configure(data: Uint8Array): void { +export class H265TransformStream extends H26xTransformStream { + override configure(data: Uint8Array): H26xTransformStream.Config { const configuration = H265.parseConfiguration(data); - this.#updateSize( - configuration.croppedWidth, - configuration.croppedHeight, - ); - - this.#decoder.configure({ + return { codec: H265.toCodecString(configuration), // Microsoft Edge on Windows requires explicit size, // otherwise it returns frames in incorrect size. // And it needs cropped size, as opposed to the option name. codedWidth: configuration.croppedWidth, codedHeight: configuration.croppedHeight, - hardwareAcceleration: - this.#options?.hardwareAcceleration ?? "no-preference", - optimizeForLatency: true, - }); + }; } } diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/codec/h26x.ts b/libraries/scrcpy-decoder-webcodecs/src/video/codec/h26x.ts index a483fae86..f74b68718 100644 --- a/libraries/scrcpy-decoder-webcodecs/src/video/codec/h26x.ts +++ b/libraries/scrcpy-decoder-webcodecs/src/video/codec/h26x.ts @@ -1,48 +1,47 @@ -import type { ScrcpyMediaStreamPacket } from "@yume-chan/scrcpy"; +import { TransformStream } from "@yume-chan/stream-extra"; -import type { CodecDecoder } from "./type.js"; +import { convertFrameType } from "../utils/frame-type.js"; -export abstract class H26xDecoder implements CodecDecoder { - #config: Uint8Array | undefined; - #decoder: VideoDecoder; +import type { CodecTransformStream } from "./type.js"; - constructor(decoder: VideoDecoder) { - this.#decoder = decoder; - } - - abstract configure(data: Uint8Array): void; +export abstract class H26xTransformStream + extends TransformStream< + CodecTransformStream.Input, + CodecTransformStream.Output + > + implements CodecTransformStream +{ + constructor() { + super({ + transform: (packet, controller) => { + if (packet.type === "configuration") { + controller.enqueue(this.#configure(packet.data)); + return; + } - decode(packet: ScrcpyMediaStreamPacket): undefined { - if (packet.type === "configuration") { - this.#config = packet.data; - this.configure(packet.data); - return; - } + controller.enqueue({ + type: convertFrameType(packet.keyframe), + timestamp: packet.timestamp, + data: packet.data, + }); + }, + }); + } - // For H.264 and H.265, when the stream is in Annex B format - // (which Scrcpy uses, as Android MediaCodec produces), - // configuration data needs to be combined with the first frame data. - // https://www.w3.org/TR/webcodecs-avc-codec-registration/#encodedvideochunk-type - let data: Uint8Array; - if (this.#config !== undefined) { - data = new Uint8Array(this.#config.length + packet.data.length); - data.set(this.#config, 0); - data.set(packet.data, this.#config.length); - this.#config = undefined; - } else { - data = packet.data; - } + abstract configure(data: Uint8Array): H26xTransformStream.Config; - this.#decoder.decode( - new EncodedVideoChunk({ - // Treat `undefined` as `key`, otherwise won't decode. - type: packet.keyframe === false ? "delta" : "key", - // HACK: `timestamp` is only used as a marker to skip paused frames, - // so it's fine as long as we can differentiate `0` from non-zeros. - // Hope `packet.pts` won't be too large to lose precision. - timestamp: packet.pts !== undefined ? Number(packet.pts) : 1, - data, - }), - ); + #configure(data: Uint8Array): CodecTransformStream.Config { + return { + ...this.configure(data), + // For H.264 and H.265, when the stream is in Annex B format + // (which Scrcpy uses, as Android MediaCodec produces), + // configuration data needs to be combined with the first frame data. + // https://www.w3.org/TR/webcodecs-avc-codec-registration/#encodedvideochunk-type + raw: data, + }; } } + +export namespace H26xTransformStream { + export type Config = Omit; +} diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/codec/index.ts b/libraries/scrcpy-decoder-webcodecs/src/video/codec/index.ts index 61263a867..d67ace48b 100644 --- a/libraries/scrcpy-decoder-webcodecs/src/video/codec/index.ts +++ b/libraries/scrcpy-decoder-webcodecs/src/video/codec/index.ts @@ -3,4 +3,3 @@ export * from "./h264.js"; export * from "./h265.js"; export * from "./h26x.js"; export * from "./type.js"; -export * from "./utils.js"; diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/codec/type.ts b/libraries/scrcpy-decoder-webcodecs/src/video/codec/type.ts index 98a76b0c3..627af9edf 100644 --- a/libraries/scrcpy-decoder-webcodecs/src/video/codec/type.ts +++ b/libraries/scrcpy-decoder-webcodecs/src/video/codec/type.ts @@ -1,17 +1,44 @@ -import type { ScrcpyMediaStreamPacket } from "@yume-chan/scrcpy"; +import type { + ScrcpyMediaStreamConfigurationPacket, + ScrcpyMediaStreamDataPacket, +} from "@yume-chan/scrcpy"; +import type { TransformStream } from "@yume-chan/stream-extra"; -export interface CodecDecoder { - decode(packet: ScrcpyMediaStreamPacket): undefined; -} +export type CodecTransformStream = TransformStream< + CodecTransformStream.Input, + CodecTransformStream.Output +>; + +type PartialPlusUndefined = { + [P in keyof T]?: T[P] | undefined; +}; + +type Optional = Omit & + PartialPlusUndefined>; + +export namespace CodecTransformStream { + export type Input = + | ScrcpyMediaStreamConfigurationPacket + | (ScrcpyMediaStreamDataPacket & { timestamp: number }); + + export type Config = VideoDecoderConfig & { + codedWidth: number; + codedHeight: number; + /** + * Sets an optional raw buffer what will be prepended with the first key frame for decoding. + * + * Some codecs (e.g. H.264 and H.265 in Annex B format) + * send configuration in separate packet, + * but the configuration also needs to be feed into the decoder. + */ + raw?: AllowSharedBufferSource; + }; + + export type VideoChunk = Optional; -export interface CodecDecoderOptions { - hardwareAcceleration?: HardwareAcceleration | undefined; + export type Output = Config | VideoChunk; } -export interface CodecDecoderConstructor { - new ( - decoder: VideoDecoder, - updateSize: (width: number, height: number) => void, - options?: CodecDecoderOptions, - ): CodecDecoder; +export interface CodecTransformStreamConstructor { + new (): CodecTransformStream; } diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/codec/utils.ts b/libraries/scrcpy-decoder-webcodecs/src/video/codec/utils.ts deleted file mode 100644 index b6cfe86bd..000000000 --- a/libraries/scrcpy-decoder-webcodecs/src/video/codec/utils.ts +++ /dev/null @@ -1,11 +0,0 @@ -export function hexDigits(value: number) { - return value.toString(16).toUpperCase(); -} - -export function hexTwoDigits(value: number) { - return value.toString(16).toUpperCase().padStart(2, "0"); -} - -export function decimalTwoDigits(value: number) { - return value.toString(10).padStart(2, "0"); -} diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/decoder.ts b/libraries/scrcpy-decoder-webcodecs/src/video/decoder.ts index 53c4eacb7..afaf830ed 100644 --- a/libraries/scrcpy-decoder-webcodecs/src/video/decoder.ts +++ b/libraries/scrcpy-decoder-webcodecs/src/video/decoder.ts @@ -1,25 +1,24 @@ -import type { ScrcpyMediaStreamPacket } from "@yume-chan/scrcpy"; import { ScrcpyVideoCodecId, ScrcpyVideoSizeImpl } from "@yume-chan/scrcpy"; import type { ScrcpyVideoDecoder, ScrcpyVideoDecoderCapability, } from "@yume-chan/scrcpy-decoder-tinyh264"; -import { - PauseControllerImpl, - PerformanceCounterImpl, -} from "@yume-chan/scrcpy-decoder-tinyh264"; -import type { WritableStreamDefaultController } from "@yume-chan/stream-extra"; -import { WritableStream } from "@yume-chan/stream-extra"; +import { noop, PauseController } from "@yume-chan/scrcpy-decoder-tinyh264"; +import { InspectStream } from "@yume-chan/stream-extra"; -import { Av1Codec, H264Decoder, H265Decoder } from "./codec/index.js"; -import type { CodecDecoder, CodecDecoderOptions } from "./codec/type.js"; -import { Pool } from "./pool.js"; +import { + Av1TransformStream, + H264TransformStream, + H265TransformStream, +} from "./codec/index.js"; +import type { CodecTransformStream } from "./codec/type.js"; import type { VideoFrameRenderer } from "./render/index.js"; -import { VideoFrameCapturer } from "./snapshot.js"; - -const VideoFrameCapturerPool = - /* #__PURE__ */ - new Pool(() => new VideoFrameCapturer(), 4); +import { + AutoCanvasRenderer, + BitmapVideoFrameRenderer, + RendererController, +} from "./render/index.js"; +import { TimestampTransforms, VideoDecoderStream } from "./utils/index.js"; export class WebCodecsVideoDecoder implements ScrcpyVideoDecoder { static get isSupported() { @@ -33,6 +32,8 @@ export class WebCodecsVideoDecoder implements ScrcpyVideoDecoder { av1: {}, }; + // #region parameters + #codec: ScrcpyVideoCodecId; get codec() { return this.#codec; @@ -43,16 +44,22 @@ export class WebCodecsVideoDecoder implements ScrcpyVideoDecoder { return this.#renderer; } - #options: CodecDecoderOptions; + // #endregion parameters - #error: Error | undefined; + // #region pause controller - #writable: WritableStream; - #controller!: WritableStreamDefaultController; + #pause = new PauseController(); + get paused() { + return this.#pause.paused; + } get writable() { - return this.#writable; + return this.#pause.writable; } + // #endregion pause controller + + // #region size + #size = new ScrcpyVideoSizeImpl(); get width() { return this.#size.width; @@ -64,272 +71,192 @@ export class WebCodecsVideoDecoder implements ScrcpyVideoDecoder { return this.#size.sizeChanged; } - #counter = new PerformanceCounterImpl(); - get framesDrawn() { - return this.#counter.framesDrawn; + // #endregion size + + // #region raw decoder + + #rawDecoder = new VideoDecoderStream(); + /** + * Gets the number of frames waiting to be decoded. + */ + get decodeQueueSize() { + return this.#rawDecoder.decodeQueueSize; } - get framesPresented() { - return this.#counter.framesPresented; + /** + * Gets an event when a frame is dequeued (either decoded or discarded). + */ + get onDequeue() { + return this.#rawDecoder.onDequeue; } - get framesSkipped() { - return this.#counter.framesSkipped; + /** + * Gets the number of frames decoded by the decoder. + */ + get framesDecoded() { + return this.#rawDecoder.framesDecoded; + } + /** + * Gets the number of frames skipped by the decoder. + */ + get framesSkippedDecoding() { + return this.#rawDecoder.framesSkipped; } - #pause: PauseControllerImpl; - get paused() { - return this.#pause.paused; + // #endregion raw decoder + + #timestampTransforms = new TimestampTransforms(); + /** + * Gets the total time spent processing and decoding frames in milliseconds. + */ + get totalDecodeTime() { + return this.#timestampTransforms.totalDecodeTime; } - #rawDecoder: VideoDecoder; - #decoder: CodecDecoder; + // #region renderer - #framesDecoded = 0; - get framesDecoded() { - return this.#framesDecoded; + #renderController = new RendererController(); + /** + * Gets the number of frames that have been drawn on the renderer. + */ + get framesRendered() { + return this.#renderController.framesRendered; } - #decodingTime = 0; /** - * Accumulated decoding time in milliseconds + * Gets the number of frames that's visible to the user. + * + * Multiple frames might be rendered during one vertical sync interval, + * but only the last of them is represented to the user. + * This costs some performance but reduces latency by 1 frame. + * + * Might be `0` if the renderer is in a nested Web Worker on Chrome due to a Chrome bug. + * https://issues.chromium.org/issues/41483010 */ - get decodingTime() { - return this.#decodingTime; + get framesPresented() { + return this.#renderController.framesPresented; + } + /** + * Gets the number of frames that wasn't drawn on the renderer + * because the renderer can't keep up + */ + get framesSkippedRendering() { + return this.#renderController.framesSkippedRendering; } - #drawing = false; - #nextFrame: VideoFrame | undefined; - #captureFrame: VideoFrame | undefined; + // #endregion renderer /** * Create a new WebCodecs video decoder. */ constructor({ codec, - renderer, - ...options + renderer = new AutoCanvasRenderer(), + hardwareAcceleration = "no-preference", + optimizeForLatency = true, }: WebCodecsVideoDecoder.Options) { this.#codec = codec; this.#renderer = renderer; - this.#options = options; - - this.#rawDecoder = new VideoDecoder({ - output: (frame) => { - if (this.#error) { - frame.close(); - return; - } - - // Skip rendering frames while resuming from pause - if (frame.timestamp === 0) { - frame.close(); - return; - } - - this.#framesDecoded += 1; - this.#decodingTime += - performance.now() - frame.timestamp / 1000; - - this.#captureFrame?.close(); - // PERF: `VideoFrame#clone` is cheap - this.#captureFrame = frame.clone(); - - void this.#draw(frame); - }, - error: (error) => { - this.#setError(error); - }, - }); + let codecTransform: CodecTransformStream; switch (this.#codec) { case ScrcpyVideoCodecId.H264: - this.#decoder = new H264Decoder( - this.#rawDecoder, - this.#updateSize, - this.#options, - ); + codecTransform = new H264TransformStream(); break; case ScrcpyVideoCodecId.H265: - this.#decoder = new H265Decoder( - this.#rawDecoder, - this.#updateSize, - this.#options, - ); + codecTransform = new H265TransformStream(); break; case ScrcpyVideoCodecId.AV1: - this.#decoder = new Av1Codec( - this.#rawDecoder, - this.#updateSize, - this.#options, - ); + codecTransform = new Av1TransformStream(); break; default: // eslint-disable-next-line @typescript-eslint/restrict-template-expressions throw new Error(`Unsupported codec: ${this.#codec}`); } - this.#pause = new PauseControllerImpl( - (packet) => this.#decoder.decode(packet), - (packet, skipRendering) => { - let pts: bigint; - - if (skipRendering) { - // Set `pts` to 0 as a marker for skipping rendering this frame - pts = 0n; - } else { - // Set `pts` to current time to track decoding time - - // Technically `performance.now()` can return 0 (when document starts loading), - // but in practice it's impossible to call it at that time. - const now = performance.now(); - - // `now` can be an integer, so `us` needs a default value - const [ms, us = ""] = now.toString().split("."); - - // Multiply `performance.now()` by 1000 to get microseconds. - // Use string concatenation to prevent precision loss. - pts = BigInt(ms + (us + "000").slice(0, 3)); - } - - // Create a copy of `packet` because other code (like recording) - // needs the original `pts` - return this.#decoder.decode({ - ...packet, - pts, - }); - }, - ); - - this.#writable = new WritableStream({ - start: (controller) => { - if (this.#error) { - controller.error(this.#error); - } else { - this.#controller = controller; - } - }, - write: this.#pause.write, - // Nothing can be disposed when the stream is aborted/closed - // No new frames will arrive, but some frames might still be decoding and/or rendering, - // and they need to be presented. - }); + void this.#pause.readable + // Add timestamp + .pipeThrough(this.#timestampTransforms.addTimestamp) + // Convert Scrcpy packets to `VideoDecoder` config/chunk + .pipeThrough(codecTransform) + // Insert extra `VideoDecoder` config and intercept size changes + .pipeThrough( + new InspectStream((chunk): undefined => { + if ("codec" in chunk) { + chunk.hardwareAcceleration = hardwareAcceleration; + chunk.optimizeForLatency = optimizeForLatency; + + this.#size.setSize(chunk.codedWidth, chunk.codedHeight); + } + }), + ) + // Decode `VideoDecoder` config/chunk to `VideoFrame`s + .pipeThrough(this.#rawDecoder) + // Track decoding time and filter skipped frames + .pipeThrough(this.#timestampTransforms.consumeTimestamp) + // Skip frames if renderer can't keep up + .pipeThrough(this.#renderController) + // Render + .pipeTo(renderer.writable) + // Errors will be handled by source stream + .catch(noop); } - #setError(error: Error) { - if (this.#error) { - return; - } - - this.#error = error; - - try { - this.#controller?.error(error); - } catch { - // ignore - } - - this.dispose(); + pause(): void { + this.#pause.pause(); } - async #draw(frame: VideoFrame) { - try { - if (this.#drawing) { - if (this.#nextFrame) { - // Frame `n` is still drawing, frame `n + m` (m > 0) is waiting, and frame `n + m + 1` comes. - // Dispose frame `n + m` and set frame `n + m + 1` as the next frame. - this.#nextFrame.close(); - this.#counter.increaseFramesSkipped(); - } - this.#nextFrame = frame; - return; - } - - this.#drawing = true; - - do { - this.#updateSize(frame.displayWidth, frame.displayHeight); - - // PERF: Draw every frame to minimize latency at cost of performance. - // When multiple frames are drawn in one vertical sync interval, - // only the last one is visible to users. - // But this ensures users can always see the most up-to-date screen. - // This is also the behavior of official Scrcpy client. - // https://github.com/Genymobile/scrcpy/issues/3679 - await this.#renderer.draw(frame); - frame.close(); - - this.#counter.increaseFramesDrawn(); - - if (this.#nextFrame) { - frame = this.#nextFrame; - this.#nextFrame = undefined; - } else { - break; - } - } while (true); - - this.#drawing = false; - } catch (error) { - this.#setError(error as Error); - } + resume(): undefined { + this.#pause.resume(); } - #updateSize = (width: number, height: number) => { - this.#renderer.setSize(width, height); - this.#size.setSize(width, height); - }; + trackDocumentVisibility(document: Document): () => undefined { + return this.#pause.trackDocumentVisibility(document); + } async snapshot() { - const frame = this.#captureFrame; + const frame = this.#renderController.captureFrame; if (!frame) { return undefined; } - const capturer = await VideoFrameCapturerPool.borrow(); + // First check if the renderer can provide a snapshot natively + let blob = await this.#renderer.snapshot?.(); + if (blob) { + return blob; + } + + // Create a BitmapVideoFrameRenderer to draw the frame + const renderer = new BitmapVideoFrameRenderer(); try { - return await capturer.capture(frame); + const writer = renderer.writable.getWriter(); + await writer.write(frame); + // BitmapVideoFrameRenderer.snapshot will definitely return a value + return await renderer.snapshot(); } finally { - VideoFrameCapturerPool.return(capturer); + renderer.dispose(); } } - pause(): void { - this.#pause.pause(); - } - - resume(): Promise { - return this.#pause.resume(); - } - - trackDocumentVisibility(document: Document): () => undefined { - return this.#pause.trackDocumentVisibility(document); - } - dispose() { - this.#captureFrame?.close(); - - this.#counter.dispose(); - this.#renderer.dispose(); - this.#size.dispose(); - this.#nextFrame?.close(); + // Most cleanup happens automatically when `writable` ends + // (in each stream's `close` callback). + // This method cleanup things that still available after `writable` ends - if (this.#rawDecoder.state !== "closed") { - this.#rawDecoder.close(); - } - - // This class doesn't need to guard against multiple dispose calls - // since most of the logic is already handled in `#pause` this.#pause.dispose(); - - this.#setError(new Error("Attempt to write to a disposed decoder")); + this.#size.dispose(); + this.#renderController.dispose(); + this.#renderer.dispose?.(); } } export namespace WebCodecsVideoDecoder { - export interface Options extends CodecDecoderOptions { + export interface Options extends Pick< + VideoDecoderConfig, + "hardwareAcceleration" | "optimizeForLatency" + > { /** * The video codec to decode */ codec: ScrcpyVideoCodecId; - renderer: VideoFrameRenderer; + renderer?: VideoFrameRenderer | undefined; } } diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/render/auto-canvas.ts b/libraries/scrcpy-decoder-webcodecs/src/video/render/auto-canvas.ts new file mode 100644 index 000000000..89d230628 --- /dev/null +++ b/libraries/scrcpy-decoder-webcodecs/src/video/render/auto-canvas.ts @@ -0,0 +1,35 @@ +import { BitmapVideoFrameRenderer } from "./bitmap.js"; +import type { CanvasVideoFrameRenderer } from "./canvas.js"; +import type { VideoFrameRenderer } from "./type.js"; +import { WebGLVideoFrameRenderer } from "./webgl.js"; + +export class AutoCanvasRenderer implements VideoFrameRenderer { + #inner: CanvasVideoFrameRenderer; + + get canvas() { + return this.#inner.canvas; + } + + get writable() { + return this.#inner.writable; + } + + constructor( + canvas?: HTMLCanvasElement | OffscreenCanvas, + options?: WebGLVideoFrameRenderer.Options, + ) { + if (WebGLVideoFrameRenderer.isSupported) { + this.#inner = new WebGLVideoFrameRenderer(canvas, options); + } else { + this.#inner = new BitmapVideoFrameRenderer(canvas, options); + } + } + + snapshot(): Promise { + return this.#inner.snapshot(); + } + + dispose() { + return this.#inner.dispose(); + } +} diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/render/bitmap.ts b/libraries/scrcpy-decoder-webcodecs/src/video/render/bitmap.ts index 3d2d99a4b..5349a421c 100644 --- a/libraries/scrcpy-decoder-webcodecs/src/video/render/bitmap.ts +++ b/libraries/scrcpy-decoder-webcodecs/src/video/render/bitmap.ts @@ -3,16 +3,27 @@ import { CanvasVideoFrameRenderer } from "./canvas.js"; export class BitmapVideoFrameRenderer extends CanvasVideoFrameRenderer { #context: ImageBitmapRenderingContext; - constructor(canvas?: HTMLCanvasElement | OffscreenCanvas) { - super(canvas); + constructor( + canvas?: HTMLCanvasElement | OffscreenCanvas, + options?: CanvasVideoFrameRenderer.Options, + ) { + super( + async (frame) => { + const bitmap = await createImageBitmap(frame); + this.#context.transferFromImageBitmap(bitmap); + bitmap.close(); + }, + canvas, + options, + ); - this.#context = this.canvas.getContext("bitmaprenderer", { - alpha: false, - })!; - } - - async draw(frame: VideoFrame): Promise { - const bitmap = await createImageBitmap(frame); - this.#context.transferFromImageBitmap(bitmap); + this.#context = (this.canvas as HTMLCanvasElement).getContext( + "bitmaprenderer", + { + // Avoid alpha:false, which can be expensive + // https://developer.mozilla.org/en-US/docs/Web/API/WebGL_API/WebGL_best_practices#avoid_alphafalse_which_can_be_expensive + alpha: true, + }, + )!; } } diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/render/canvas.ts b/libraries/scrcpy-decoder-webcodecs/src/video/render/canvas.ts index ce8d47fd2..34d4a8aa9 100644 --- a/libraries/scrcpy-decoder-webcodecs/src/video/render/canvas.ts +++ b/libraries/scrcpy-decoder-webcodecs/src/video/render/canvas.ts @@ -1,34 +1,152 @@ import type { MaybePromiseLike } from "@yume-chan/async"; import { createCanvas } from "@yume-chan/scrcpy-decoder-tinyh264"; +import { WritableStream } from "@yume-chan/stream-extra"; +import { RedrawController } from "./redraw.js"; import type { VideoFrameRenderer } from "./type.js"; +import { canvasToBlob } from "../utils/snapshot.js"; -export abstract class CanvasVideoFrameRenderer implements VideoFrameRenderer { +export abstract class CanvasVideoFrameRenderer< + TOptions extends CanvasVideoFrameRenderer.Options = + CanvasVideoFrameRenderer.Options, +> implements VideoFrameRenderer { #canvas: HTMLCanvasElement | OffscreenCanvas; get canvas() { return this.#canvas; } - constructor(canvas?: HTMLCanvasElement | OffscreenCanvas) { - if (canvas) { - this.#canvas = canvas; - } else { - this.#canvas = createCanvas(); + #options: TOptions | undefined; + #canvasSize: CanvasVideoFrameRenderer.Options["canvasSize"]; + get options(): Readonly | undefined { + return this.#options; + } + + #resizeObserver: ResizeObserver | undefined; + #displayWidth = Infinity; + #displayHeight = Infinity; + + #draw: (frame: VideoFrame) => MaybePromiseLike; + #controller = new RedrawController((frame) => { + if (this.#canvasSize !== "external") { + this.#updateSize(frame); } + + return this.#draw(frame); + }); + get lastFrame() { + return this.#controller.lastFrame; } - setSize(width: number, height: number): void { - if (this.#canvas.width !== width || this.#canvas.height !== height) { - this.#canvas.width = width; - this.#canvas.height = height; + #writable = new WritableStream({ + write: (frame) => this.#controller.draw(frame), + }); + get writable() { + return this.#writable; + } + + constructor( + draw: (frame: VideoFrame) => MaybePromiseLike, + canvas?: HTMLCanvasElement | OffscreenCanvas, + options?: TOptions, + ) { + this.#draw = draw; + this.#canvas = canvas ?? createCanvas(); + this.#options = options; + this.#canvasSize = options?.canvasSize ?? "video"; + + if (this.#canvasSize === "display") { + if ( + typeof HTMLCanvasElement === "undefined" || + !(this.#canvas instanceof HTMLCanvasElement) + ) { + throw new Error( + "`canvasSize: display` is only supported for HTMLCanvasElement", + ); + } + + this.#resizeObserver = new ResizeObserver((entries) => { + const entry = entries[0]!; + + if (entry.devicePixelContentBoxSize) { + const size = entry.devicePixelContentBoxSize[0]!; + this.#displayWidth = size.inlineSize; + this.#displayHeight = size.blockSize; + } else { + const size = entry.contentBoxSize[0]!; + this.#displayWidth = Math.round( + size.inlineSize * devicePixelRatio, + ); + this.#displayHeight = Math.round( + size.blockSize * devicePixelRatio, + ); + } + + this.#controller.redraw(); + }); + this.#resizeObserver.observe(this.#canvas); } } - abstract draw(frame: VideoFrame): Promise; + #updateSize(frame: VideoFrame) { + let { codedWidth: width, codedHeight: height } = frame; + if (this.#canvasSize === "display") { + width = Math.min(width, this.#displayWidth); + height = Math.min(height, this.#displayHeight); + } + + if (this.#canvas.width === width && this.#canvas.height === height) { + return false; + } + + this.#canvas.width = width; + this.#canvas.height = height; + return true; + } + + /** + * Redraws the last drawn frame. + * + * If a draw or redraw is in progress, it waits for them to finish before redrawing. + * + * If a redraw is in progress and another one is in queue, + * it cancels the queued redraw and redraws the latest frame instead. + */ + redraw() { + this.#controller.redraw(); + } + + async snapshot(): Promise { + if (this.#canvasSize !== "video") { + return undefined; + } + return canvasToBlob(this.#canvas); + } + + dispose(): undefined { + if (this.#canvasSize !== "external") { + this.#canvas.width = 0; + this.#canvas.height = 0; + } + + this.#resizeObserver?.disconnect(); + this.#controller.dispose(); + } +} - dispose(): MaybePromiseLike { - this.#canvas.width = 0; - this.#canvas.height = 0; - return undefined; +export namespace CanvasVideoFrameRenderer { + export interface Options { + /** + * Whether to update the canvas size (rendering resolution) automatically. + * + * * `"video"` (default): update the canvas size to match the video resolution + * * `"display"` (only for `HTMLCanvasElement`): + * update the canvas size to match the display size. + * The display size can be set using `canvas.style.width/height`, + * and must be in correct aspect ratio. + * * `"external"`: use the canvas size as it is. + * The size must be manually set using `canvas.width/height`, + * and must be in correct aspect ratio. + */ + canvasSize?: "video" | "display" | "external"; } } diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/render/flow-control.ts b/libraries/scrcpy-decoder-webcodecs/src/video/render/flow-control.ts new file mode 100644 index 000000000..f8fd43f61 --- /dev/null +++ b/libraries/scrcpy-decoder-webcodecs/src/video/render/flow-control.ts @@ -0,0 +1,157 @@ +import type { ScrcpyVideoDecoderPerformanceCounter } from "@yume-chan/scrcpy-decoder-tinyh264"; +import { PerformanceCounter } from "@yume-chan/scrcpy-decoder-tinyh264"; +import type { + PushReadableStreamController, + ReadableStream, + TransformStream, + WritableStreamDefaultController, +} from "@yume-chan/stream-extra"; +import { PushReadableStream, WritableStream } from "@yume-chan/stream-extra"; + +export class RendererController + implements + TransformStream, + ScrcpyVideoDecoderPerformanceCounter +{ + #readable: ReadableStream; + #readableController!: PushReadableStreamController; + get readable() { + return this.#readable; + } + + #writable: WritableStream; + #writableController!: WritableStreamDefaultController; + get writable() { + return this.#writable; + } + + #captureFrame: VideoFrame | undefined; + get captureFrame() { + return this.#captureFrame; + } + + #nextFrame: VideoFrame | undefined; + + #drawing = false; + + #counter = new PerformanceCounter(); + /** + * Gets the number of frames that have been drawn on the renderer. + */ + get framesRendered() { + return this.#counter.framesRendered; + } + /** + * Gets the number of frames that's visible to the user. + * + * Multiple frames might be rendered during one vertical sync interval, + * but only the last of them is represented to the user. + * This costs some performance but reduces latency by 1 frame. + * + * Might be `0` if the renderer is in a nested Web Worker on Chrome due to a Chrome bug. + * https://issues.chromium.org/issues/41483010 + */ + get framesPresented() { + return this.#counter.framesPresented; + } + /** + * Gets the number of frames that wasn't drawn on the renderer + * because the renderer can't keep up + */ + get framesSkippedRendering() { + return this.#counter.framesSkippedRendering; + } + + constructor() { + this.#readable = new PushReadableStream((controller) => { + this.#readableController = controller; + }); + + this.#writable = new WritableStream({ + start: (controller) => { + this.#writableController = controller; + + const signal = this.#readableController.abortSignal; + signal.addEventListener("abort", () => + controller.error(signal.reason), + ); + }, + write: (frame) => { + this.#captureFrame?.close(); + // `#captureFrame` and `#nextFrame` must not be the same object + // because they need to be closed at different times + this.#captureFrame = frame.clone(); + + // Frame A is drawing, frame B (`#nextFrame`) is waiting, + // then frame C (`frame`) arrives. + // Skip frame B and queue frame C + if (this.#nextFrame) { + this.#nextFrame.close(); + this.#counter.increaseFramesSkipped(); + } + this.#nextFrame = frame; + + // Don't `await` because this writable needs to + // accept incoming frames as fast as produced. + // The `#draw` method then draws the frames + // as fast as the renderer can keep up + void this.#draw(); + }, + close: () => { + this.#readableController.close(); + this.#counter.dispose(); + // Don't close `#captureFrame` to allow using `snapshot` on the last frame + // Don't close `#nextFrame` to make sure all frames are rendered + }, + abort: (reason) => { + this.#readableController.error(reason); + this.#counter.dispose(); + // Don't close `#captureFrame` to allow using `snapshot` on the last frame + // Don't close `#nextFrame` to make sure all frames are rendered + }, + }); + } + + async #draw() { + if (this.#drawing) { + return; + } + this.#drawing = true; + + // PERF: Draw every frame to minimize latency at cost of performance. + // When multiple frames are drawn in one vertical sync interval, + // only the last one is visible to users. + // But this ensures users can always see the most up-to-date screen. + // This is also the behavior of official Scrcpy client. + // https://github.com/Genymobile/scrcpy/issues/3679 + + let frame: VideoFrame | undefined; + while ((frame = this.#nextFrame)) { + this.#nextFrame = undefined; + if (await this.#readableController.enqueue(frame)) { + // The consumer is responsible for closing `frame` + this.#counter.increaseFramesRendered(); + } else { + frame.close(); + } + } + + this.#drawing = false; + } + + dispose() { + this.#captureFrame?.close(); + this.#captureFrame = undefined; + + this.#nextFrame?.close(); + this.#nextFrame = undefined; + + this.#counter.dispose(); + + this.#readableController.close(); + // Throw a similar error to native TransformStream + this.#writableController.error( + new TypeError("The transform stream has been terminated"), + ); + } +} diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/render/index.ts b/libraries/scrcpy-decoder-webcodecs/src/video/render/index.ts index fbaf6c72a..33d42e31c 100644 --- a/libraries/scrcpy-decoder-webcodecs/src/video/render/index.ts +++ b/libraries/scrcpy-decoder-webcodecs/src/video/render/index.ts @@ -2,6 +2,8 @@ export * from "./bitmap.js"; export * from "./canvas.js"; +export * from "./auto-canvas.js"; +export * from "./flow-control.js"; export * from "./insertable-stream.js"; export * from "./type.js"; export * from "./webgl.js"; diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/render/insertable-stream.ts b/libraries/scrcpy-decoder-webcodecs/src/video/render/insertable-stream.ts index 4efab048c..ca8d7717f 100644 --- a/libraries/scrcpy-decoder-webcodecs/src/video/render/insertable-stream.ts +++ b/libraries/scrcpy-decoder-webcodecs/src/video/render/insertable-stream.ts @@ -1,8 +1,6 @@ // cspell: ignore insertable -import type { MaybePromiseLike } from "@yume-chan/async"; -import type { WritableStreamDefaultWriter } from "@yume-chan/stream-extra"; -import { tryClose } from "@yume-chan/stream-extra"; +import type { WritableStream } from "@yume-chan/stream-extra"; import type { VideoFrameRenderer } from "./type.js"; @@ -23,8 +21,14 @@ export class InsertableStreamVideoFrameRenderer implements VideoFrameRenderer { } #generator: MediaStreamTrackGenerator; - #writer: WritableStreamDefaultWriter; + get writable() { + return this.#generator.writable; + } + #stream: MediaStream; + get stream() { + return this.#stream; + } constructor(element?: HTMLVideoElement) { if (element) { @@ -42,32 +46,18 @@ export class InsertableStreamVideoFrameRenderer implements VideoFrameRenderer { this.#element.disablePictureInPicture = true; this.#element.disableRemotePlayback = true; + this.#element.addEventListener("resize", () => { + this.#element.width = this.#element.videoWidth; + this.#element.height = this.#element.videoHeight; + }); + // The spec replaced `MediaStreamTrackGenerator` with `VideoTrackGenerator`. // But Chrome has not implemented it yet. // https://issues.chromium.org/issues/40058895 this.#generator = new MediaStreamTrackGenerator({ kind: "video" }); this.#generator.contentHint = "motion"; - this.#writer = - this.#generator.writable.getWriter() as WritableStreamDefaultWriter; - this.#stream = new MediaStream([this.#generator]); this.#element.srcObject = this.#stream; } - - setSize(width: number, height: number): void { - if (this.#element.width !== width || this.#element.height !== height) { - this.#element.width = width; - this.#element.height = height; - } - } - - draw(frame: VideoFrame): Promise { - return this.#writer.write(frame); - } - - dispose(): MaybePromiseLike { - tryClose(this.#writer); - return undefined; - } } diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/render/redraw.ts b/libraries/scrcpy-decoder-webcodecs/src/video/render/redraw.ts new file mode 100644 index 000000000..5f5cf2217 --- /dev/null +++ b/libraries/scrcpy-decoder-webcodecs/src/video/render/redraw.ts @@ -0,0 +1,102 @@ +import type { MaybePromiseLike } from "@yume-chan/async"; + +/** + * Manages drawing and redrawing of video frames. + */ +export class RedrawController { + #draw: (frame: VideoFrame) => MaybePromiseLike; + + #ready = Promise.resolve(undefined); + #pendingRedraw: AbortController | undefined; + #error: unknown; + + #lastFrame: VideoFrame | undefined; + get lastFrame() { + return this.#lastFrame; + } + + constructor(draw: (frame: VideoFrame) => MaybePromiseLike) { + this.#draw = draw; + } + + /** + * Draws a new frame. + * + * If a redraw is in progress, it waits for the redraw to finish before drawing the new frame. + * + * If a redraw is in progress and another one is in queue, + * it cancels the queued redraw and draws the new frame instead. + * + * @param frame A `VideoFrame` to draw + */ + draw(frame: VideoFrame) { + if (this.#error) { + // eslint-disable-next-line @typescript-eslint/only-throw-error + throw this.#error; + } + + this.#lastFrame?.close(); + this.#lastFrame = frame.clone(); + + this.#pendingRedraw?.abort(); + this.#pendingRedraw = undefined; + + this.#ready = this.#ready.then(async (): Promise => { + try { + await this.#draw(frame); + } catch (e) { + this.#error = e; + throw e; + } finally { + frame.close(); + } + }); + + return this.#ready; + } + + /** + * Redraws the last drawn frame. + * + * If a draw or redraw is in progress, it waits for them to finish before redrawing. + * + * If a redraw is in progress and another one is in queue, + * it cancels the queued redraw and redraws the latest frame instead. + */ + redraw(): void { + if (!this.#lastFrame || this.#pendingRedraw) { + return; + } + + if (this.#error) { + // eslint-disable-next-line @typescript-eslint/only-throw-error + throw this.#error; + } + + const abortController = new AbortController(); + this.#pendingRedraw = abortController; + + this.#ready = this.#ready.then(async (): Promise => { + if (abortController.signal.aborted) { + return; + } + + this.#pendingRedraw = undefined; + + const frame = this.#lastFrame!.clone(); + try { + await this.#draw(frame); + } catch (e) { + this.#error = e; + } finally { + frame.close(); + } + }); + } + + dispose() { + this.#lastFrame?.close(); + this.#pendingRedraw?.abort(); + this.#error = new Error("Can't write to a closed renderer"); + } +} diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/render/type.ts b/libraries/scrcpy-decoder-webcodecs/src/video/render/type.ts index c1b13d5e6..be95e9758 100644 --- a/libraries/scrcpy-decoder-webcodecs/src/video/render/type.ts +++ b/libraries/scrcpy-decoder-webcodecs/src/video/render/type.ts @@ -1,9 +1,10 @@ import type { MaybePromiseLike } from "@yume-chan/async"; +import type { WritableStream } from "@yume-chan/stream-extra"; export interface VideoFrameRenderer { - setSize(width: number, height: number): void; + writable: WritableStream; - draw(frame: VideoFrame): MaybePromiseLike; + snapshot?(): Promise; - dispose(): MaybePromiseLike; + dispose?(): MaybePromiseLike; } diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/render/webgl.ts b/libraries/scrcpy-decoder-webcodecs/src/video/render/webgl.ts index 915cb1de5..ce6c5adce 100644 --- a/libraries/scrcpy-decoder-webcodecs/src/video/render/webgl.ts +++ b/libraries/scrcpy-decoder-webcodecs/src/video/render/webgl.ts @@ -1,4 +1,6 @@ -import type { MaybePromiseLike } from "@yume-chan/async"; +// cspell: ignore highp +// cspell: ignore mediump + import { glCreateContext, glIsSupported, @@ -7,33 +9,163 @@ import { import { CanvasVideoFrameRenderer } from "./canvas.js"; -const Resolved = Promise.resolve(); +function createShader(gl: WebGLRenderingContext, type: number, source: string) { + const shader = gl.createShader(type)!; + gl.shaderSource(shader, source); + gl.compileShader(shader); + return shader; +} + +function createProgram( + gl: WebGLRenderingContext, + vertexShaderSource: string, + fragmentShaderSource: string, +) { + const vertexShader = createShader(gl, gl.VERTEX_SHADER, vertexShaderSource); + const fragmentShader = createShader( + gl, + gl.FRAGMENT_SHADER, + fragmentShaderSource, + ); + + const program = gl.createProgram(); + gl.attachShader(program, vertexShader); + gl.attachShader(program, fragmentShader); + gl.linkProgram(program); + + try { + if (gl.getProgramParameter(program, gl.LINK_STATUS)) { + return program; + } + + // Don't check shader compile status unless linking fails + // https://developer.mozilla.org/en-US/docs/Web/API/WebGL_API/WebGL_best_practices#dont_check_shader_compile_status_unless_linking_fails + if (!gl.getShaderParameter(vertexShader, gl.COMPILE_STATUS)) { + throw new Error(gl.getShaderInfoLog(vertexShader)!); + } + + if (!gl.getShaderParameter(fragmentShader, gl.COMPILE_STATUS)) { + throw new Error(gl.getShaderInfoLog(fragmentShader)!); + } -export class WebGLVideoFrameRenderer extends CanvasVideoFrameRenderer { - static VertexShaderSource = ` + throw new Error(gl.getProgramInfoLog(program)!); + } finally { + // Delete objects eagerly + // https://developer.mozilla.org/en-US/docs/Web/API/WebGL_API/WebGL_best_practices#delete_objects_eagerly + gl.deleteShader(vertexShader); + gl.deleteShader(fragmentShader); + } +} + +export class WebGLVideoFrameRenderer extends CanvasVideoFrameRenderer { + static VertexShader = ` attribute vec2 xy; varying highp vec2 uv; void main(void) { gl_Position = vec4(xy, 0.0, 1.0); + // Map vertex coordinates (-1 to +1) to UV coordinates (0 to 1). + uv = xy * 0.5 + 0.5; // UV coordinates are Y-flipped relative to vertex coordinates. - uv = vec2((1.0 + xy.x) / 2.0, (1.0 - xy.y) / 2.0); + uv.y = 1.0 - uv.y; } `; - static FragmentShaderSource = ` + static FragmentShader = ` precision mediump float; - varying highp vec2 uv; - uniform sampler2D texture; + uniform sampler2D source; + uniform vec2 texelSize; + uniform float zoom; - void main(void) { - gl_FragColor = texture2D(texture, uv); + varying vec2 uv; + + vec4 tent4(vec2 uv) { + vec2 dx = vec2(texelSize.x, 0.0); + vec2 dy = vec2(0.0, texelSize.y); + + vec4 c0 = texture2D(source, uv); + vec4 c1 = texture2D(source, uv + dx); + vec4 c2 = texture2D(source, uv + dy); + vec4 c3 = texture2D(source, uv + dx + dy); + + return 0.25 * (c0 + c1 + c2 + c3); + } + + vec4 gaussian9(vec2 uv) { + vec2 dx = vec2(texelSize.x, 0.0); + vec2 dy = vec2(0.0, texelSize.y); + + vec4 sum = vec4(0.0); + sum += texture2D(source, uv) * 0.227027; + sum += texture2D(source, uv + dx) * 0.1945946; + sum += texture2D(source, uv - dx) * 0.1945946; + sum += texture2D(source, uv + dy) * 0.1945946; + sum += texture2D(source, uv - dy) * 0.1945946; + + return sum; + } + + float mnWeight(float x) { + x = abs(x); + float x2 = x * x; + float x3 = x2 * x; + + if (x < 1.0) { + return (1.0/6.0) * ((12.0 - 9.0 * (1.0/3.0) - 6.0 * (1.0/3.0)) * x3 + + (-18.0 + 12.0 * (1.0/3.0) + 6.0 * (1.0/3.0)) * x2 + + (6.0 - 2.0 * (1.0/3.0))); + } else if (x < 2.0) { + return (1.0/6.0) * ((- (1.0/3.0) - 6.0 * (1.0/3.0)) * x3 + + ((6.0 * (1.0/3.0) + 30.0 * (1.0/3.0)) * x2 + + (-12.0 * (1.0/3.0) - 48.0 * (1.0/3.0)) * x + + (8.0 * (1.0/3.0) + 24.0 * (1.0/3.0)))); + } + return 0.0; + } + + vec4 bicubicMN(vec2 uv, vec2 texelSize) { + vec2 texCoord = uv / texelSize; + vec2 base = floor(texCoord - 0.5); + vec2 f = texCoord - base - 0.5; + + vec4 sum = vec4(0.0); + float total = 0.0; + + for (int j = -1; j <= 2; j++) { + float wy = mnWeight(float(j) - f.y); + for (int i = -1; i <= 2; i++) { + float wx = mnWeight(float(i) - f.x); + float w = wx * wy; + + vec2 coord = (base + vec2(float(i), float(j)) + 0.5) * texelSize; + sum += texture2D(source, coord) * w; + total += w; + } + } + return sum / total; + } + + void main() { + if (zoom > 0.95) { + gl_FragColor = texture2D(source, uv); + } + else if (zoom > 0.5) { + gl_FragColor = bicubicMN(uv, texelSize); + } + else { + gl_FragColor = tent4(uv); + } } `; + /** + * A single oversized triangle that covers the entire canvas. + */ + static Vertices = new Float32Array([-1.0, -1.0, 3.0, -1.0, -1.0, 3.0]); + static get isSupported() { return glIsSupported({ // Disallow software rendering. @@ -42,7 +174,10 @@ export class WebGLVideoFrameRenderer extends CanvasVideoFrameRenderer { }); } - #context: WebGLRenderingContext | WebGL2RenderingContext; + #context: WebGLRenderingContext; + #program!: WebGLProgram; + #texelSizeLocation!: WebGLUniformLocation; + #zoomLocation!: WebGLUniformLocation; /** * Create a new WebGL frame renderer. @@ -53,20 +188,61 @@ export class WebGLVideoFrameRenderer extends CanvasVideoFrameRenderer { */ constructor( canvas?: HTMLCanvasElement | OffscreenCanvas, - enableCapture?: boolean, + options?: WebGLVideoFrameRenderer.Options, ) { - super(canvas); + super( + (frame): undefined => { + const gl = this.#context; + if (gl.isContextLost()) { + return; + } + + gl.texImage2D( + gl.TEXTURE_2D, + 0, + gl.RGBA, + gl.RGBA, + gl.UNSIGNED_BYTE, + frame, + ); + + gl.uniform2f( + this.#texelSizeLocation, + 1.0 / frame.codedWidth, + 1.0 / frame.codedHeight, + ); + + gl.uniform1f( + this.#zoomLocation, + this.canvas.width / frame.codedWidth, + ); + + gl.viewport( + 0, + 0, + gl.drawingBufferWidth, + gl.drawingBufferHeight, + ); + gl.drawArrays(gl.TRIANGLES, 0, 3); + + gl.flush(); + }, + canvas, + options, + ); const gl = glCreateContext(this.canvas, { // Low-power GPU should be enough for video rendering. powerPreference: "low-power", - alpha: false, + // Avoid alpha:false, which can be expensive + // https://developer.mozilla.org/en-US/docs/Web/API/WebGL_API/WebGL_best_practices#avoid_alphafalse_which_can_be_expensive + alpha: true, // Disallow software rendering. // `ImageBitmapRenderingContext` is faster than software-based WebGL. failIfMajorPerformanceCaveat: true, - preserveDrawingBuffer: !!enableCapture, + preserveDrawingBuffer: !!this.options?.enableCapture, // Enable desynchronized mode when not capturing to reduce latency. - desynchronized: !enableCapture, + desynchronized: !this.options?.enableCapture, antialias: false, depth: false, premultipliedAlpha: true, @@ -77,88 +253,100 @@ export class WebGLVideoFrameRenderer extends CanvasVideoFrameRenderer { } this.#context = gl; - const vertexShader = gl.createShader(gl.VERTEX_SHADER)!; - gl.shaderSource( - vertexShader, - WebGLVideoFrameRenderer.VertexShaderSource, - ); - gl.compileShader(vertexShader); - if (!gl.getShaderParameter(vertexShader, gl.COMPILE_STATUS)) { - throw new Error(gl.getShaderInfoLog(vertexShader)!); - } + this.#initialize(); - const fragmentShader = gl.createShader(gl.FRAGMENT_SHADER)!; - gl.shaderSource( - fragmentShader, - WebGLVideoFrameRenderer.FragmentShaderSource, + this.canvas.addEventListener( + "webglcontextlost", + this.#handleContextLost, ); - gl.compileShader(fragmentShader); - if (!gl.getShaderParameter(fragmentShader, gl.COMPILE_STATUS)) { - throw new Error(gl.getShaderInfoLog(fragmentShader)!); - } + this.canvas.addEventListener( + "webglcontextrestored", + this.#handleContextRestored, + ); + } - const shaderProgram = gl.createProgram(); - gl.attachShader(shaderProgram, vertexShader); - gl.attachShader(shaderProgram, fragmentShader); - gl.linkProgram(shaderProgram); - if (!gl.getProgramParameter(shaderProgram, gl.LINK_STATUS)) { - throw new Error(gl.getProgramInfoLog(shaderProgram)!); - } - gl.useProgram(shaderProgram); + #initialize() { + const gl = this.#context; + + this.#program = createProgram( + gl, + WebGLVideoFrameRenderer.VertexShader, + WebGLVideoFrameRenderer.FragmentShader, + ); + gl.useProgram(this.#program); // Vertex coordinates, clockwise from bottom-left. const vertexBuffer = gl.createBuffer(); gl.bindBuffer(gl.ARRAY_BUFFER, vertexBuffer); gl.bufferData( gl.ARRAY_BUFFER, - new Float32Array([-1.0, -1.0, -1.0, +1.0, +1.0, +1.0, +1.0, -1.0]), + WebGLVideoFrameRenderer.Vertices, gl.STATIC_DRAW, ); - const xyLocation = gl.getAttribLocation(shaderProgram, "xy"); + const xyLocation = gl.getAttribLocation(this.#program, "xy"); gl.vertexAttribPointer(xyLocation, 2, gl.FLOAT, false, 0, 0); gl.enableVertexAttribArray(xyLocation); + this.#texelSizeLocation = gl.getUniformLocation( + this.#program, + "texelSize", + )!; + this.#zoomLocation = gl.getUniformLocation(this.#program, "zoom")!; + // Create one texture to upload frames to. const texture = gl.createTexture(); gl.bindTexture(gl.TEXTURE_2D, texture); gl.texParameteri(gl.TEXTURE_2D, gl.TEXTURE_MAG_FILTER, gl.NEAREST); - gl.texParameteri( - gl.TEXTURE_2D, - gl.TEXTURE_MIN_FILTER, - // WebGL 1 doesn't support mipmaps for non-power-of-two textures - gl instanceof WebGL2RenderingContext - ? gl.NEAREST_MIPMAP_LINEAR - : gl.NEAREST, - ); + gl.texParameteri(gl.TEXTURE_2D, gl.TEXTURE_MIN_FILTER, gl.NEAREST); gl.texParameteri(gl.TEXTURE_2D, gl.TEXTURE_WRAP_S, gl.CLAMP_TO_EDGE); gl.texParameteri(gl.TEXTURE_2D, gl.TEXTURE_WRAP_T, gl.CLAMP_TO_EDGE); } - draw(frame: VideoFrame): Promise { - const gl = this.#context; - gl.texImage2D( - gl.TEXTURE_2D, - 0, - gl.RGBA, - gl.RGBA, - gl.UNSIGNED_BYTE, - frame, - ); + #handleContextLost = (e: Event) => { + // Notify WebGL we want to handle context restoration + e.preventDefault(); + }; - // WebGL 1 doesn't support mipmaps for non-power-of-two textures - if (gl instanceof WebGL2RenderingContext) { - gl.generateMipmap(gl.TEXTURE_2D); + #handleContextRestored = () => { + this.#initialize(); + this.redraw(); + }; + + override async snapshot(): Promise { + if (!this.options?.enableCapture) { + return undefined; } + return super.snapshot(); + } - gl.viewport(0, 0, gl.drawingBufferWidth, gl.drawingBufferHeight); - gl.drawArrays(gl.TRIANGLE_FAN, 0, 4); + override dispose(): undefined { + this.#context.deleteProgram(this.#program); - return Resolved; - } + this.canvas.removeEventListener( + "webglcontextlost", + this.#handleContextLost, + ); + this.canvas.removeEventListener( + "webglcontextrestored", + this.#handleContextRestored, + ); - override dispose(): MaybePromiseLike { + // Lose contexts eagerly + // https://developer.mozilla.org/en-US/docs/Web/API/WebGL_API/WebGL_best_practices#lose_contexts_eagerly glLoseContext(this.#context); - return undefined; + + super.dispose(); + } +} + +export namespace WebGLVideoFrameRenderer { + export interface Options extends CanvasVideoFrameRenderer.Options { + /** + * Whether to allow capturing the canvas content using APIs like `readPixels` and `toDataURL`. + * + * Enabling this option may reduce performance. + */ + enableCapture?: boolean; } } diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/snapshot.ts b/libraries/scrcpy-decoder-webcodecs/src/video/snapshot.ts deleted file mode 100644 index e10dbc1ee..000000000 --- a/libraries/scrcpy-decoder-webcodecs/src/video/snapshot.ts +++ /dev/null @@ -1,41 +0,0 @@ -export class VideoFrameCapturer { - #canvas: OffscreenCanvas | HTMLCanvasElement; - #context: ImageBitmapRenderingContext; - - constructor() { - if (typeof OffscreenCanvas !== "undefined") { - this.#canvas = new OffscreenCanvas(1, 1); - } else { - this.#canvas = document.createElement("canvas"); - this.#canvas.width = 1; - this.#canvas.height = 1; - } - this.#context = this.#canvas.getContext("bitmaprenderer", { - alpha: false, - })!; - } - - async capture(frame: VideoFrame): Promise { - this.#canvas.width = frame.displayWidth; - this.#canvas.height = frame.displayHeight; - - const bitmap = await createImageBitmap(frame); - this.#context.transferFromImageBitmap(bitmap); - - if (this.#canvas instanceof OffscreenCanvas) { - return await this.#canvas.convertToBlob({ - type: "image/png", - }); - } else { - return new Promise((resolve, reject) => { - (this.#canvas as HTMLCanvasElement).toBlob((blob) => { - if (!blob) { - reject(new Error("Failed to convert canvas to blob")); - } else { - resolve(blob); - } - }, "image/png"); - }); - } - } -} diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/utils/frame-type.ts b/libraries/scrcpy-decoder-webcodecs/src/video/utils/frame-type.ts new file mode 100644 index 000000000..3f415f141 --- /dev/null +++ b/libraries/scrcpy-decoder-webcodecs/src/video/utils/frame-type.ts @@ -0,0 +1,7 @@ +export function convertFrameType( + keyframe?: boolean, +): EncodedVideoChunkType | undefined { + if (keyframe === true) return "key"; + if (keyframe === false) return "delta"; + return undefined; +} diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/utils/index.ts b/libraries/scrcpy-decoder-webcodecs/src/video/utils/index.ts new file mode 100644 index 000000000..af0c0a415 --- /dev/null +++ b/libraries/scrcpy-decoder-webcodecs/src/video/utils/index.ts @@ -0,0 +1,3 @@ +export * from "./frame-type.js"; +export * from "./timestamp.js"; +export * from "./video-decoder-stream.js"; diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/utils/snapshot.ts b/libraries/scrcpy-decoder-webcodecs/src/video/utils/snapshot.ts new file mode 100644 index 000000000..bb99ef4f0 --- /dev/null +++ b/libraries/scrcpy-decoder-webcodecs/src/video/utils/snapshot.ts @@ -0,0 +1,17 @@ +export function canvasToBlob(canvas: HTMLCanvasElement | OffscreenCanvas) { + if (canvas instanceof OffscreenCanvas) { + return canvas.convertToBlob({ + type: "image/png", + }); + } else { + return new Promise((resolve, reject) => { + canvas.toBlob((blob) => { + if (!blob) { + reject(new Error("Failed to convert canvas to blob")); + } else { + resolve(blob); + } + }, "image/png"); + }); + } +} diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/utils/timestamp.ts b/libraries/scrcpy-decoder-webcodecs/src/video/utils/timestamp.ts new file mode 100644 index 000000000..b82687345 --- /dev/null +++ b/libraries/scrcpy-decoder-webcodecs/src/video/utils/timestamp.ts @@ -0,0 +1,99 @@ +import type { PauseController } from "@yume-chan/scrcpy-decoder-tinyh264"; +import { TransformStream } from "@yume-chan/stream-extra"; + +import type { CodecTransformStream } from "../codec/type.js"; + +const view = new DataView(new ArrayBuffer(8)); + +function nextUp(x: number) { + if (Number.isNaN(x) || x === Infinity) return x; + if (x === 0) return Number.MIN_VALUE; + + // Write the number as a float64 + view.setFloat64(0, x, false); + + let bits = view.getBigUint64(0, false); + + // If x > 0, increment bits; if x < 0, decrement bits + bits += x > 0 ? 1n : -1n; + + view.setBigUint64(0, bits, false); + return view.getFloat64(0, false); +} + +let prevValue = 0; + +export function increasingNow() { + let now = performance.now(); + if (now <= prevValue) { + now = nextUp(prevValue); + } + prevValue = now; + return now; +} + +export class TimestampTransforms { + /** + * Timestamp of the last frame to be skipped by pause controller. + */ + #skipFramesUntil = 0; + + #addTimestamp = new TransformStream< + PauseController.Output, + CodecTransformStream.Input + >({ + transform: (packet, controller) => { + if (packet.type === "configuration") { + controller.enqueue(packet); + return; + } + + // Use `timestamp` to convey `skipRendering` to later step + // and track total decoding time + const timestamp = increasingNow(); + + if (packet.skipRendering) { + this.#skipFramesUntil = timestamp; + } + + controller.enqueue({ + ...packet, + timestamp, + }); + }, + }); + get addTimestamp() { + return this.#addTimestamp; + } + + // This is not in `VideoDecoderStream` because + // this time includes all pre-processing time, + // and requires `EncodedVideoChunk.timestamp` to contain + // local time of when the frame is received, + // which is set by this class. + #totalDecodeTime = 0; + /** + * Gets the total time spent processing and decoding frames in milliseconds. + */ + get totalDecodeTime() { + return this.#totalDecodeTime; + } + + #consumeTimestamp = new TransformStream({ + transform: (frame, controller) => { + // `frame.timestamp` is the same `EncodedVideoChunk.timestamp` set above + this.#totalDecodeTime += performance.now() - frame.timestamp; + + // Don't count these frames as skipped rendering + if (frame.timestamp <= this.#skipFramesUntil) { + frame.close(); + return; + } + + controller.enqueue(frame); + }, + }); + get consumeTimestamp() { + return this.#consumeTimestamp; + } +} diff --git a/libraries/scrcpy-decoder-webcodecs/src/video/utils/video-decoder-stream.ts b/libraries/scrcpy-decoder-webcodecs/src/video/utils/video-decoder-stream.ts new file mode 100644 index 000000000..2c8318ec2 --- /dev/null +++ b/libraries/scrcpy-decoder-webcodecs/src/video/utils/video-decoder-stream.ts @@ -0,0 +1,215 @@ +import { EventEmitter } from "@yume-chan/event"; +import { concatBuffers, TransformStream } from "@yume-chan/stream-extra"; + +import type { CodecTransformStream } from "../codec/type.js"; + +export class VideoDecoderStream extends TransformStream< + CodecTransformStream.Config | CodecTransformStream.VideoChunk, + VideoFrame +> { + /** + * The native decoder. + * + * `transform`, `flush` and `cancel` callbacks don't need to + * check `#decoder.state` for "closed". + * + * Decoder can enter "closed" state by either: + * - Encounter a decoding error: this triggers `controller.error`, + * so no more transformer callbacks will be called. + * - Calling `close` manually: this only happens in `flush` and `cancel`, + * so no more transformer callbacks will be called. + */ + #decoder!: VideoDecoder; + + /** + * Gets the number of frames waiting to be decoded. + */ + get decodeQueueSize() { + return this.#decoder.decodeQueueSize; + } + + #onDequeue = new EventEmitter(); + /** + * Gets an event when a frame is dequeued (either decoded or discarded). + */ + get onDequeue() { + return this.#onDequeue.event; + } + + #framesDecoded = 0; + /** + * Gets the number of frames decoded by the decoder. + */ + get framesDecoded() { + return this.#framesDecoded; + } + + #framesSkipped = 0; + /** + * Gets the number of frames skipped by the decoder. + */ + get framesSkipped() { + return this.#framesSkipped; + } + + #decoderResetCount = 0; + /** + * Gets the number of times the decoder has been reset to catch up new keyframes. + */ + get decoderResetCount() { + return this.#decoderResetCount; + } + + /** + * Saved decoder configuration for use when resetting the native decoder. + */ + #config?: CodecTransformStream.Config; + + #configured = false; + + constructor() { + let decoder!: VideoDecoder; + + super({ + start: (controller) => { + // WARN: can't use `this` here + + decoder = new VideoDecoder({ + output: (frame) => { + this.#framesDecoded += 1; + controller.enqueue(frame); + }, + error: (error) => { + // Propagate decoder error to stream. + controller.error(error); + this.#dispose(); + }, + }); + }, + transform: (chunk) => { + if ("codec" in chunk) { + this.#config = chunk; + this.#configured = false; + return; + } + + this.#handleVideoChunk(chunk); + }, + flush: async () => { + // `flush` can only be called when `state` is "configured". + if (this.#decoder.state === "configured") { + // Wait for all queued frames to be decoded when + // `writable` side ends without exception. + // The `readable` side will wait for `flush` to complete before closing. + await this.#decoder.flush(); + } + this.#dispose(); + }, + cancel: () => { + // Immediately close the decoder on stream cancel/error + this.#dispose(); + }, + }); + + this.#decoder = decoder; + this.#decoder.addEventListener("dequeue", this.#handleDequeue); + } + + #handleVideoChunk(chunk: CodecTransformStream.VideoChunk) { + if (!this.#config) { + throw new Error("Decoder not configured"); + } + + if (chunk.type === "key") { + if (this.#decoder.decodeQueueSize) { + // If the device is too slow to decode all frames, + // discard queued frames when next keyframe arrives. + // (decoding can only start from keyframes) + // This limits the maximum latency to 1 keyframe interval + // (60 frames by default). + this.#framesSkipped += this.#decoder.decodeQueueSize; + this.#decoderResetCount += 1; + this.#decoder.reset(); + + // `reset` also resets the decoder configuration + // so we need to re-configure it again. + this.#configureAndDecodeFirstKeyFrame(this.#config, chunk); + return; + } + + if (!this.#configured) { + this.#configureAndDecodeFirstKeyFrame(this.#config, chunk); + return; + } + + this.#decoder.decode( + // `type` has been checked to be "key" + new EncodedVideoChunk(chunk as EncodedVideoChunkInit), + ); + return; + } + + if (!this.#configured) { + if (chunk.type === undefined) { + // Infer the first frame after configuration as keyframe + // (`VideoDecoder` will throw error if it's not) + this.#configureAndDecodeFirstKeyFrame(this.#config, chunk); + return; + } + + throw new Error("Expect a keyframe but got a delta frame"); + } + + this.#decoder.decode( + new EncodedVideoChunk({ + // Treat `undefined` as "key" otherwise it won't decode + type: chunk.type ?? "key", + timestamp: chunk.timestamp, + duration: chunk.duration!, + data: chunk.data, + }), + ); + } + + #configureAndDecodeFirstKeyFrame( + config: CodecTransformStream.Config, + chunk: CodecTransformStream.VideoChunk, + ) { + this.#decoder.configure(config); + this.#configured = true; + + if (config.raw) { + this.#decoder.decode( + new EncodedVideoChunk({ + type: "key", + timestamp: chunk.timestamp, + duration: chunk.duration!, + data: concatBuffers([config.raw, chunk.data]), + }), + ); + return; + } + + this.#decoder.decode( + new EncodedVideoChunk({ + type: "key", + timestamp: chunk.timestamp, + duration: chunk.duration!, + data: chunk.data, + }), + ); + } + + #handleDequeue = () => { + this.#onDequeue.fire(undefined); + }; + + #dispose() { + this.#decoder.removeEventListener("dequeue", this.#handleDequeue); + this.#onDequeue.dispose(); + + if (this.#decoder.state !== "closed") { + this.#decoder.close(); + } + } +} diff --git a/libraries/scrcpy/src/2_0/impl/parse-audio-stream-metadata.ts b/libraries/scrcpy/src/2_0/impl/parse-audio-stream-metadata.ts index 7adc44f07..b9abf579f 100644 --- a/libraries/scrcpy/src/2_0/impl/parse-audio-stream-metadata.ts +++ b/libraries/scrcpy/src/2_0/impl/parse-audio-stream-metadata.ts @@ -86,13 +86,9 @@ export async function parseAudioStreamMetadata( await controller.enqueue(buffer); const stream = buffered.release(); - const reader = stream.getReader(); - while (true) { - const { done, value } = await reader.read(); - if (done) { - break; - } - await controller.enqueue(value); + + for await (const chunk of stream) { + await controller.enqueue(chunk); } }), }; diff --git a/libraries/stream-extra/src/concat.ts b/libraries/stream-extra/src/concat.ts index 3771c542d..be70b057c 100644 --- a/libraries/stream-extra/src/concat.ts +++ b/libraries/stream-extra/src/concat.ts @@ -135,6 +135,42 @@ export function concatUint8Arrays(chunks: readonly Uint8Array[]): Uint8Array { return output; } +export function toUint8Array( + buffer: ArrayBufferLike | ArrayBufferView, +): Uint8Array { + if (buffer instanceof Uint8Array) { + return buffer; + } + if (ArrayBuffer.isView(buffer)) { + return new Uint8Array( + buffer.buffer, + buffer.byteOffset, + buffer.byteLength, + ); + } + return new Uint8Array(buffer); +} + +export function concatBuffers( + buffers: readonly (ArrayBufferLike | ArrayBufferView)[], +): Uint8Array { + switch (buffers.length) { + case 0: + return EmptyUint8Array; + case 1: + return toUint8Array(buffers[0]!); + } + + const length = buffers.reduce((a, b) => a + b.byteLength, 0); + const output = new Uint8Array(length); + let offset = 0; + for (const buffer of buffers) { + output.set(toUint8Array(buffer), offset); + offset += buffer.byteLength; + } + return output; +} + /** * A `TransformStream` that concatenates `Uint8Array`s. * diff --git a/libraries/stream-extra/src/push-readable.spec.ts b/libraries/stream-extra/src/push-readable.spec.ts index cbca10475..0f7c42324 100644 --- a/libraries/stream-extra/src/push-readable.spec.ts +++ b/libraries/stream-extra/src/push-readable.spec.ts @@ -35,78 +35,60 @@ describe("PushReadableStream", () => { await reader.cancel("reason"); await delay(0); assert.deepStrictEqual( - log.mock.calls.map((call) => call.arguments), + log.mock.calls.map((call) => call.arguments[0]), [ - [ - { - operation: "enqueue", - phase: "start", - source: "producer", - value: 1, - }, - ], - [ - { - operation: "enqueue", - phase: "complete", - source: "producer", - value: 1, - }, - ], - [ - { - operation: "enqueue", - phase: "start", - source: "producer", - value: 2, - }, - ], - [ - { - operation: "enqueue", - phase: "waiting", - source: "producer", - value: 2, - }, - ], - [ - { - operation: "cancel", - phase: "start", - source: "consumer", - }, - ], - [ - { - operation: "cancel", - phase: "complete", - source: "consumer", - }, - ], - [ - { - operation: "enqueue", - phase: "ignored", - source: "producer", - value: 2, - }, - ], - [ - { - explicit: false, - operation: "close", - phase: "start", - source: "producer", - }, - ], - [ - { - explicit: false, - operation: "close", - phase: "ignored", - source: "producer", - }, - ], + { + operation: "enqueue", + phase: "start", + source: "producer", + value: 1, + }, + { + operation: "enqueue", + phase: "complete", + source: "producer", + value: 1, + }, + { + operation: "enqueue", + phase: "start", + source: "producer", + value: 2, + }, + { + operation: "enqueue", + phase: "waiting", + source: "producer", + value: 2, + }, + { + operation: "cancel", + phase: "start", + source: "consumer", + }, + { + operation: "cancel", + phase: "complete", + source: "consumer", + }, + { + operation: "enqueue", + phase: "ignored", + source: "producer", + value: 2, + }, + { + explicit: false, + operation: "close", + phase: "start", + source: "producer", + }, + { + explicit: false, + operation: "close", + phase: "ignored", + source: "producer", + }, ], ); }); @@ -128,94 +110,82 @@ describe("PushReadableStream", () => { // Add extra microtasks to allow all operations to complete await delay(0); assert.deepStrictEqual( - log.mock.calls.map((call) => call.arguments), + log.mock.calls.map((call) => call.arguments[0]), [ - [ - { - operation: "enqueue", - phase: "start", - source: "producer", - value: 1, - }, - ], - [ - { - operation: "enqueue", - phase: "complete", - source: "producer", - value: 1, - }, - ], - [ - { - operation: "enqueue", - phase: "start", - source: "producer", - value: 2, - }, - ], - [ - { - operation: "enqueue", - phase: "waiting", - source: "producer", - value: 2, - }, - ], - [ - { - operation: "cancel", - phase: "start", - source: "consumer", - }, - ], - [ - { - operation: "cancel", - phase: "complete", - source: "consumer", - }, - ], - [ - { - operation: "enqueue", - phase: "ignored", - source: "producer", - value: 2, - }, - ], - [ - { - operation: "enqueue", - phase: "start", - source: "producer", - value: 3, - }, - ], - [ - { - operation: "enqueue", - phase: "ignored", - source: "producer", - value: 3, - }, - ], - [ - { - explicit: false, - operation: "close", - phase: "start", - source: "producer", - }, - ], - [ - { - explicit: false, - operation: "close", - phase: "ignored", - source: "producer", - }, - ], + { + operation: "enqueue", + phase: "start", + source: "producer", + value: 1, + }, + + { + operation: "enqueue", + phase: "complete", + source: "producer", + value: 1, + }, + + { + operation: "enqueue", + phase: "start", + source: "producer", + value: 2, + }, + + { + operation: "enqueue", + phase: "waiting", + source: "producer", + value: 2, + }, + + { + operation: "cancel", + phase: "start", + source: "consumer", + }, + + { + operation: "cancel", + phase: "complete", + source: "consumer", + }, + + { + operation: "enqueue", + phase: "ignored", + source: "producer", + value: 2, + }, + + { + operation: "enqueue", + phase: "start", + source: "producer", + value: 3, + }, + + { + operation: "enqueue", + phase: "ignored", + source: "producer", + value: 3, + }, + + { + explicit: false, + operation: "close", + phase: "start", + source: "producer", + }, + + { + explicit: false, + operation: "close", + phase: "ignored", + source: "producer", + }, ], ); }); @@ -231,99 +201,90 @@ describe("PushReadableStream", () => { undefined, log, ); + const reader = stream.getReader(); await delay(0); + await reader.cancel("reason"); await delay(0); + assert.deepStrictEqual( - log.mock.calls.map((call) => call.arguments), + log.mock.calls.map((call) => call.arguments[0]), [ - [ - { - operation: "enqueue", - phase: "start", - source: "producer", - value: 1, - }, - ], - [ - { - operation: "enqueue", - phase: "complete", - source: "producer", - value: 1, - }, - ], - [ - { - operation: "enqueue", - phase: "start", - source: "producer", - value: 2, - }, - ], - [ - { - operation: "enqueue", - phase: "waiting", - source: "producer", - value: 2, - }, - ], - [ - { - operation: "cancel", - phase: "start", - source: "consumer", - }, - ], - [ - { - operation: "cancel", - phase: "complete", - source: "consumer", - }, - ], - [ - { - operation: "enqueue", - phase: "ignored", - source: "producer", - value: 2, - }, - ], - [ - { - explicit: true, - operation: "close", - phase: "start", - source: "producer", - }, - ], - [ - { - explicit: true, - operation: "close", - phase: "ignored", - source: "producer", - }, - ], - [ - { - explicit: false, - operation: "close", - phase: "start", - source: "producer", - }, - ], - [ - { - explicit: false, - operation: "close", - phase: "ignored", - source: "producer", - }, - ], + { + operation: "enqueue", + phase: "start", + source: "producer", + value: 1, + }, + + { + operation: "enqueue", + phase: "complete", + source: "producer", + value: 1, + }, + + { + operation: "enqueue", + phase: "start", + source: "producer", + value: 2, + }, + + { + operation: "enqueue", + phase: "waiting", + source: "producer", + value: 2, + }, + + { + operation: "cancel", + phase: "start", + source: "consumer", + }, + + { + operation: "cancel", + phase: "complete", + source: "consumer", + }, + + { + operation: "enqueue", + phase: "ignored", + source: "producer", + value: 2, + }, + + { + explicit: true, + operation: "close", + phase: "start", + source: "producer", + }, + + { + explicit: true, + operation: "close", + phase: "ignored", + source: "producer", + }, + + { + explicit: false, + operation: "close", + phase: "start", + source: "producer", + }, + + { + explicit: false, + operation: "close", + phase: "ignored", + source: "producer", + }, ], ); }); @@ -369,38 +330,33 @@ describe("PushReadableStream", () => { value: 1, }); assert.deepStrictEqual( - log.mock.calls.map((call) => call.arguments), + log.mock.calls.map((call) => call.arguments[0]), [ - [ - { - operation: "pull", - phase: "start", - source: "consumer", - }, - ], - [ - { - operation: "pull", - phase: "complete", - source: "consumer", - }, - ], - [ - { - operation: "enqueue", - phase: "start", - source: "producer", - value: 1, - }, - ], - [ - { - operation: "enqueue", - phase: "complete", - source: "producer", - value: 1, - }, - ], + { + operation: "pull", + phase: "start", + source: "consumer", + }, + + { + operation: "pull", + phase: "complete", + source: "consumer", + }, + + { + operation: "enqueue", + phase: "start", + source: "producer", + value: 1, + }, + + { + operation: "enqueue", + phase: "complete", + source: "producer", + value: 1, + }, ], ); }); @@ -421,62 +377,151 @@ describe("PushReadableStream", () => { }); await delay(0); assert.deepStrictEqual( - log.mock.calls.map((call) => call.arguments), + log.mock.calls.map((call) => call.arguments[0]), [ - [ - { - operation: "enqueue", - phase: "start", - source: "producer", - value: 1, - }, - ], - [ - { - operation: "enqueue", - phase: "waiting", - source: "producer", - value: 1, - }, - ], - [ - { - operation: "pull", - phase: "start", - source: "consumer", - }, - ], - [ - { - operation: "pull", - phase: "complete", - source: "consumer", - }, - ], - [ - { - operation: "enqueue", - phase: "complete", - source: "producer", - value: 1, - }, - ], - [ - { - explicit: false, - operation: "close", - phase: "start", - source: "producer", - }, - ], - [ - { - explicit: false, - operation: "close", - phase: "complete", - source: "producer", - }, - ], + { + operation: "enqueue", + phase: "start", + source: "producer", + value: 1, + }, + + { + operation: "enqueue", + phase: "waiting", + source: "producer", + value: 1, + }, + + { + operation: "pull", + phase: "start", + source: "consumer", + }, + + { + operation: "pull", + phase: "complete", + source: "consumer", + }, + + { + operation: "enqueue", + phase: "complete", + source: "producer", + value: 1, + }, + + { + explicit: false, + operation: "close", + phase: "start", + source: "producer", + }, + + { + explicit: false, + operation: "close", + phase: "complete", + source: "producer", + }, + ], + ); + }); + + it("should allow multiple `enqueue`", async () => { + const log = mock.fn>(); + + const stream = new PushReadableStream( + async (controller) => { + // Test calling `enqueue` multiple times without `await` + // (this defeats the purpose of `PushReadableStream` + // and shouldn't be used in production code, + // but we want to make sure it doesn't break) + const p1 = controller.enqueue(1); + const p2 = controller.enqueue(2); + await Promise.all([p1, p2]); + }, + { highWaterMark: 0 }, + log, + ); + await delay(0); + assert.deepStrictEqual( + log.mock.calls.map((call) => call.arguments[0]), + [ + { + source: "producer", + operation: "enqueue", + value: 1, + phase: "start", + }, + { + source: "producer", + operation: "enqueue", + value: 1, + phase: "waiting", + }, + ], + ); + log.mock.resetCalls(); + + const reader = stream.getReader(); + assert.deepStrictEqual(await reader.read(), { + done: false, + value: 1, + }); + assert.deepStrictEqual( + log.mock.calls.map((call) => call.arguments[0]), + [ + { + source: "consumer", + operation: "pull", + phase: "start", + }, + { + source: "consumer", + operation: "pull", + phase: "complete", + }, + { + source: "producer", + operation: "enqueue", + value: 1, + phase: "complete", + }, + ], + ); + log.mock.resetCalls(); + + assert.deepStrictEqual(await reader.read(), { + done: false, + value: 2, + }); + assert.deepStrictEqual( + log.mock.calls.map((call) => call.arguments[0]), + [ + { + source: "consumer", + operation: "pull", + phase: "start", + }, + { + source: "consumer", + operation: "pull", + phase: "complete", + }, + { + source: "producer", + operation: "enqueue", + value: 2, + phase: "start", + }, + { + source: "producer", + operation: "enqueue", + value: 2, + phase: "complete", + }, ], ); }); @@ -502,52 +547,45 @@ describe("PushReadableStream", () => { value: 1, }); assert.deepStrictEqual( - log.mock.calls.map((call) => call.arguments), + log.mock.calls.map((call) => call.arguments[0]), [ - [ - { - operation: "pull", - phase: "start", - source: "consumer", - }, - ], - [ - { - operation: "pull", - phase: "complete", - source: "consumer", - }, - ], - [ - { - operation: "enqueue", - phase: "start", - source: "producer", - value: 1, - }, - ], - [ - { - operation: "pull", - phase: "start", - source: "consumer", - }, - ], - [ - { - operation: "pull", - phase: "complete", - source: "consumer", - }, - ], - [ - { - operation: "enqueue", - phase: "complete", - source: "producer", - value: 1, - }, - ], + { + operation: "pull", + phase: "start", + source: "consumer", + }, + + { + operation: "pull", + phase: "complete", + source: "consumer", + }, + + { + operation: "enqueue", + phase: "start", + source: "producer", + value: 1, + }, + + { + operation: "pull", + phase: "start", + source: "consumer", + }, + + { + operation: "pull", + phase: "complete", + source: "consumer", + }, + + { + operation: "enqueue", + phase: "complete", + source: "producer", + value: 1, + }, ], ); }); @@ -568,54 +606,47 @@ describe("PushReadableStream", () => { }); await delay(0); assert.deepStrictEqual( - log.mock.calls.map((call) => call.arguments), + log.mock.calls.map((call) => call.arguments[0]), [ - [ - { - operation: "enqueue", - phase: "start", - source: "producer", - value: 1, - }, - ], - [ - { - operation: "enqueue", - phase: "complete", - source: "producer", - value: 1, - }, - ], - [ - { - operation: "pull", - phase: "start", - source: "consumer", - }, - ], - [ - { - operation: "pull", - phase: "complete", - source: "consumer", - }, - ], - [ - { - explicit: false, - operation: "close", - phase: "start", - source: "producer", - }, - ], - [ - { - explicit: false, - operation: "close", - phase: "complete", - source: "producer", - }, - ], + { + operation: "enqueue", + phase: "start", + source: "producer", + value: 1, + }, + + { + operation: "enqueue", + phase: "complete", + source: "producer", + value: 1, + }, + + { + operation: "pull", + phase: "start", + source: "consumer", + }, + + { + operation: "pull", + phase: "complete", + source: "consumer", + }, + + { + explicit: false, + operation: "close", + phase: "start", + source: "producer", + }, + + { + explicit: false, + operation: "close", + phase: "complete", + source: "producer", + }, ], ); }); @@ -655,5 +686,69 @@ describe("PushReadableStream", () => { const reader = stream.getReader(); assert.strictEqual(await reader.closed, undefined); }); + + it("should cancel pending `enqueue`", async () => { + const log = mock.fn>(); + + new PushReadableStream( + async (controller) => { + controller.enqueue(1); + controller.enqueue(2); + await delay(0); + }, + { highWaterMark: 1 }, + log, + ); + + await delay(0); + + assert.deepStrictEqual( + log.mock.calls.map((call) => call.arguments[0]), + [ + { + source: "producer", + operation: "enqueue", + value: 1, + phase: "start", + }, + { + source: "producer", + operation: "enqueue", + value: 1, + phase: "complete", + }, + { + source: "producer", + operation: "enqueue", + value: 2, + phase: "start", + }, + { + source: "producer", + operation: "enqueue", + value: 2, + phase: "waiting", + }, + { + source: "producer", + operation: "close", + explicit: false, + phase: "start", + }, + { + source: "producer", + operation: "close", + explicit: false, + phase: "complete", + }, + { + source: "producer", + operation: "enqueue", + value: 2, + phase: "ignored", + }, + ], + ); + }); }); }); diff --git a/libraries/stream-extra/src/push-readable.ts b/libraries/stream-extra/src/push-readable.ts index 8302b79bb..b9acd3b86 100644 --- a/libraries/stream-extra/src/push-readable.ts +++ b/libraries/stream-extra/src/push-readable.ts @@ -1,12 +1,20 @@ -import { PromiseResolver } from "@yume-chan/async"; +import { + isPromiseLike, + PromiseResolver, + type MaybePromise, +} from "@yume-chan/async"; -import type { AbortSignal, QueuingStrategy } from "./stream.js"; +import type { + AbortSignal, + QueuingStrategy, + ReadableStreamDefaultController, +} from "./stream.js"; import { AbortController, ReadableStream } from "./stream.js"; export interface PushReadableStreamController { abortSignal: AbortSignal; - enqueue(chunk: T): Promise; + enqueue(chunk: T): Promise; close(): void; @@ -51,216 +59,228 @@ export class PushReadableStream extends ReadableStream { strategy?: QueuingStrategy, logger?: PushReadableLogger, ) { - let waterMarkLow: PromiseResolver | undefined; + let controller_!: ReadableStreamDefaultController; + let ready: Promise | undefined; + let zeroHighWaterMarkAllowEnqueue = false; + // Resolves when consumer calls `reader.read`. + // Rejects when producer calls `controller.close` or `controller.error`, + // or consumer calls `reader.cancel`. + let waterMarkLow: PromiseResolver | undefined; + const abortController = new AbortController(); + let stopped = false; - super( - { - start: (controller) => { - const result = source({ - abortSignal: abortController.signal, - enqueue: async (chunk) => { - logger?.({ - source: "producer", - operation: "enqueue", - value: chunk, - phase: "start", - }); + const enqueue = (chunk: T): MaybePromise => { + logger?.({ + source: "producer", + operation: "enqueue", + value: chunk, + phase: "start", + }); - if (abortController.signal.aborted) { - // In original `ReadableStream`, calling `enqueue` or `close` - // on an cancelled stream will throw an error, - // - // But in `PushReadableStream`, `enqueue` is an async function, - // the producer can't just check `abortSignal.aborted` - // before calling `enqueue`, as it might change when waiting - // for the backpressure to be reduced. - // - // So IMO it's better to handle this for the producer - // by simply ignoring the `enqueue` call. - // - // Note that we check `abortSignal.aborted` instead of `stopped`, - // as it's not allowed for the producer to call `enqueue` after - // they called `close` or `error`. - // - // Obviously, the producer should listen to the `abortSignal` and - // stop producing, but most pushing data sources don't support that. - logger?.({ - source: "producer", - operation: "enqueue", - value: chunk, - phase: "ignored", - }); - return; - } + if (abortController.signal.aborted) { + // In original `ReadableStream`, calling `enqueue` or `close` + // on an cancelled stream will throw an error, + // + // But in `PushReadableStream`, `enqueue` is an async function, + // the producer can't just check `abortSignal.aborted` + // before calling `enqueue`, as it might change when waiting + // for the backpressure to be reduced. + // + // So IMO it's better to handle this for the producer + // by simply ignoring the `enqueue` call. + // + // Note that we check `abortSignal.aborted` instead of `stopped`, + // as it's not allowed for the producer to call `enqueue` after + // they called `close` or `error`. + // + // Obviously, the producer should listen to the `abortSignal` and + // stop producing, but most pushing data sources don't support that. + logger?.({ + source: "producer", + operation: "enqueue", + value: chunk, + phase: "ignored", + }); + return false; + } - if (controller.desiredSize === null) { - // `desiredSize` being `null` means the stream is in error state, - // `controller.enqueue` will throw an error for us. - controller.enqueue(chunk); - // istanbul ignore next - return; - } + if (controller_.desiredSize === null) { + // `desiredSize` being `null` means the stream is in error state, + // `controller.enqueue` will throw an error for us. + controller_.enqueue(chunk); + // istanbul ignore next + throw new Error("unreachable"); + } - if (zeroHighWaterMarkAllowEnqueue) { - // When `highWaterMark` is set to `0`, - // `controller.desiredSize` will always be `0`, - // even if the consumer has called `reader.read()`. - // (in this case, each `reader.read()`/`pull` - // should allow one `enqueue` of any size) - // - // If the consumer has already called `reader.read()`, - // before the producer tries to `enqueue`, - // `controller.desiredSize` is `0` and normal `waterMarkLow` signal - // will never trigger, - // (because `ReadableStream` prevents reentrance of `pull`) - // The stream will stuck. - // - // So we need a special signal for this case. - zeroHighWaterMarkAllowEnqueue = false; - controller.enqueue(chunk); - logger?.({ - source: "producer", - operation: "enqueue", - value: chunk, - phase: "complete", - }); - return; - } + if (zeroHighWaterMarkAllowEnqueue) { + // When `highWaterMark` is set to `0`, + // `controller.desiredSize` will always be `0`, + // even if the consumer has called `reader.read()`. + // (in this case, each `reader.read()`/`pull` + // should allow one `enqueue` of any size) + // + // If the consumer has already called `reader.read()`, + // before the producer tries to `enqueue`, + // `controller.desiredSize` is `0` and normal `waterMarkLow` signal + // will never trigger, + // (because `ReadableStream` prevents reentrance of `pull`) + // The stream will stuck. + // + // So we need a special signal for this case. + zeroHighWaterMarkAllowEnqueue = false; + controller_.enqueue(chunk); + logger?.({ + source: "producer", + operation: "enqueue", + value: chunk, + phase: "complete", + }); + return true; + } + + if (controller_.desiredSize <= 0) { + logger?.({ + source: "producer", + operation: "enqueue", + value: chunk, + phase: "waiting", + }); + + waterMarkLow = new PromiseResolver(); + return waterMarkLow.promise.then( + (): boolean => { + controller_.enqueue(chunk); + logger?.({ + source: "producer", + operation: "enqueue", + value: chunk, + phase: "complete", + }); + return true; + }, + (): boolean => { + // Only ignore this in-flight `enqueue` call + // future calls will trigger `desiredSize === null` and throw + logger?.({ + source: "producer", + operation: "enqueue", + value: chunk, + phase: "ignored", + }); + return false; + }, + ); + } + + controller_.enqueue(chunk); + logger?.({ + source: "producer", + operation: "enqueue", + value: chunk, + phase: "complete", + }); + return true; + }; + + const close = (explicit: boolean) => { + logger?.({ + source: "producer", + operation: "close", + explicit, + phase: "start", + }); + + // Allow calling `controller.close` on cancelled stream as `enqueue` does + // Ignore inexplicit close on any stopped state + // But don't allow calling `controller.close` multiple times + if (abortController.signal.aborted || (stopped && !explicit)) { + logger?.({ + source: "producer", + operation: "close", + explicit, + phase: "ignored", + }); + return; + } + + stopped = true; + controller_.close(); + // Wake up pending `enqueue` + waterMarkLow?.reject(); + + logger?.({ + source: "producer", + operation: "close", + explicit, + phase: "complete", + }); + }; + + const error = (error: unknown, explicit: boolean) => { + logger?.({ + source: "producer", + operation: "error", + explicit, + phase: "start", + }); - if (controller.desiredSize <= 0) { - logger?.({ - source: "producer", - operation: "enqueue", - value: chunk, - phase: "waiting", - }); + stopped = true; + // `controller.error` won't throw on closed/errored/cancelled stream + // so don't need any checks + controller_.error(error); + // Wake up pending `enqueue` + waterMarkLow?.reject(); - waterMarkLow = new PromiseResolver(); - await waterMarkLow.promise; + logger?.({ + source: "producer", + operation: "error", + explicit, + phase: "complete", + }); + }; - // Recheck consumer cancellation after async operations. - if (abortController.signal.aborted) { - logger?.({ - source: "producer", - operation: "enqueue", - value: chunk, - phase: "ignored", - }); - return; + super( + { + start: (controller) => { + controller_ = controller; + + const result = source({ + abortSignal: abortController.signal, + enqueue: async (chunk) => { + if (!ready) { + // If all `enqueue` calls are synchronous, + // `ready` will always be `undefined`. + // This avoid an extra microtask for each `enqueue` call. + const result = enqueue(chunk); + if (result instanceof Promise) { + ready = result; } + return result; + } else { + // Chain `enqueue` calls + ready = ready.then(() => enqueue(chunk)); } - - controller.enqueue(chunk); - logger?.({ - source: "producer", - operation: "enqueue", - value: chunk, - phase: "complete", - }); + return ready; }, close() { - logger?.({ - source: "producer", - operation: "close", - explicit: true, - phase: "start", - }); - - // Since `enqueue` on an cancelled stream won't throw an error, - // so does `close`. - if (abortController.signal.aborted) { - logger?.({ - source: "producer", - operation: "close", - explicit: true, - phase: "ignored", - }); - return; - } - - controller.close(); - logger?.({ - source: "producer", - operation: "close", - explicit: true, - phase: "complete", - }); + close(true); }, error(e) { - logger?.({ - source: "producer", - operation: "error", - explicit: true, - phase: "start", - }); - - // Calling `error` on an already closed or errored stream is a no-op. - controller.error(e); - - logger?.({ - source: "producer", - operation: "error", - explicit: true, - phase: "complete", - }); + error(e, true); }, }); - if (result && "then" in result) { + if (!stopped && isPromiseLike(result)) { // If `source` returns a `Promise`, // close the stream when the `Promise` is resolved, // and error the stream when the `Promise` is rejected. // The producer can return a never-settling `Promise` // to disable this behavior. result.then( - () => { - logger?.({ - source: "producer", - operation: "close", - explicit: false, - phase: "start", - }); - - try { - controller.close(); - - logger?.({ - source: "producer", - operation: "close", - explicit: false, - phase: "complete", - }); - } catch { - logger?.({ - source: "producer", - operation: "close", - explicit: false, - phase: "ignored", - }); - - // The stream is already closed by the producer, - // Or cancelled by the consumer. - } - }, - (e) => { - logger?.({ - source: "producer", - operation: "error", - explicit: false, - phase: "start", - }); - - controller.error(e); - - logger?.({ - source: "producer", - operation: "error", - explicit: false, - phase: "complete", - }); - }, + () => close(false), + (e) => error(e, false), ); } }, @@ -272,7 +292,8 @@ export class PushReadableStream extends ReadableStream { }); if (waterMarkLow) { - waterMarkLow.resolve(); + waterMarkLow.resolve(undefined); + waterMarkLow = undefined; } else if (strategy?.highWaterMark === 0) { zeroHighWaterMarkAllowEnqueue = true; } @@ -290,9 +311,9 @@ export class PushReadableStream extends ReadableStream { phase: "start", }); + stopped = true; abortController.abort(reason); - // Resolve it on cancellation. `pull` will check `abortSignal.aborted` again. - waterMarkLow?.resolve(); + waterMarkLow?.reject(); logger?.({ source: "consumer", diff --git a/libraries/struct/src/utils.ts b/libraries/struct/src/utils.ts index 5aeebab10..566f6136b 100644 --- a/libraries/struct/src/utils.ts +++ b/libraries/struct/src/utils.ts @@ -8,6 +8,10 @@ interface TextEncoder { encode(input: string): Uint8Array; + encodeInto( + source: string, + destination: Uint8Array, + ): { read: number; written: number }; } interface TextDecoder { @@ -25,7 +29,7 @@ interface GlobalExtension { export const { TextEncoder, TextDecoder } = globalThis as unknown as GlobalExtension; -const SharedEncoder = /* #__PURE__ */ new TextEncoder(); +export const SharedEncoder = /* #__PURE__ */ new TextEncoder(); const SharedDecoder = /* #__PURE__ */ new TextDecoder(); /* #__NO_SIDE_EFFECTS__ */