feat(audio): [DEMON] route graph-node audio to WebRTC/MPEG-TS sink#1033
Open
leszko wants to merge 9 commits into
Open
feat(audio): [DEMON] route graph-node audio to WebRTC/MPEG-TS sink#1033leszko wants to merge 9 commits into
leszko wants to merge 9 commits into
Conversation
Adds the plumbing needed for plain-node graphs (e.g. DEMON's ACEStep music-cover workflow) to emit audio out the WebRTC/MPEG-TS sink: - ``NodeProcessor`` learns latch-with-rerun for non-continuous nodes (parameter updates re-fire even when no fresh inputs arrive), continuous-node input merging from the prior tick's latch (so static one-shot handles like MODEL/VAE/CLIP persist), and an audio routing path that detects when an output port feeds a sink and pushes through ``audio_output_queue`` with PTS metadata derived from the chunk's ``start_sample``. - ``graph_executor`` recognises audio→sink edges and tags the feeder processor's ``audio_sink_ports`` instead of allocating a sink queue nobody drains. - ``AudioProcessingTrack`` trims duplicate-prefix samples when the decoder emits overlapping windows (e.g. ``vae_overlap`` + ``follow_playhead``), keyed off the new PTS. - ``mcp_router`` / ``webrtc`` derive audio/video modalities from the graph's sink edges when a graph is present — authoritative over stale ``pipeline_ids`` left from a previous workflow load. - ``HeadlessTsStreamer`` no longer requires a video frame to bootstrap; audio-only graphs init the MPEG-TS container from the first audio chunk. - Adds a built-in ``audio.AudioSource`` node (WAV file → 100 ms chunks or full-clip emit; per-session state resets on shutdown so a re-Run doesn't latch on "already emitted"). - App logging honours ``ACESTEP_BRIDGE_TRACE=1`` to surface bridge diagnostics from ``scope_plugin``. - Adds ``demon_workflow.json`` — a ready-to-import 120 s music-cover graph that exercises all of the above. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Follow-up cleanup of the DEMON integration: - Replace ``_graph_produces_audio`` / ``_graph_has_video_to_sink`` (mcp_router) and ``_graph_sink_modalities`` (webrtc) with a single ``GraphConfig.get_sink_modalities()`` method that returns ``(has_video, has_audio)``. Both call sites now go through the parsed schema instead of re-walking the raw dict. - Drop unused ``SCOPE_NODE_TRACE`` env flag + the bare ``import os`` from ``nodes/processor.py``. - Simplify ``AudioSourceNode``'s "fire once in full mode" latch from a ``(file, mode)`` tuple to a plain ``bool``. - Generalise the plugin trace knob in ``app.py`` to honour both the generic ``SCOPE_PLUGIN_TRACE`` and the legacy ``ACESTEP_BRIDGE_TRACE`` so existing DEMON instructions keep working. Behaviour-preserving — no observable changes. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Rafal Leszko <rafal@livepeer.org>
|
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
…onfig A plugin should surface its own diagnostics — having Scope hardcode env var names like ``SCOPE_PLUGIN_TRACE`` / ``ACESTEP_BRIDGE_TRACE`` and lift the ``scope_plugin`` logger to INFO based on them puts plugin-specific concerns into core. Each plugin can set its own logger level inside its module init (e.g. DEMON's ``scope_plugin.bridge``) when its own trace flag is set. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Contributor
🚀 fal.ai Preview Deployment
Testing on Cloud |
Strip ``AudioSourceNode`` to the only path the DEMON workflow actually uses: load a WAV, emit the whole clip, done. Removes a pile of dead streaming-mode machinery that was speculative. Source changes: - Switch ``continuous=True`` → ``continuous=False``. ``NodeProcessor`` now re-runs the node only when a parameter changes, which is the right semantic for "load file, emit clip" — and incidentally fixes the ~100/sec ``AudioSource: file not found`` log flood that ``continuous`` was producing on a missing ``file_id``. - Drop ``_emit_chunk`` (chunked streaming) and ``_emit_full`` (inlined). - Drop the ``mode`` and ``pacing`` ``NodeParam``s; nothing reads them. - Drop ``_position``, ``_last_call_time``, ``_full_emitted`` fields and the ``shutdown()`` reset that only existed to clear them. - Drop the ``CHUNK_DURATION`` / ``CHUNK_SAMPLES`` constants and the ``time`` import. - Drop the defensive ``.copy()`` on the emitted buffer — with one-shot emission there's no aliasing risk. - Collapse ``_resolve_path``'s "is absolute and exists" + "exists" branches into one. - Keep: ``duration`` in the cache key (real bug fix), WAV decoder (PCM 8/16/24/32 + IEEE float 32/64 + ``WAVE_FORMAT_EXTENSIBLE``), resample, mono→stereo, asset-dir fallback. Other follow-ups on the DEMON audio wiring: - Rewrite the ``_route_outputs`` comment so it describes what the code actually does (audio sinks fan out via ``audio_output_queue``; non-sink consumers still go through ``output_queues``). - Collapse the ``_route_audio`` timestamp construction to a single conditional expression. Tests: - ``test_graph_sink_modalities.py`` — 6 cases covering ``GraphConfig.get_sink_modalities()``. - ``test_audio_source_node.py`` — WAV decoder (PCM16, IEEE-float32, malformed input) and ``AudioSourceNode`` (full-clip emission, missing file is silent, duration cache key). - ``TestOverlapTrim`` in ``test_audio_processing_track.py`` — three cases for ACEStep ``StreamVAEDecode`` overlap trimming. Net: ``audio_io.py`` shrinks from 307 → 222 lines. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Trim the changes against ``main`` without altering behavior: - Drop the ``_pending_inputs`` field; drain straight into ``_last_inputs`` and use a local ``fresh`` dict per tick to gate the "skip when nothing changed" check. - Collapse the three-branch ``_process_once`` (source / continuous / non-continuous) into one drain-cache-fire flow. - Revert the class docstring tweak. - Trim multi-paragraph comments around ``audio_sink_ports``, ``audio_output_queue``, ``_last_inputs``, ``update_parameters``, and ``_route_outputs`` to one-liners where the code speaks for itself. - Rewrite ``update_parameters`` change-detection as a single ``any(...)``. Net diff on ``processor.py`` vs ``main`` drops from +170/-31 to +105/-22. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Rafal Leszko <rafal@livepeer.org>
``NodeProcessor._route_audio`` was bridging two layers — pulling tensor fields out of a node output, normalizing device/shape/dtype, and building an ``AudioPacket`` with a PTS-bearing ``MediaTimestamp``. All of that is server-domain work; only the queue push is core-domain. Extract the construction into a free function ``audio_packet_from_node_output`` next to ``AudioPacket`` itself, and have ``NodeProcessor._route_audio`` call it via the same lazy import that the original code already needed for ``AudioPacket``/``torch``. ``_route_audio`` shrinks from 47 lines to 14 — it now just converts and applies the backpressured put — and the tensor-shape knowledge lives beside the types it produces. Four focused tests cover the new helper (tuple input, waveform-object PTS, shape/dtype normalization, missing waveform). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Rafal Leszko <rafal@livepeer.org>
The demo workflow lives in the PR description instead so it doesn't ship in the repo as committed product. Anyone wanting to try the DEMON graph can copy the JSON from there. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Scope serves at most one session at a time, so the ``WeakSet`` + lock + iteration in the playhead registry are over-engineered. Replace them with a single module-level optional reference that each new ``AudioProcessingTrack`` overwrites in ``__init__``. The ``readyState != "live"`` check still guards the brief gap between a session ending and the next one starting. Drops the ``threading`` and ``weakref`` imports added by this PR. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Rafal Leszko <rafal@livepeer.org>
The graph-aware modality detection added in this PR was REPLACING the registry result, which broke pipelines that produce audio internally without exposing it as a graph port — most notably LTX-2. In graph mode, the graph reports ``(video, no audio)`` because LTX-2's audio output isn't on a port, and the previous code dropped ``produces_audio`` to False, so WebRTC never created an audio track. Fix: start from the registry (it knows about internal audio), then OR in whatever the graph adds. Only drop video when the graph is *explicitly* audio-only — that's the DEMON case where ``pipeline_ids`` can be stale ``longlive`` from a previous workflow load. This mirrors the logic already used in mcp_router.py. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Rafal Leszko <rafal@livepeer.org>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Lets plain-node graphs (e.g. DEMON's ACEStep music-cover workflow) emit audio through the WebRTC / MPEG-TS sink.
Demo
demo_demon.mp4
Workflow
demon.scope-workflow.json
Changes
NodeProcessor: latch-with-rerun for non-continuous nodes, persisted one-shot inputs (MODEL/VAE/CLIP), and an audio-routing path that pushes toaudio_output_queuewith PTS.graph_executor: tag the feeder'saudio_sink_portsfor audio→sink edges instead of allocating an unused sink queue.AudioProcessingTrack: trim duplicate-prefix samples on overlapping decoder windows (e.g.vae_overlap+follow_playhead).mcp_router/webrtc: derive sink modalities from the graph viaGraphConfig.get_sink_modalities(), OR'd with the registry so pipelines with internal audio (e.g. LTX-2) keep their audio track.HeadlessTsStreamer: bootstrap the MPEG-TS container from the first audio chunk (audio-only graphs).audio.AudioSourcenode: WAV file → one-shot full-clip emit.AudioPacketconstruction moved next to the type, inmedia_packets.