From 02f1e8a767d7ba7000f85cce138cef9a295fa236 Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Wed, 13 May 2026 14:47:05 +0000 Subject: [PATCH 1/9] feat(audio): wire ACEStep/DEMON-style graph nodes into the audio sink MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the plumbing needed for plain-node graphs (e.g. DEMON's ACEStep music-cover workflow) to emit audio through the WebRTC/MPEG-TS sink: - ``NodeProcessor`` learns latch-with-rerun for non-continuous nodes (parameter updates re-fire even when no fresh inputs arrive), continuous-node input merging from the prior tick's latch (so static one-shot handles like MODEL/VAE/CLIP persist), and an audio routing path that detects when an output port feeds a sink and pushes through ``audio_output_queue`` with PTS metadata derived from the chunk's ``start_sample``. - ``graph_executor`` recognises audio→sink edges and tags the feeder processor's ``audio_sink_ports`` instead of allocating a sink queue nobody drains. - ``AudioProcessingTrack`` trims duplicate-prefix samples when the decoder emits overlapping windows (e.g. ``vae_overlap`` + ``follow_playhead``), keyed off the new PTS. - ``mcp_router`` / ``webrtc`` derive audio/video modalities from the graph's sink edges when a graph is present — authoritative over stale ``pipeline_ids`` left from a previous workflow load. - ``HeadlessTsStreamer`` no longer requires a video frame to bootstrap; audio-only graphs init the MPEG-TS container from the first audio chunk. - Adds a built-in ``audio.AudioSource`` node (WAV file → 100 ms chunks or full-clip emit; per-session state resets on shutdown so a re-Run doesn't latch on "already emitted"). - App logging honours ``ACESTEP_BRIDGE_TRACE=1`` to surface bridge diagnostics from ``scope_plugin``. - Adds ``demon_workflow.json`` — a ready-to-import 120 s music-cover graph that exercises all of the above. 
Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Rafal Leszko --- demon_workflow.json | 223 ++++++++++++++++ src/scope/core/nodes/__init__.py | 4 +- src/scope/core/nodes/builtins/__init__.py | 3 +- src/scope/core/nodes/builtins/audio_io.py | 304 ++++++++++++++++++++++ src/scope/core/nodes/processor.py | 198 ++++++++++++-- src/scope/server/app.py | 5 + src/scope/server/audio_track.py | 62 +++++ src/scope/server/graph_executor.py | 16 ++ src/scope/server/headless.py | 37 ++- src/scope/server/mcp_router.py | 50 +++- src/scope/server/webrtc.py | 49 +++- 11 files changed, 905 insertions(+), 46 deletions(-) create mode 100644 demon_workflow.json create mode 100644 src/scope/core/nodes/builtins/audio_io.py diff --git a/demon_workflow.json b/demon_workflow.json new file mode 100644 index 000000000..1fce450fc --- /dev/null +++ b/demon_workflow.json @@ -0,0 +1,223 @@ +{ + "format": "scope-workflow", + "format_version": "1.0", + "metadata": { + "name": "DEMON Realtime Cover", + "created_at": "2026-05-13T00:00:00.000Z", + "scope_version": "0.2.4" + }, + "pipelines": [ + { + "pipeline_id": "audio.AudioSource", + "source": { "type": "builtin" }, + "loras": [], + "params": {} + }, + { + "pipeline_id": "acestep.LoadModel", + "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, + "loras": [], + "params": {} + }, + { + "pipeline_id": "acestep.VAEEncodeAudio", + "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, + "loras": [], + "params": {} + }, + { + "pipeline_id": "acestep.SemanticExtract", + "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, + "loras": [], + "params": {} + }, + { + "pipeline_id": "acestep.EncodeText", + "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, + "loras": [], + "params": {} + }, + { + "pipeline_id": "acestep.EncodeConditioning", + "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, + "loras": [], + 
"params": {} + }, + { + "pipeline_id": "acestep.OdeSolver", + "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, + "loras": [], + "params": {} + }, + { + "pipeline_id": "acestep.StreamDenoise", + "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, + "loras": [], + "params": {} + }, + { + "pipeline_id": "acestep.StreamVAEDecode", + "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, + "loras": [], + "params": {} + } + ], + "graph": { + "nodes": [ + { + "id": "audio_model", + "type": "node", + "node_type_id": "acestep.LoadModel", + "params": { + "config_path": "acestep-v15-turbo", + "device": "cuda", + "use_flash_attention": true, + "decoder_backend": "eager", + "vae_backend": "eager", + "project_root": "" + }, + "x": 50, + "y": 100, + "w": 360, + "h": 220 + }, + { + "id": "audio_source", + "type": "node", + "node_type_id": "audio.AudioSource", + "params": { + "file_id": "/tmp/demon_input.wav", + "duration": 120, + "mode": "full", + "pacing": "realtime" + }, + "x": 50, + "y": 500, + "w": 320, + "h": 200 + }, + { + "id": "audio_encode", + "type": "node", + "node_type_id": "acestep.VAEEncodeAudio", + "params": {}, + "x": 470, + "y": 380, + "w": 240, + "h": 140 + }, + { + "id": "audio_semantic", + "type": "node", + "node_type_id": "acestep.SemanticExtract", + "params": {}, + "x": 800, + "y": 100, + "w": 240, + "h": 140 + }, + { + "id": "audio_text", + "type": "node", + "node_type_id": "acestep.EncodeText", + "params": { + "tags": "dubstep heavy bass", + "lyrics": "", + "task_type": "cover", + "bpm": 134, + "duration": 120, + "key": "G minor", + "language": "en" + }, + "x": 800, + "y": 600, + "w": 280, + "h": 280 + }, + { + "id": "audio_cond", + "type": "node", + "node_type_id": "acestep.EncodeConditioning", + "params": {}, + "x": 1140, + "y": 420, + "w": 240, + "h": 160 + }, + { + "id": "audio_solver", + "type": "node", + "node_type_id": "acestep.OdeSolver", + "params": {}, + "x": 1140, 
+ "y": 200, + "w": 240, + "h": 120 + }, + { + "id": "audio_stream", + "type": "node", + "node_type_id": "acestep.StreamDenoise", + "params": { + "denoise": 0.6, + "seed": 42, + "shift": 3, + "pipeline_depth": 4, + "steps": 8, + "drain": false, + "duration": 120 + }, + "x": 1500, + "y": 380, + "w": 320, + "h": 360 + }, + { + "id": "audio_decode", + "type": "node", + "node_type_id": "acestep.StreamVAEDecode", + "params": { + "vae_window": 5, + "vae_overlap": 0.5, + "t_start": 0, + "follow_playhead": true, + "mse_skip_threshold": 0.001, + "prefetch_seconds": 1 + }, + "x": 1880, + "y": 380, + "w": 320, + "h": 280 + }, + { + "id": "output", + "type": "sink", + "x": 2260, + "y": 420, + "w": 240, + "h": 200 + } + ], + "edges": [ + {"from": "audio_model", "from_port": "vae", "to_node": "audio_encode", "to_port": "vae", "kind": "stream"}, + {"from": "audio_source", "from_port": "audio", "to_node": "audio_encode", "to_port": "audio", "kind": "stream"}, + {"from": "audio_model", "from_port": "model", "to_node": "audio_semantic", "to_port": "model", "kind": "stream"}, + {"from": "audio_encode", "from_port": "latent", "to_node": "audio_semantic", "to_port": "latent", "kind": "stream"}, + {"from": "audio_model", "from_port": "clip", "to_node": "audio_text", "to_port": "clip", "kind": "stream"}, + {"from": "audio_model", "from_port": "model", "to_node": "audio_cond", "to_port": "model", "kind": "stream"}, + {"from": "audio_text", "from_port": "text_embed", "to_node": "audio_cond", "to_port": "text_embed", "kind": "stream"}, + {"from": "audio_encode", "from_port": "latent", "to_node": "audio_cond", "to_port": "timbre_ref", "kind": "stream"}, + {"from": "audio_model", "from_port": "model", "to_node": "audio_stream", "to_port": "model", "kind": "stream"}, + {"from": "audio_solver", "from_port": "solver", "to_node": "audio_stream", "to_port": "solver", "kind": "stream"}, + {"from": "audio_cond", "from_port": "conditioning", "to_node": "audio_stream", "to_port": "positive", "kind": 
"stream"}, + {"from": "audio_encode", "from_port": "latent", "to_node": "audio_stream", "to_port": "source_latent", "kind": "stream"}, + {"from": "audio_semantic", "from_port": "latent", "to_node": "audio_stream", "to_port": "context_latent", "kind": "stream"}, + {"from": "audio_model", "from_port": "vae", "to_node": "audio_decode", "to_port": "vae", "kind": "stream"}, + {"from": "audio_stream", "from_port": "latent", "to_node": "audio_decode", "to_port": "latent", "kind": "stream"}, + {"from": "audio_decode", "from_port": "audio", "to_node": "output", "to_port": "audio", "kind": "stream"} + ], + "ui_state": { + "node_params": {} + } + } +} diff --git a/src/scope/core/nodes/__init__.py b/src/scope/core/nodes/__init__.py index 993af53b1..22fe9c951 100644 --- a/src/scope/core/nodes/__init__.py +++ b/src/scope/core/nodes/__init__.py @@ -11,16 +11,18 @@ """ from .base import BaseNode, Node, NodeDefinition, NodeParam, NodePort, Requirements -from .builtins import SchedulerNode +from .builtins import AudioSourceNode, SchedulerNode from .registry import NodeRegistry def register_builtin_nodes() -> None: """Register all built-in node types shipped with the foundation.""" NodeRegistry.register(SchedulerNode) + NodeRegistry.register(AudioSourceNode) __all__ = [ + "AudioSourceNode", "BaseNode", "Node", "NodeDefinition", diff --git a/src/scope/core/nodes/builtins/__init__.py b/src/scope/core/nodes/builtins/__init__.py index eb5d79632..eec3bf677 100644 --- a/src/scope/core/nodes/builtins/__init__.py +++ b/src/scope/core/nodes/builtins/__init__.py @@ -1,5 +1,6 @@ """Built-in nodes shipped with the foundation abstraction.""" +from .audio_io import AudioSourceNode from .scheduler import SchedulerNode -__all__ = ["SchedulerNode"] +__all__ = ["AudioSourceNode", "SchedulerNode"] diff --git a/src/scope/core/nodes/builtins/audio_io.py b/src/scope/core/nodes/builtins/audio_io.py new file mode 100644 index 000000000..0f976a518 --- /dev/null +++ b/src/scope/core/nodes/builtins/audio_io.py 
@@ -0,0 +1,304 @@ +"""Built-in audio I/O nodes: AudioSource (WAV file → audio stream). + +Terminal audio output is handled by the regular Sink node: audio edges +into a Sink are routed straight to the WebRTC audio track via the +session's audio_output_queue, with no intermediate node needed. +""" + +from __future__ import annotations + +import logging +import os +import struct +import time +from typing import Any, ClassVar + +import numpy as np +import torch + +from ..base import BaseNode, NodeDefinition, NodeParam, NodePort + +logger = logging.getLogger(__name__) + +SAMPLE_RATE = 48000 +CHUNK_DURATION = 0.1 # 100ms chunks for streaming +CHUNK_SAMPLES = int(SAMPLE_RATE * CHUNK_DURATION) + + +def _read_wav_float32(path: str) -> tuple[np.ndarray, int]: + """Parse a WAV file into float32 samples without the stdlib ``wave`` + module, which rejects IEEE-float (format 3) files. + + Returns (data, sample_rate) where ``data`` has shape (samples, channels). + Supports formats 1 (PCM int) and 3 (IEEE float) — the two common cases. + WAVE_FORMAT_EXTENSIBLE (0xFFFE) is unwrapped to its underlying format. 
+ """ + with open(path, "rb") as f: + header = f.read(12) + if len(header) < 12 or header[:4] != b"RIFF" or header[8:12] != b"WAVE": + raise ValueError(f"Not a WAV file: {path}") + + fmt_code: int | None = None + n_channels = 1 + sample_rate = 0 + bits_per_sample = 0 + pcm_bytes = b"" + + while True: + chunk_header = f.read(8) + if len(chunk_header) < 8: + break + chunk_id, chunk_size = struct.unpack("<4sI", chunk_header) + chunk_data = f.read(chunk_size) + if chunk_size % 2 == 1: + f.read(1) # RIFF pads odd-sized chunks + + if chunk_id == b"fmt " and len(chunk_data) >= 16: + ( + fmt_code, + n_channels, + sample_rate, + _byte_rate, + _block_align, + bits_per_sample, + ) = struct.unpack("= 26: + fmt_code = struct.unpack(" None: + # Per-session playback state must reset between sessions — + # PipelineManager caches node instances across Run/Stop cycles, so + # without this the next session would see the "already emitted" + # latch from the previous run and stay silent. Audio buffer is + # left intact to avoid re-decoding the file on every restart. 
+ self._position = 0 + self._last_call_time = None + self._full_emitted_key = None + + @classmethod + def get_definition(cls) -> NodeDefinition: + return NodeDefinition( + node_type_id=cls.node_type_id, + display_name="Audio Source", + category="audio", + description="Load audio from a WAV file at 48kHz stereo.", + continuous=True, + inputs=[], + outputs=[ + NodePort(name="audio", port_type="audio", description="Audio waveform"), + ], + params=[ + NodeParam( + name="file_id", + param_type="string", + default="", + description="Audio file path", + ), + NodeParam( + name="duration", + param_type="number", + default=15.0, + description="Duration (s)", + ui={"min": 1, "max": 600, "step": 1}, + ), + NodeParam( + name="mode", + param_type="select", + default="full", + description="Output mode", + ui={"options": ["full", "stream"]}, + ), + NodeParam( + name="pacing", + param_type="select", + default="realtime", + description="Pacing", + ui={"options": ["realtime", "downstream"]}, + ), + ], + ) + + def _load_audio(self, file_path: str, duration: float) -> None: + """Load, decode, resample to 48kHz stereo, and clip to duration.""" + data, sr = _read_wav_float32(file_path) # (samples, channels) + + if data.shape[1] == 1: + data = np.concatenate([data, data], axis=1) + elif data.shape[1] > 2: + data = data[:, :2] + data = data.T # (channels, samples) + + if sr != SAMPLE_RATE and sr > 0: + num_samples = data.shape[1] + new_len = int(num_samples * SAMPLE_RATE / sr) + old_indices = np.linspace(0, num_samples - 1, new_len) + resampled = np.zeros((data.shape[0], new_len), dtype=np.float32) + for ch in range(data.shape[0]): + resampled[ch] = np.interp(old_indices, np.arange(num_samples), data[ch]) + data = resampled + + max_samples = int(duration * SAMPLE_RATE) + if data.shape[1] > max_samples: + data = data[:, :max_samples] + + self._audio_data = data + self._position = 0 + self._loaded_file = file_path + logger.info( + "AudioSource loaded: %s (%.1fs)", + file_path, + 
data.shape[1] / SAMPLE_RATE, + ) + + def execute(self, inputs: dict[str, Any], **kwargs) -> dict[str, Any]: + file_id = kwargs.get("file_id", "") + duration = float(kwargs.get("duration", 15.0)) + # "full" = emit entire clip once (for batch DAGs); "stream" = 100ms chunks + mode = kwargs.get("mode", "stream") + pacing = kwargs.get("pacing", "realtime") + + if not file_id: + return {} + file_id = self._resolve_path(file_id) + if not file_id: + return {} + + if file_id != self._loaded_file: + try: + self._load_audio(file_id, duration) + except Exception as e: + logger.error("AudioSourceNode failed to load %s: %s", file_id, e) + return {} + + if self._audio_data is None or self._audio_data.shape[1] == 0: + return {} + + if mode == "full": + # Emit the entire clip once per (file, mode) pair and then + # stay silent. + key = (self._loaded_file, "full") + if self._full_emitted_key == key: + return {} + self._full_emitted_key = key + return self._emit_full() + # Stream mode: clear the full-mode flag so switching back to full + # later re-emits once. + self._full_emitted_key = None + return self._emit_chunk(pacing=pacing) + + @staticmethod + def _resolve_path(file_id: str) -> str | None: + """Resolve a file path. Absolute → cwd → ~/.daydream-scope/assets.""" + if os.path.isabs(file_id) and os.path.exists(file_id): + return file_id + if os.path.exists(file_id): + return os.path.abspath(file_id) + from pathlib import Path + + candidate = Path.home() / ".daydream-scope" / "assets" / file_id + if candidate.exists(): + return str(candidate) + logger.warning("AudioSource: file not found: %s", file_id) + return None + + def _emit_full(self) -> dict[str, Any]: + return {"audio": (torch.from_numpy(self._audio_data.copy()), SAMPLE_RATE)} + + def _emit_chunk(self, pacing: str = "realtime") -> dict[str, Any]: + # Pace to real-time unless downstream-paced — in that mode the + # maxsize=1 edge queues handle rate limiting via backpressure. 
+ if pacing != "downstream": + now = time.monotonic() + if self._last_call_time is not None: + elapsed = now - self._last_call_time + if elapsed < CHUNK_DURATION * 0.8: + time.sleep(CHUNK_DURATION - elapsed) + self._last_call_time = time.monotonic() + else: + self._last_call_time = None + + total = self._audio_data.shape[1] + if self._position >= total: + return {} + + chunk = np.zeros((self._audio_data.shape[0], CHUNK_SAMPLES), dtype=np.float32) + remaining = CHUNK_SAMPLES + offset = 0 + while remaining > 0: + avail = min(remaining, total - self._position) + if avail <= 0: + # Stop at end of clip; emit a partial chunk (remaining + # samples stay zero-padded). + break + chunk[:, offset : offset + avail] = self._audio_data[ + :, self._position : self._position + avail + ] + self._position += avail + offset += avail + remaining -= avail + return {"audio": (torch.from_numpy(chunk), SAMPLE_RATE)} diff --git a/src/scope/core/nodes/processor.py b/src/scope/core/nodes/processor.py index 68c970b10..839d4c1dd 100644 --- a/src/scope/core/nodes/processor.py +++ b/src/scope/core/nodes/processor.py @@ -5,6 +5,7 @@ """ import logging +import os import queue import threading from typing import Any @@ -15,14 +16,18 @@ SLEEP_TIME = 0.01 +# Set SCOPE_NODE_TRACE=1 for per-node diagnostics: param updates, +# route-outputs backpressure events. Off by default. +_TRACE = os.environ.get("SCOPE_NODE_TRACE", "").lower() in ("1", "true", "yes") + class NodeProcessor: """Runs a BaseNode in a dedicated thread. Input queues feed the node, output queues fan out its results to downstream nodes. Source nodes (no inputs) execute once by default; nodes marked - ``continuous=True`` in their definition re-execute on every tick so - streaming sources and sinks stay alive. + ``continuous=True`` in their definition re-execute on every tick, which + is how streaming sources (audio) and sinks (audio loop) stay alive. 
""" def __init__( @@ -43,10 +48,24 @@ def __init__( definition = node.get_definition() - # Consumed by FrameProcessor.get_audio_packet() on the sink feeder. - # Kept here (even without a routing implementation) so a NodeProcessor - # can stand in as the sink's feeder without crashing the audio path. - self.audio_output_queue: queue.Queue = queue.Queue(maxsize=10) + # Output ports wired to a sink node via graph_executor. Only these + # route through ``audio_output_queue`` → FrameProcessor.get_audio(). + # A node whose audio output feeds another node (e.g. AudioSource → + # VAEEncodeAudio) must NOT push to audio_output_queue: with + # maxsize=1 + blocking put, nothing would ever drain it and the + # worker would deadlock after the second emission. + self.audio_sink_ports: set[str] = set() + # Names of parameters this node declares. Global param updates + # (no node_id) are broadcast to every processor; this filter + # keeps a graph-level tweak from spuriously marking every custom + # node for re-execution. + self._declared_param_names: set[str] = {p.name for p in definition.params} + + # Audio output queue consumed by FrameProcessor.get_audio() on the + # sink. Size 1 + blocking producer (see _route_audio) gives us + # backpressure: the audio decoder stalls until AudioProcessingTrack + # has served enough of the current chunk to pull a new one. + self.audio_output_queue: queue.Queue = queue.Queue(maxsize=1) self.worker_thread: threading.Thread | None = None self.shutdown_event = threading.Event() @@ -56,6 +75,15 @@ def __init__( self._source_executed = False self._has_executed = False self._continuous = definition.continuous + # Cached inputs from the last successful run, replayed when + # parameters change or when a static upstream (one-shot model + # handle) never re-emits. + self._last_inputs: dict[str, Any] = {} + self._needs_rerun = False + # Fresh values drained from input queues while waiting for every + # port to catch up. 
Draining unblocks upstream producers without + # committing to a run yet. + self._pending_inputs: dict[str, Any] = {} # PipelineProcessor interface compatibility: graph_executor populates # this for every processor; kept as an empty dict so that write is safe. @@ -91,7 +119,21 @@ def stop(self) -> None: logger.info("NodeProcessor stopped: %s", self.node_id) def update_parameters(self, parameters: dict[str, Any]) -> None: + # Only mark the node dirty when the update actually touches a + # parameter this node declares AND the value differs from the + # current one. FrameProcessor.update_parameters broadcasts + # global updates (no node_id) to every processor, so without + # this guard a stream-level tweak would fire _needs_rerun on + # every custom node and cascade through the DAG. + changed = False + for key, value in parameters.items(): + if key not in self._declared_param_names: + continue + if self.parameters.get(key) != value: + changed = True self.parameters.update(parameters) + if changed: + self._needs_rerun = True def set_beat_cache_reset_rate(self, rate): # PipelineProcessor compat pass @@ -122,46 +164,80 @@ def _process_once(self) -> None: all_queues = dict(self.input_queues) is_source_node = not all_queues - - # Source nodes execute once; continuous=True nodes re-execute every - # tick (for streaming I/O). - if is_source_node and self._source_executed and not self._continuous: - self.shutdown_event.wait(1.0) - return - - # Gather inputs. Continuous nodes consume whatever's available - # (empty inputs stay absent). Non-continuous nodes wait until every - # input queue has data, so they execute with a complete input set. inputs: dict[str, Any] = {} - if all_queues: - if self._continuous: + + if is_source_node: + # Source nodes run once, then re-run only when params change + # (or every tick when continuous). 
+ if self._source_executed and not self._continuous and not self._needs_rerun: + self.shutdown_event.wait(1.0) + return + elif self._continuous: + # Continuous nodes: pick up whatever's currently in the queues + # and run every tick. Latched values from prior ticks are + # merged in so static upstreams (model/vae/clip handles) + # persist across ticks. + for port_name, q in all_queues.items(): + try: + inputs[port_name] = q.get_nowait() + except queue.Empty: + pass + # Wait until every port has been seen at least once. + if not self._has_executed and ( + set(all_queues.keys()) - inputs.keys() - self._last_inputs.keys() + ): + # Cache what arrived so we don't lose it while waiting. + self._last_inputs.update(inputs) + self.shutdown_event.wait(SLEEP_TIME) + return + # Merge with latched: fresh values override cache. + merged = {**self._last_inputs, **inputs} + inputs = merged + else: + # Non-continuous node with inputs: + # - First run waits until every port has received data at least + # once so the latch cache (_last_inputs) is populated. + # - Subsequent runs drain fresh values into _pending_inputs (to + # unblock upstream producers) and fire when at least one + # port has fresh data, OR when params change. + # - Static ports — upstreams that never re-emit, e.g. + # MODEL/VAE handles — are latched from _last_inputs. 
+ if not self._has_executed: + if any(q.empty() for q in all_queues.values()): + self.shutdown_event.wait(SLEEP_TIME) + return + inputs = {name: q.get_nowait() for name, q in all_queues.items()} + else: for port_name, q in all_queues.items(): try: - inputs[port_name] = q.get_nowait() + self._pending_inputs[port_name] = q.get_nowait() except queue.Empty: pass - else: - if any(q.empty() for q in all_queues.values()): + fire = self._needs_rerun or bool(self._pending_inputs) + if not fire: self.shutdown_event.wait(SLEEP_TIME) return - inputs = {name: q.get_nowait() for name, q in all_queues.items()} - - # Non-continuous nodes skip re-execution when no new inputs arrived - # and they already have a cached output. - if self._has_executed and not inputs and not self._continuous: - self.shutdown_event.wait(SLEEP_TIME) - return + inputs = {} + for port_name in all_queues: + if port_name in self._pending_inputs: + inputs[port_name] = self._pending_inputs[port_name] + elif port_name in self._last_inputs: + inputs[port_name] = self._last_inputs[port_name] + self._pending_inputs.clear() outputs = self.node.execute(inputs, **self.parameters) if is_source_node: self._source_executed = True + self._needs_rerun = False if not outputs: self.shutdown_event.wait(SLEEP_TIME) return self._has_executed = True + if inputs: + self._last_inputs.update(inputs) self._route_outputs(outputs) def _route_outputs(self, outputs: dict[str, Any]) -> None: @@ -169,6 +245,18 @@ def _route_outputs(self, outputs: dict[str, Any]) -> None: if value is None: continue + # Audio outputs also feed FrameProcessor's audio path — but + # only for ports that graph_executor wired to a sink. An + # intermediate audio-producing node (AudioSource → encoder) + # must NOT push to audio_output_queue: with maxsize=1 + + # blocking put, nothing drains it and the worker would + # deadlock after the second emission. 
+ if port_name in self.audio_sink_ports: + self._route_audio(value) + # When this audio output also routes to a sink, don't + # also fan it out as a generic stream queue: the sink + # only consumes via audio_output_queue. + # Fan out to all downstream queues on this port. Block briefly # when queues are full so producers throttle to consumer pace # and GPU tensors don't pile up in memory. @@ -181,3 +269,57 @@ def _route_outputs(self, outputs: dict[str, Any]) -> None: break except queue.Full: continue + + def _route_audio(self, value: Any) -> None: + """Extract audio tensor and push to audio_output_queue for WebRTC.""" + from fractions import Fraction + + import torch + + from scope.server.media_packets import AudioPacket, MediaTimestamp + + start_sample: int | None = None + if isinstance(value, tuple) and len(value) == 2: + audio_tensor, audio_sr = value + else: + audio_tensor = getattr(value, "waveform", None) + audio_sr = getattr(value, "sample_rate", 48000) + # ACEStep StreamVAEDecode tags each window with a ``start_sample`` + # offset (window t_start * sample_rate). Without this, overlapping + # windows produced by ``follow_playhead`` are appended back-to-back + # in AudioProcessingTrack and the overlapped region plays twice — + # heard as "parts repeated". Forward the offset as a PTS so the + # sink can detect and trim duplicates. + start_sample = getattr(value, "start_sample", None) + if audio_tensor is None: + return + if isinstance(audio_tensor, torch.Tensor): + if audio_tensor.is_cuda: + audio_tensor = audio_tensor.detach().cpu() + # VAE decoders (e.g. ACEStep) return (1, C, T); the audio + # track expects (C, T). Drop a leading singleton batch dim. + if audio_tensor.dim() == 3 and audio_tensor.shape[0] == 1: + audio_tensor = audio_tensor.squeeze(0) + # Upcast bfloat16/float16 to float32 before numpy conversion + # (AudioProcessingTrack calls .numpy() on the tensor). 
+ if audio_tensor.dtype in (torch.bfloat16, torch.float16): + audio_tensor = audio_tensor.float() + + timestamp = MediaTimestamp() + if start_sample is not None and audio_sr: + timestamp = MediaTimestamp( + pts=int(start_sample), time_base=Fraction(1, int(audio_sr)) + ) + packet = AudioPacket( + audio=audio_tensor, sample_rate=int(audio_sr), timestamp=timestamp + ) + # Blocking-with-retry put. Stalls the worker thread when the audio + # track hasn't finished serving the previous chunk — this is the + # backpressure that rate-limits batch generators to real-time + # playback instead of silently dropping audio. + while not self.shutdown_event.is_set(): + try: + self.audio_output_queue.put(packet, timeout=0.1) + break + except queue.Full: + continue diff --git a/src/scope/server/app.py b/src/scope/server/app.py index 7a6ec2f7c..567a67040 100644 --- a/src/scope/server/app.py +++ b/src/scope/server/app.py @@ -211,6 +211,11 @@ def _configure_logging(): logging.getLogger("scope.server").setLevel(logging.INFO) logging.getLogger("scope.core").setLevel(logging.INFO) + # Plugin-specific tracing knobs — opt-in via env var, off by default + # so the regular log stream stays clean for non-plugin sessions. + if os.getenv("ACESTEP_BRIDGE_TRACE", "").lower() in ("1", "true", "yes"): + logging.getLogger("scope_plugin").setLevel(logging.INFO) + # Set INFO level for uvicorn logging.getLogger("uvicorn.error").setLevel(logging.INFO) diff --git a/src/scope/server/audio_track.py b/src/scope/server/audio_track.py index 0568e477f..7e6e4632c 100644 --- a/src/scope/server/audio_track.py +++ b/src/scope/server/audio_track.py @@ -2,7 +2,9 @@ import collections import fractions import logging +import threading import time +import weakref import numpy as np from aiortc import MediaStreamTrack @@ -25,6 +27,27 @@ AUDIO_MAX_BUFFER_SAMPLES = AUDIO_CLOCK_RATE * 60 +# Registry of live audio tracks so graph nodes that need the playhead (e.g. 
+# DEMON's StreamVAEDecode skip gate, which mirrors the realtime demo's +# ``audio_eng.position / SAMPLE_RATE``) can query it without a hard +# dependency on FrameProcessor. Weak refs so closed tracks drop out. +_PLAYHEAD_LOCK = threading.Lock() +_PLAYHEAD_TRACKS: "weakref.WeakSet[AudioProcessingTrack]" = weakref.WeakSet() + + +def get_current_playhead_seconds() -> float | None: + """Return the playhead position of the first live audio track, in seconds. + + Returns None if no track is registered yet or none are live. Callers + should treat None as "skip gate disabled this tick". + """ + with _PLAYHEAD_LOCK: + for track in _PLAYHEAD_TRACKS: + if track.readyState == "live": + return track.playhead_seconds + return None + + class AudioProcessingTrack(MediaStreamTrack): """WebRTC audio track that streams generated audio from the pipeline. @@ -56,6 +79,24 @@ def __init__( self._start: float | None = None self._timestamp: int = 0 self._last_preserved_pts: int | None = None + # Per-channel sample index where the next contiguous chunk is + # expected to begin. Used to detect and trim overlap when a + # streaming decoder (e.g. ACEStep StreamVAEDecode with + # follow_playhead) emits windows that overlap in time. None + # means "no PTS reference yet" — the next valid PTS sets it. + self._next_expected_pts: int | None = None + + with _PLAYHEAD_LOCK: + _PLAYHEAD_TRACKS.add(self) + + @property + def playhead_seconds(self) -> float: + """Current playback position in seconds (monotonic recv timestamp). + + Mirrors DEMON's ``audio_eng.position / SAMPLE_RATE`` read. Value + is 0 before the first ``recv`` call. + """ + return self._timestamp / AUDIO_CLOCK_RATE @staticmethod def _resample_audio( @@ -224,8 +265,29 @@ def _drain_audio_packets(self) -> None: audio_packet.timestamp.time_base ) chunk_pts = int(round(media_ts * AUDIO_CLOCK_RATE)) + + # Overlap trim: when a streaming decoder emits windows whose + # start_sample retreats into already-buffered territory (e.g. 
+ # ACEStep StreamVAEDecode with vae_overlap > 0 + follow_playhead), + # drop the duplicate prefix so the listener hears each sample + # exactly once. Without this the overlap region plays twice. + if ( + chunk_pts is not None + and self._next_expected_pts is not None + and chunk_pts < self._next_expected_pts + ): + overlap_per_ch = self._next_expected_pts - chunk_pts + trim_samples = overlap_per_ch * self.channels + if trim_samples >= len(interleaved): + # Entire chunk lies in the past — drop it. + continue + interleaved = interleaved[trim_samples:] + chunk_pts = self._next_expected_pts + self._chunks.append((interleaved, chunk_pts)) self._buffered_samples += len(interleaved) + if chunk_pts is not None: + self._next_expected_pts = chunk_pts + len(interleaved) // self.channels def _trim_buffer(self) -> None: # Cap buffer to prevent unbounded growth. diff --git a/src/scope/server/graph_executor.py b/src/scope/server/graph_executor.py index b6430bf00..abf0540b4 100644 --- a/src/scope/server/graph_executor.py +++ b/src/scope/server/graph_executor.py @@ -252,6 +252,22 @@ def _attach_source_output_queue(source_node_id: str, q: queue.Queue) -> None: for e in graph.edges_to(sink_id): if e.kind == "stream": feeder_proc = node_processors.get(e.from_node) + # Audio edges to sinks are served via audio_output_queue, + # not dedicated sink queues — skip queue allocation so the + # feeder isn't blocked on a queue nobody drains. 
+ if e.from_port == "audio" or e.to_port == "audio": + if feeder_proc is not None: + sink_processors_by_node[sink_id] = feeder_proc + sink_ports = getattr(feeder_proc, "audio_sink_ports", None) + if sink_ports is not None: + sink_ports.add(e.from_port) + logger.info( + "Sink %s: audio routed from %s port '%s' via audio_output_queue", + sink_id, + e.from_node, + e.from_port, + ) + break sink_node = node_by_id[sink_id] sink_mode = sink_node.sink_mode # WebRTC preview reads sink_queues_by_node; NDI/Spout/Syphon threads diff --git a/src/scope/server/headless.py b/src/scope/server/headless.py index 3ffd357a0..451cb65a1 100644 --- a/src/scope/server/headless.py +++ b/src/scope/server/headless.py @@ -70,8 +70,9 @@ async def iter_chunks(self): class HeadlessTsStreamer(HeadlessMediaSink): """Streams headless output as MPEG-TS using PyAV.""" - def __init__(self, expect_audio: bool): + def __init__(self, expect_audio: bool, expect_video: bool = True): self._expect_audio = expect_audio + self._expect_video = expect_video self._buffer = _TsStreamBuffer() self._container = None self._video_stream = None @@ -86,12 +87,13 @@ def _init_container(self, width: int, height: int): import av self._container = av.open(self._buffer, "w", format="mpegts") - self._video_stream = self._container.add_stream( - "libx264", rate=int(RECORDING_MAX_FPS) - ) - self._video_stream.width = width + (width % 2) - self._video_stream.height = height + (height % 2) - self._video_stream.pix_fmt = "yuv420p" + if self._expect_video: + self._video_stream = self._container.add_stream( + "libx264", rate=int(RECORDING_MAX_FPS) + ) + self._video_stream.width = width + (width % 2) + self._video_stream.height = height + (height % 2) + self._video_stream.pix_fmt = "yuv420p" if self._expect_audio: self._audio_stream = self._container.add_stream( "aac", rate=AUDIO_CLOCK_RATE @@ -100,7 +102,7 @@ def _init_container(self, width: int, height: int): self._initialized = True def on_video_frame(self, video_frame) -> None: - 
if self._closed: + if self._closed or not self._expect_video: return import av @@ -135,7 +137,17 @@ def on_audio_chunk( import numpy as np with self._lock: - if self._closed or not self._initialized or self._audio_stream is None: + if self._closed: + return + if not self._initialized: + # Audio-only graphs (e.g. DEMON music covers) never deliver + # a video frame to bootstrap the container, so init from + # the first audio chunk. + if not self._expect_video: + self._init_container(0, 0) + else: + return + if self._audio_stream is None: return audio_np = audio_tensor.numpy() if audio_np.ndim == 1: @@ -330,11 +342,13 @@ def __init__( self, frame_processor: "FrameProcessor", expect_audio: bool = False, + expect_video: bool = True, ): from .frame_processor import FrameProcessor self.frame_processor: FrameProcessor = frame_processor self.expect_audio = expect_audio + self.expect_video = expect_video # In graph mode this tracks the most recently consumed frame across all # sink queues, not a canonical sink. Callers that need stable per-sink # capture should pass sink_node_id to get_last_frame(). @@ -452,7 +466,10 @@ def remove_media_sink(self, sink: HeadlessMediaSink) -> None: self._media_sinks.remove(sink) def create_ts_streamer(self) -> HeadlessTsStreamer: - streamer = HeadlessTsStreamer(expect_audio=self.expect_audio) + streamer = HeadlessTsStreamer( + expect_audio=self.expect_audio, + expect_video=self.expect_video, + ) self.add_media_sink(streamer) return streamer diff --git a/src/scope/server/mcp_router.py b/src/scope/server/mcp_router.py index e0286293e..8cdd7716e 100644 --- a/src/scope/server/mcp_router.py +++ b/src/scope/server/mcp_router.py @@ -40,6 +40,39 @@ def _get_pipeline_manager() -> "PipelineManager": return pipeline_manager +def _graph_has_video_to_sink(graph_config) -> bool: + """True when any video-typed edge feeds a sink — i.e. 
the graph is not audio-only.""" + sink_ids = set(graph_config.get_sink_node_ids()) + if not sink_ids: + return False + for e in graph_config.edges: + if e.kind != "stream" or e.to_node not in sink_ids: + continue + if e.from_port == "audio" or e.to_port == "audio": + continue + return True + return False + + +def _graph_produces_audio(graph_config) -> bool: + """True when any audio-typed output port in the graph feeds a sink. + + Covers plain-node graphs (DEMON, TTS) that don't have a config-driven + pipeline node declaring ``produces_audio=True``. Detection is purely + structural: an edge whose ``from_port`` / ``to_port`` is named + ``audio`` and whose target is a sink node. + """ + sink_ids = set(graph_config.get_sink_node_ids()) + if not sink_ids: + return False + for e in graph_config.edges: + if e.kind != "stream": + continue + if e.to_node in sink_ids and (e.from_port == "audio" or e.to_port == "audio"): + return True + return False + + # --------------------------------------------------------------------------- # Parameter Control # --------------------------------------------------------------------------- @@ -333,9 +366,24 @@ async def start_stream( detail="FrameProcessor failed to start (check logs for details)", ) + expect_audio = NodeRegistry.chain_produces_audio(pipeline_id_list) + expect_video = True + # Graph-only audio: when no config-driven pipeline declares audio + # but the graph itself carries an audio edge into a sink (e.g. a + # DEMON node graph), the session still expects audio. The same + # check also tells us whether the graph is audio-only, in which + # case the headless TS streamer must skip the video track. 
+ if request.graph is not None: + graph_has_audio_sink = _graph_produces_audio(graph_config) + graph_has_video_sink = _graph_has_video_to_sink(graph_config) + if graph_has_audio_sink: + expect_audio = True + if graph_has_audio_sink and not graph_has_video_sink: + expect_video = False session = HeadlessSession( frame_processor=frame_processor, - expect_audio=NodeRegistry.chain_produces_audio(pipeline_id_list), + expect_audio=expect_audio, + expect_video=expect_video, ) session.start_frame_consumer() webrtc_manager.add_headless_session(session) diff --git a/src/scope/server/webrtc.py b/src/scope/server/webrtc.py index 5e865fcbf..330ee0481 100644 --- a/src/scope/server/webrtc.py +++ b/src/scope/server/webrtc.py @@ -143,6 +143,40 @@ def _parse_graph_node_ids( ) +def _graph_sink_modalities(initial_parameters: dict) -> tuple[bool, bool] | None: + """Inspect graph sink edges; return (has_video, has_audio) or None. + + Returns ``None`` when no graph or no sinks are present (caller should + fall back to pipeline-id-derived modalities). Otherwise returns what + the graph actually emits to its sinks — authoritative over stale + ``pipeline_ids`` that may linger from a previous workflow. 
+ """ + graph_data = initial_parameters.get("graph") + if not isinstance(graph_data, dict): + return None + sink_ids = { + n.get("id") + for n in graph_data.get("nodes", []) or [] + if n.get("type") == "sink" + } + if not sink_ids: + return None + has_video = False + has_audio = False + for e in graph_data.get("edges", []) or []: + if e.get("kind", "stream") != "stream": + continue + if e.get("to_node") not in sink_ids: + continue + from_port = e.get("from_port") + to_port = e.get("to_port") + if from_port == "audio" or to_port == "audio": + has_audio = True + elif from_port == "video" or to_port == "video": + has_video = True + return (has_video, has_audio) + + def _compute_hardware_sink_routes( initial_parameters: dict, all_sink_node_ids: list[str], @@ -430,12 +464,17 @@ async def handle_offer( # Create NotificationSender for this session to send notifications to the frontend notification_sender = NotificationSender() - # Determine media modalities from the local pipeline registry - # (authoritative for local mode). initial_parameters values are not - # used here because they may be stale from a previous pipeline load. + # Determine media modalities. When a graph is present with at least + # one sink, the graph's sink edges are authoritative — pipeline_ids + # may be stale from a previous workflow load (e.g. lingering + # ``longlive`` while the current graph is audio-only DEMON). 
pipeline_ids = initial_parameters.get("pipeline_ids", []) - produces_video = NodeRegistry.chain_produces_video(pipeline_ids) - produces_audio = NodeRegistry.chain_produces_audio(pipeline_ids) + graph_modalities = _graph_sink_modalities(initial_parameters) + if graph_modalities is not None: + produces_video, produces_audio = graph_modalities + else: + produces_video = NodeRegistry.chain_produces_video(pipeline_ids) + produces_audio = NodeRegistry.chain_produces_audio(pipeline_ids) # Parse graph from initial parameters to find sink/source/record node IDs ( From c98754b81f03df26e61648a6f51cea2603e05ab9 Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Wed, 13 May 2026 14:59:58 +0000 Subject: [PATCH 2/9] refactor(audio): consolidate graph-modality detection, drop dead code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up cleanup of the DEMON integration: - Replace ``_graph_produces_audio`` / ``_graph_has_video_to_sink`` (mcp_router) and ``_graph_sink_modalities`` (webrtc) with a single ``GraphConfig.get_sink_modalities()`` method that returns ``(has_video, has_audio)``. Both call sites now go through the parsed schema instead of re-walking the raw dict. - Drop unused ``SCOPE_NODE_TRACE`` env flag + the bare ``import os`` from ``nodes/processor.py``. - Simplify ``AudioSourceNode``'s "fire once in full mode" latch from a ``(file, mode)`` tuple to a plain ``bool``. - Generalise the plugin trace knob in ``app.py`` to honour both the generic ``SCOPE_PLUGIN_TRACE`` and the legacy ``ACESTEP_BRIDGE_TRACE`` so existing DEMON instructions keep working. Intended to be behaviour-preserving for the DEMON workflow, with two nuances: ``get_sink_modalities()`` counts every non-audio stream edge into a sink as video (the removed webrtc helper only counted ports named ``video``), and the ``bool`` latch no longer re-emits in full mode when the loaded file changes — only after shutdown or a switch through stream mode. 
Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Rafal Leszko --- src/scope/core/nodes/builtins/audio_io.py | 18 ++++----- src/scope/core/nodes/processor.py | 5 --- src/scope/server/app.py | 11 ++++-- src/scope/server/graph_schema.py | 21 +++++++++++ src/scope/server/mcp_router.py | 44 +++------------------- src/scope/server/webrtc.py | 46 +++++------------------ 6 files changed, 52 insertions(+), 93 deletions(-) diff --git a/src/scope/core/nodes/builtins/audio_io.py b/src/scope/core/nodes/builtins/audio_io.py index 0f976a518..3a4ddf787 100644 --- a/src/scope/core/nodes/builtins/audio_io.py +++ b/src/scope/core/nodes/builtins/audio_io.py @@ -126,9 +126,9 @@ def __init__(self, node_id: str, config: dict[str, Any] | None = None): self._loaded_file: str = "" self._last_call_time: float | None = None # In mode=full we emit the entire clip once and then stay silent - # until the loaded file or mode changes. Otherwise every worker - # tick would re-push a multi-second clip and flood the graph. - self._full_emitted_key: tuple[str, str] | None = None + # until shutdown. Otherwise every worker tick would re-push a + # multi-second clip and flood the graph. + self._full_emitted = False def shutdown(self) -> None: # Per-session playback state must reset between sessions — @@ -138,7 +138,7 @@ def shutdown(self) -> None: # left intact to avoid re-decoding the file on every restart. self._position = 0 self._last_call_time = None - self._full_emitted_key = None + self._full_emitted = False @classmethod def get_definition(cls) -> NodeDefinition: @@ -239,16 +239,14 @@ def execute(self, inputs: dict[str, Any], **kwargs) -> dict[str, Any]: return {} if mode == "full": - # Emit the entire clip once per (file, mode) pair and then - # stay silent. - key = (self._loaded_file, "full") - if self._full_emitted_key == key: + # Emit the entire clip once and then stay silent. 
+ if self._full_emitted: return {} - self._full_emitted_key = key + self._full_emitted = True return self._emit_full() # Stream mode: clear the full-mode flag so switching back to full # later re-emits once. - self._full_emitted_key = None + self._full_emitted = False return self._emit_chunk(pacing=pacing) @staticmethod diff --git a/src/scope/core/nodes/processor.py b/src/scope/core/nodes/processor.py index 839d4c1dd..630846732 100644 --- a/src/scope/core/nodes/processor.py +++ b/src/scope/core/nodes/processor.py @@ -5,7 +5,6 @@ """ import logging -import os import queue import threading from typing import Any @@ -16,10 +15,6 @@ SLEEP_TIME = 0.01 -# Set SCOPE_NODE_TRACE=1 for per-node diagnostics: param updates, -# route-outputs backpressure events. Off by default. -_TRACE = os.environ.get("SCOPE_NODE_TRACE", "").lower() in ("1", "true", "yes") - class NodeProcessor: """Runs a BaseNode in a dedicated thread. Input queues feed the node, diff --git a/src/scope/server/app.py b/src/scope/server/app.py index 567a67040..41c592590 100644 --- a/src/scope/server/app.py +++ b/src/scope/server/app.py @@ -211,9 +211,14 @@ def _configure_logging(): logging.getLogger("scope.server").setLevel(logging.INFO) logging.getLogger("scope.core").setLevel(logging.INFO) - # Plugin-specific tracing knobs — opt-in via env var, off by default - # so the regular log stream stays clean for non-plugin sessions. - if os.getenv("ACESTEP_BRIDGE_TRACE", "").lower() in ("1", "true", "yes"): + # Plugin tracing — opt-in via env var, off by default so the regular + # log stream stays clean for non-plugin sessions. Honours both the + # generic ``SCOPE_PLUGIN_TRACE`` flag and any plugin-specific knob a + # plugin itself reads (e.g. DEMON's ``ACESTEP_BRIDGE_TRACE``) so a + # single env var enables both the plugin's verbose path and our log + # routing. 
+ plugin_trace_vars = ("SCOPE_PLUGIN_TRACE", "ACESTEP_BRIDGE_TRACE") + if any(os.getenv(v, "").lower() in ("1", "true", "yes") for v in plugin_trace_vars): logging.getLogger("scope_plugin").setLevel(logging.INFO) # Set INFO level for uvicorn diff --git a/src/scope/server/graph_schema.py b/src/scope/server/graph_schema.py index 6dbcb283e..767e3eec4 100644 --- a/src/scope/server/graph_schema.py +++ b/src/scope/server/graph_schema.py @@ -125,6 +125,27 @@ def get_record_node_ids(self) -> list[str]: """Return node ids that are record nodes.""" return [n.id for n in self.nodes if n.type == "record"] + def get_sink_modalities(self) -> tuple[bool, bool]: + """Return ``(has_video, has_audio)`` from stream edges into sinks. + + Authoritative for "what does this graph emit?" — used in place of + stale ``pipeline_ids`` declarations. Returns ``(False, False)`` when + the graph has no sinks. + """ + sink_ids = set(self.get_sink_node_ids()) + if not sink_ids: + return (False, False) + has_video = False + has_audio = False + for e in self.edges: + if e.kind != "stream" or e.to_node not in sink_ids: + continue + if e.from_port == "audio" or e.to_port == "audio": + has_audio = True + else: + has_video = True + return (has_video, has_audio) + def get_backend_node_ids(self) -> list[str]: """Return node ids that are backend (custom) nodes.""" return [n.id for n in self.nodes if n.type == "node"] diff --git a/src/scope/server/mcp_router.py b/src/scope/server/mcp_router.py index 8cdd7716e..2a426593f 100644 --- a/src/scope/server/mcp_router.py +++ b/src/scope/server/mcp_router.py @@ -40,39 +40,6 @@ def _get_pipeline_manager() -> "PipelineManager": return pipeline_manager -def _graph_has_video_to_sink(graph_config) -> bool: - """True when any video-typed edge feeds a sink — i.e. 
the graph is not audio-only.""" - sink_ids = set(graph_config.get_sink_node_ids()) - if not sink_ids: - return False - for e in graph_config.edges: - if e.kind != "stream" or e.to_node not in sink_ids: - continue - if e.from_port == "audio" or e.to_port == "audio": - continue - return True - return False - - -def _graph_produces_audio(graph_config) -> bool: - """True when any audio-typed output port in the graph feeds a sink. - - Covers plain-node graphs (DEMON, TTS) that don't have a config-driven - pipeline node declaring ``produces_audio=True``. Detection is purely - structural: an edge whose ``from_port`` / ``to_port`` is named - ``audio`` and whose target is a sink node. - """ - sink_ids = set(graph_config.get_sink_node_ids()) - if not sink_ids: - return False - for e in graph_config.edges: - if e.kind != "stream": - continue - if e.to_node in sink_ids and (e.from_port == "audio" or e.to_port == "audio"): - return True - return False - - # --------------------------------------------------------------------------- # Parameter Control # --------------------------------------------------------------------------- @@ -370,12 +337,13 @@ async def start_stream( expect_video = True # Graph-only audio: when no config-driven pipeline declares audio # but the graph itself carries an audio edge into a sink (e.g. a - # DEMON node graph), the session still expects audio. The same - # check also tells us whether the graph is audio-only, in which - # case the headless TS streamer must skip the video track. + # DEMON node graph), the session still expects audio. When the + # graph carries audio but no video into any sink, the headless TS + # streamer must skip the video track entirely. 
if request.graph is not None: - graph_has_audio_sink = _graph_produces_audio(graph_config) - graph_has_video_sink = _graph_has_video_to_sink(graph_config) + graph_has_video_sink, graph_has_audio_sink = ( + graph_config.get_sink_modalities() + ) if graph_has_audio_sink: expect_audio = True if graph_has_audio_sink and not graph_has_video_sink: diff --git a/src/scope/server/webrtc.py b/src/scope/server/webrtc.py index 330ee0481..49f2b4b02 100644 --- a/src/scope/server/webrtc.py +++ b/src/scope/server/webrtc.py @@ -28,6 +28,7 @@ from .cloud_track import CloudTrack from .credentials import get_turn_credentials from .frame_processor import FrameProcessor +from .graph_schema import GraphConfig from .headless import HeadlessSession from .kafka_publisher import publish_event from .livepeer import LivepeerConnection @@ -143,40 +144,6 @@ def _parse_graph_node_ids( ) -def _graph_sink_modalities(initial_parameters: dict) -> tuple[bool, bool] | None: - """Inspect graph sink edges; return (has_video, has_audio) or None. - - Returns ``None`` when no graph or no sinks are present (caller should - fall back to pipeline-id-derived modalities). Otherwise returns what - the graph actually emits to its sinks — authoritative over stale - ``pipeline_ids`` that may linger from a previous workflow. 
- """ - graph_data = initial_parameters.get("graph") - if not isinstance(graph_data, dict): - return None - sink_ids = { - n.get("id") - for n in graph_data.get("nodes", []) or [] - if n.get("type") == "sink" - } - if not sink_ids: - return None - has_video = False - has_audio = False - for e in graph_data.get("edges", []) or []: - if e.get("kind", "stream") != "stream": - continue - if e.get("to_node") not in sink_ids: - continue - from_port = e.get("from_port") - to_port = e.get("to_port") - if from_port == "audio" or to_port == "audio": - has_audio = True - elif from_port == "video" or to_port == "video": - has_video = True - return (has_video, has_audio) - - def _compute_hardware_sink_routes( initial_parameters: dict, all_sink_node_ids: list[str], @@ -469,9 +436,14 @@ async def handle_offer( # may be stale from a previous workflow load (e.g. lingering # ``longlive`` while the current graph is audio-only DEMON). pipeline_ids = initial_parameters.get("pipeline_ids", []) - graph_modalities = _graph_sink_modalities(initial_parameters) - if graph_modalities is not None: - produces_video, produces_audio = graph_modalities + graph_data = initial_parameters.get("graph") + graph_config = ( + GraphConfig.model_validate(graph_data) + if isinstance(graph_data, dict) + else None + ) + if graph_config is not None and graph_config.get_sink_node_ids(): + produces_video, produces_audio = graph_config.get_sink_modalities() else: produces_video = NodeRegistry.chain_produces_video(pipeline_ids) produces_audio = NodeRegistry.chain_produces_audio(pipeline_ids) From 66150123c03a8006dcfe6682d5ace914e3353b69 Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Wed, 13 May 2026 15:10:13 +0000 Subject: [PATCH 3/9] refactor(logging): drop plugin-specific env knob from scope logging config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A plugin should surface its own diagnostics — having Scope hardcode env var names like ``SCOPE_PLUGIN_TRACE`` / 
``ACESTEP_BRIDGE_TRACE`` and lift the ``scope_plugin`` logger to INFO based on them puts plugin-specific concerns into core. Each plugin can set its own logger level inside its module init (e.g. DEMON's ``scope_plugin.bridge``) when its own trace flag is set. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Rafal Leszko --- src/scope/server/app.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/scope/server/app.py b/src/scope/server/app.py index 41c592590..7a6ec2f7c 100644 --- a/src/scope/server/app.py +++ b/src/scope/server/app.py @@ -211,16 +211,6 @@ def _configure_logging(): logging.getLogger("scope.server").setLevel(logging.INFO) logging.getLogger("scope.core").setLevel(logging.INFO) - # Plugin tracing — opt-in via env var, off by default so the regular - # log stream stays clean for non-plugin sessions. Honours both the - # generic ``SCOPE_PLUGIN_TRACE`` flag and any plugin-specific knob a - # plugin itself reads (e.g. DEMON's ``ACESTEP_BRIDGE_TRACE``) so a - # single env var enables both the plugin's verbose path and our log - # routing. - plugin_trace_vars = ("SCOPE_PLUGIN_TRACE", "ACESTEP_BRIDGE_TRACE") - if any(os.getenv(v, "").lower() in ("1", "true", "yes") for v in plugin_trace_vars): - logging.getLogger("scope_plugin").setLevel(logging.INFO) - # Set INFO level for uvicorn logging.getLogger("uvicorn.error").setLevel(logging.INFO) From 1858821671198413976e5b3523af4b0eda3a30b9 Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Mon, 18 May 2026 12:33:28 +0000 Subject: [PATCH 4/9] refactor(audio): simplify AudioSource to one-shot emit, add tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Strip ``AudioSourceNode`` to the only path the DEMON workflow actually uses: load a WAV, emit the whole clip, done. Removes a pile of dead streaming-mode machinery that was speculative. Source changes: - Switch ``continuous=True`` → ``continuous=False``. 
``NodeProcessor`` now re-runs the node only when a parameter changes, which is the right semantic for "load file, emit clip" — and incidentally fixes the ~100/sec ``AudioSource: file not found`` log flood that ``continuous`` was producing on a missing ``file_id``. - Drop ``_emit_chunk`` (chunked streaming) and ``_emit_full`` (inlined). - Drop the ``mode`` and ``pacing`` ``NodeParam``s; nothing reads them. - Drop ``_position``, ``_last_call_time``, ``_full_emitted`` fields and the ``shutdown()`` reset that only existed to clear them. - Drop the ``CHUNK_DURATION`` / ``CHUNK_SAMPLES`` constants and the ``time`` import. - Drop the defensive ``.copy()`` on the emitted buffer — with one-shot emission there's no aliasing risk. - Collapse ``_resolve_path``'s "is absolute and exists" + "exists" branches into one. - Keep: ``duration`` in the cache key (real bug fix), WAV decoder (PCM 8/16/24/32 + IEEE float 32/64 + ``WAVE_FORMAT_EXTENSIBLE``), resample, mono→stereo, asset-dir fallback. Other follow-ups on the DEMON audio wiring: - Rewrite the ``_route_outputs`` comment so it describes what the code actually does (audio sinks fan out via ``audio_output_queue``; non-sink consumers still go through ``output_queues``). - Collapse the ``_route_audio`` timestamp construction to a single conditional expression. Tests: - ``test_graph_sink_modalities.py`` — 6 cases covering ``GraphConfig.get_sink_modalities()``. - ``test_audio_source_node.py`` — WAV decoder (PCM16, IEEE-float32, malformed input) and ``AudioSourceNode`` (full-clip emission, missing file is silent, duration cache key). - ``TestOverlapTrim`` in ``test_audio_processing_track.py`` — three cases for ACEStep ``StreamVAEDecode`` overlap trimming. Net: ``audio_io.py`` shrinks from 307 → 222 lines. 
Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Rafal Leszko --- src/scope/core/nodes/builtins/audio_io.py | 120 +++--------------- src/scope/core/nodes/processor.py | 36 +++--- tests/test_audio_processing_track.py | 80 ++++++++++++ tests/test_audio_source_node.py | 127 +++++++++++++++++++ tests/test_graph_sink_modalities.py | 142 ++++++++++++++++++++++ 5 files changed, 388 insertions(+), 117 deletions(-) create mode 100644 tests/test_audio_source_node.py create mode 100644 tests/test_graph_sink_modalities.py diff --git a/src/scope/core/nodes/builtins/audio_io.py b/src/scope/core/nodes/builtins/audio_io.py index 3a4ddf787..4a4e4da84 100644 --- a/src/scope/core/nodes/builtins/audio_io.py +++ b/src/scope/core/nodes/builtins/audio_io.py @@ -1,4 +1,4 @@ -"""Built-in audio I/O nodes: AudioSource (WAV file → audio stream). +"""Built-in audio I/O nodes: AudioSource (load a WAV file once). Terminal audio output is handled by the regular Sink node: audio edges into a Sink are routed straight to the WebRTC audio track via the @@ -10,7 +10,7 @@ import logging import os import struct -import time +from pathlib import Path from typing import Any, ClassVar import numpy as np @@ -21,8 +21,6 @@ logger = logging.getLogger(__name__) SAMPLE_RATE = 48000 -CHUNK_DURATION = 0.1 # 100ms chunks for streaming -CHUNK_SAMPLES = int(SAMPLE_RATE * CHUNK_DURATION) def _read_wav_float32(path: str) -> tuple[np.ndarray, int]: @@ -115,30 +113,15 @@ def _read_wav_float32(path: str) -> tuple[np.ndarray, int]: class AudioSourceNode(BaseNode): - """Load audio from a WAV file and stream it in 100ms chunks.""" + """Load an audio file and emit the entire clip once per (file_id, duration).""" node_type_id: ClassVar[str] = "audio.AudioSource" def __init__(self, node_id: str, config: dict[str, Any] | None = None): super().__init__(node_id, config) self._audio_data: np.ndarray | None = None - self._position = 0 self._loaded_file: str = "" - self._last_call_time: float | None = None - # In 
mode=full we emit the entire clip once and then stay silent - # until shutdown. Otherwise every worker tick would re-push a - # multi-second clip and flood the graph. - self._full_emitted = False - - def shutdown(self) -> None: - # Per-session playback state must reset between sessions — - # PipelineManager caches node instances across Run/Stop cycles, so - # without this the next session would see the "already emitted" - # latch from the previous run and stay silent. Audio buffer is - # left intact to avoid re-decoding the file on every restart. - self._position = 0 - self._last_call_time = None - self._full_emitted = False + self._loaded_duration: float = 0.0 @classmethod def get_definition(cls) -> NodeDefinition: @@ -147,7 +130,11 @@ def get_definition(cls) -> NodeDefinition: display_name="Audio Source", category="audio", description="Load audio from a WAV file at 48kHz stereo.", - continuous=True, + # continuous=False so NodeProcessor only re-runs us when a + # parameter actually changes; otherwise the worker would call + # execute() every tick and either flood the graph (on success) + # or flood the log (on missing-file). 
+ continuous=False, inputs=[], outputs=[ NodePort(name="audio", port_type="audio", description="Audio waveform"), @@ -166,20 +153,6 @@ def get_definition(cls) -> NodeDefinition: description="Duration (s)", ui={"min": 1, "max": 600, "step": 1}, ), - NodeParam( - name="mode", - param_type="select", - default="full", - description="Output mode", - ui={"options": ["full", "stream"]}, - ), - NodeParam( - name="pacing", - param_type="select", - default="realtime", - description="Pacing", - ui={"options": ["realtime", "downstream"]}, - ), ], ) @@ -207,8 +180,8 @@ def _load_audio(self, file_path: str, duration: float) -> None: data = data[:, :max_samples] self._audio_data = data - self._position = 0 self._loaded_file = file_path + self._loaded_duration = duration logger.info( "AudioSource loaded: %s (%.1fs)", file_path, @@ -217,86 +190,33 @@ def _load_audio(self, file_path: str, duration: float) -> None: def execute(self, inputs: dict[str, Any], **kwargs) -> dict[str, Any]: file_id = kwargs.get("file_id", "") - duration = float(kwargs.get("duration", 15.0)) - # "full" = emit entire clip once (for batch DAGs); "stream" = 100ms chunks - mode = kwargs.get("mode", "stream") - pacing = kwargs.get("pacing", "realtime") - if not file_id: return {} - file_id = self._resolve_path(file_id) - if not file_id: + resolved = self._resolve_path(file_id) + if not resolved: return {} - if file_id != self._loaded_file: + duration = float(kwargs.get("duration", 15.0)) + # Cache key includes duration: a duration change must re-trim + # (or re-decode if duration grows past the current clip). 
+ if resolved != self._loaded_file or duration != self._loaded_duration: try: - self._load_audio(file_id, duration) + self._load_audio(resolved, duration) except Exception as e: - logger.error("AudioSourceNode failed to load %s: %s", file_id, e) + logger.error("AudioSourceNode failed to load %s: %s", resolved, e) return {} if self._audio_data is None or self._audio_data.shape[1] == 0: return {} - - if mode == "full": - # Emit the entire clip once and then stay silent. - if self._full_emitted: - return {} - self._full_emitted = True - return self._emit_full() - # Stream mode: clear the full-mode flag so switching back to full - # later re-emits once. - self._full_emitted = False - return self._emit_chunk(pacing=pacing) + return {"audio": (torch.from_numpy(self._audio_data), SAMPLE_RATE)} @staticmethod def _resolve_path(file_id: str) -> str | None: - """Resolve a file path. Absolute → cwd → ~/.daydream-scope/assets.""" - if os.path.isabs(file_id) and os.path.exists(file_id): - return file_id + """Resolve a file path; falls back to ``~/.daydream-scope/assets``.""" if os.path.exists(file_id): return os.path.abspath(file_id) - from pathlib import Path - candidate = Path.home() / ".daydream-scope" / "assets" / file_id if candidate.exists(): return str(candidate) logger.warning("AudioSource: file not found: %s", file_id) return None - - def _emit_full(self) -> dict[str, Any]: - return {"audio": (torch.from_numpy(self._audio_data.copy()), SAMPLE_RATE)} - - def _emit_chunk(self, pacing: str = "realtime") -> dict[str, Any]: - # Pace to real-time unless downstream-paced — in that mode the - # maxsize=1 edge queues handle rate limiting via backpressure. 
- if pacing != "downstream": - now = time.monotonic() - if self._last_call_time is not None: - elapsed = now - self._last_call_time - if elapsed < CHUNK_DURATION * 0.8: - time.sleep(CHUNK_DURATION - elapsed) - self._last_call_time = time.monotonic() - else: - self._last_call_time = None - - total = self._audio_data.shape[1] - if self._position >= total: - return {} - - chunk = np.zeros((self._audio_data.shape[0], CHUNK_SAMPLES), dtype=np.float32) - remaining = CHUNK_SAMPLES - offset = 0 - while remaining > 0: - avail = min(remaining, total - self._position) - if avail <= 0: - # Stop at end of clip; emit a partial chunk (remaining - # samples stay zero-padded). - break - chunk[:, offset : offset + avail] = self._audio_data[ - :, self._position : self._position + avail - ] - self._position += avail - offset += avail - remaining -= avail - return {"audio": (torch.from_numpy(chunk), SAMPLE_RATE)} diff --git a/src/scope/core/nodes/processor.py b/src/scope/core/nodes/processor.py index 630846732..a895a3c90 100644 --- a/src/scope/core/nodes/processor.py +++ b/src/scope/core/nodes/processor.py @@ -240,21 +240,20 @@ def _route_outputs(self, outputs: dict[str, Any]) -> None: if value is None: continue - # Audio outputs also feed FrameProcessor's audio path — but - # only for ports that graph_executor wired to a sink. An - # intermediate audio-producing node (AudioSource → encoder) - # must NOT push to audio_output_queue: with maxsize=1 + - # blocking put, nothing drains it and the worker would - # deadlock after the second emission. + # Audio outputs only push to audio_output_queue for ports + # graph_executor wired to a sink — an intermediate audio port + # (e.g. AudioSource → VAEEncodeAudio) has no drainer, and the + # maxsize=1 blocking put would deadlock after the second emit. 
+ # The fan-out below still runs so non-sink consumers receive + # the value via the normal output_queues path; sink consumers + # are not registered in output_queues, so the fan-out is a + # no-op for them. if port_name in self.audio_sink_ports: self._route_audio(value) - # When this audio output also routes to a sink, don't - # also fan it out as a generic stream queue: the sink - # only consumes via audio_output_queue. - # Fan out to all downstream queues on this port. Block briefly - # when queues are full so producers throttle to consumer pace - # and GPU tensors don't pile up in memory. + # Fan out to downstream node queues. Block briefly when a queue + # is full so producers throttle to consumer pace and GPU tensors + # don't accumulate in memory. out_queues = self.output_queues.get(port_name) if out_queues: for oq in out_queues: @@ -267,6 +266,9 @@ def _route_outputs(self, outputs: dict[str, Any]) -> None: def _route_audio(self, value: Any) -> None: """Extract audio tensor and push to audio_output_queue for WebRTC.""" + # Lazy imports: pulling torch/MediaTimestamp at module scope would + # also pull ``scope.server.media_packets`` into ``scope.core``, + # which the project layout disallows. 
from fractions import Fraction import torch @@ -300,11 +302,11 @@ def _route_audio(self, value: Any) -> None: if audio_tensor.dtype in (torch.bfloat16, torch.float16): audio_tensor = audio_tensor.float() - timestamp = MediaTimestamp() - if start_sample is not None and audio_sr: - timestamp = MediaTimestamp( - pts=int(start_sample), time_base=Fraction(1, int(audio_sr)) - ) + timestamp = ( + MediaTimestamp(pts=int(start_sample), time_base=Fraction(1, int(audio_sr))) + if start_sample is not None and audio_sr + else MediaTimestamp() + ) packet = AudioPacket( audio=audio_tensor, sample_rate=int(audio_sr), timestamp=timestamp ) diff --git a/tests/test_audio_processing_track.py b/tests/test_audio_processing_track.py index 9e6b14ae2..ccea1471a 100644 --- a/tests/test_audio_processing_track.py +++ b/tests/test_audio_processing_track.py @@ -362,3 +362,83 @@ def test_caps_buffer_at_max(self): # After cap-trimming and consuming one 20ms frame, must be under the cap assert track._buffered_samples <= max_interleaved + + +# --------------------------------------------------------------------------- +# Overlap-trim — streaming VAE decoders (ACEStep StreamVAEDecode + +# follow_playhead) re-emit windows whose start_sample retreats into already +# buffered territory; the track must drop the duplicate prefix. +# --------------------------------------------------------------------------- + + +class TestOverlapTrim: + def _packet(self, start_sample: int, n_samples: int) -> AudioPacket: + return AudioPacket( + audio=torch.ones(2, n_samples), + sample_rate=48000, + timestamp=MediaTimestamp( + pts=start_sample, time_base=fractions.Fraction(1, 48000) + ), + ) + + def test_trims_partial_overlap(self): + """Second window starts 100 samples before the first one ended; + those 100 samples (× 2 channels) are dropped from the front.""" + track = _make_track() + first = self._packet(start_sample=0, n_samples=500) + # Next chunk overlaps last 100 samples of `first`. 
+ second = self._packet(start_sample=400, n_samples=500) + + chunks = iter([first, second, None]) + track.frame_processor.get_audio_packet = MagicMock( + side_effect=lambda: next(chunks, None) + ) + track._drain_audio_packets() + + # First chunk: 500 × 2 = 1000 samples, pts 0. + # Second chunk: trimmed by 100 samples × 2 channels = 200 → 800 samples, + # pts advances to 500 (the next expected sample after the first chunk). + assert len(track._chunks) == 2 + first_buf, first_pts = track._chunks[0] + second_buf, second_pts = track._chunks[1] + assert len(first_buf) == 1000 + assert first_pts == 0 + assert len(second_buf) == 800 + assert second_pts == 500 + assert track._buffered_samples == 1800 + + def test_drops_chunk_entirely_in_past(self): + """Window whose end lies before the current expected PTS is dropped.""" + track = _make_track() + first = self._packet(start_sample=0, n_samples=500) + # Entirely behind the playhead: start=0, end=100 < expected 500. + stale = self._packet(start_sample=0, n_samples=100) + + chunks = iter([first, stale, None]) + track.frame_processor.get_audio_packet = MagicMock( + side_effect=lambda: next(chunks, None) + ) + track._drain_audio_packets() + + assert len(track._chunks) == 1 + assert track._chunks[0][1] == 0 + # _next_expected_pts is still the end of the first chunk: 500. 
+ assert track._next_expected_pts == 500 + + def test_contiguous_chunks_not_trimmed(self): + """Back-to-back windows (no overlap) flow through untouched.""" + track = _make_track() + first = self._packet(start_sample=0, n_samples=500) + second = self._packet(start_sample=500, n_samples=500) + + chunks = iter([first, second, None]) + track.frame_processor.get_audio_packet = MagicMock( + side_effect=lambda: next(chunks, None) + ) + track._drain_audio_packets() + + assert len(track._chunks) == 2 + assert len(track._chunks[0][0]) == 1000 + assert len(track._chunks[1][0]) == 1000 + assert track._chunks[1][1] == 500 + assert track._next_expected_pts == 1000 diff --git a/tests/test_audio_source_node.py b/tests/test_audio_source_node.py new file mode 100644 index 000000000..c113aa757 --- /dev/null +++ b/tests/test_audio_source_node.py @@ -0,0 +1,127 @@ +"""Tests for the built-in AudioSource node and its WAV decoder.""" + +from __future__ import annotations + +import struct +import wave +from pathlib import Path + +import numpy as np +import pytest + +from scope.core.nodes.builtins.audio_io import ( + SAMPLE_RATE, + AudioSourceNode, + _read_wav_float32, +) + + +def _write_pcm16(path: Path, samples: np.ndarray, sample_rate: int) -> None: + """Write float samples to a 16-bit PCM WAV via stdlib ``wave``.""" + pcm = np.clip(samples, -1.0, 1.0) + pcm = (pcm * 32767.0).astype(" None: + """Hand-roll an IEEE-float32 WAV (stdlib ``wave`` rejects format 3).""" + if samples.ndim == 1: + samples = samples[:, None] + n_channels = samples.shape[1] + data = samples.astype(" None: + sr = 16000 + samples = np.random.uniform(-0.5, 0.5, (sr, 2)).astype(np.float32) + wav = tmp_path / "pcm.wav" + _write_pcm16(wav, samples, sr) + + data, decoded_sr = _read_wav_float32(str(wav)) + assert decoded_sr == sr + assert data.shape == (sr, 2) + # 16-bit quantisation noise at most ~1/32767. 
+ assert np.max(np.abs(data - samples)) < 1.5 / 32767 + + +def test_read_wav_float32_mono(tmp_path: Path) -> None: + sr = 22050 + samples = np.random.uniform(-0.9, 0.9, sr).astype(np.float32) + wav = tmp_path / "float.wav" + _write_float32(wav, samples, sr) + + data, decoded_sr = _read_wav_float32(str(wav)) + assert decoded_sr == sr + assert data.shape == (sr, 1) + np.testing.assert_allclose(data[:, 0], samples, atol=1e-6) + + +def test_read_wav_rejects_non_wav(tmp_path: Path) -> None: + bad = tmp_path / "bad.wav" + bad.write_bytes(b"NOT A WAV FILE") + with pytest.raises(ValueError): + _read_wav_float32(str(bad)) + + +def test_audio_source_emits_full_clip(tmp_path: Path) -> None: + """``execute`` returns the entire decoded clip as a (channels, samples) tensor.""" + sr = SAMPLE_RATE + duration_s = 1.0 + samples = np.random.uniform(-0.3, 0.3, (int(sr * duration_s), 2)).astype(np.float32) + wav = tmp_path / "clip.wav" + _write_pcm16(wav, samples, sr) + + node = AudioSourceNode(node_id="audio_src") + out = node.execute({}, file_id=str(wav), duration=duration_s) + + assert "audio" in out + tensor, out_sr = out["audio"] + assert out_sr == SAMPLE_RATE + assert tensor.shape == (2, int(sr * duration_s)) + + +def test_audio_source_missing_file_is_silent(tmp_path: Path) -> None: + """Missing ``file_id`` returns ``{}`` instead of raising.""" + node = AudioSourceNode(node_id="audio_src") + assert node.execute({}) == {} + assert node.execute({}, file_id=str(tmp_path / "does_not_exist.wav")) == {} + + +def test_audio_source_duration_change_retrims(tmp_path: Path) -> None: + """Changing only the duration must re-trim — the cache key includes it.""" + sr = SAMPLE_RATE + samples = np.random.uniform(-0.3, 0.3, (sr * 4, 2)).astype(np.float32) + wav = tmp_path / "clip.wav" + _write_pcm16(wav, samples, sr) + + node = AudioSourceNode(node_id="audio_src") + out_long = node.execute({}, file_id=str(wav), duration=4.0) + assert out_long["audio"][0].shape[1] == 4 * sr + + out_short = 
node.execute({}, file_id=str(wav), duration=1.0) + assert out_short["audio"][0].shape[1] == 1 * sr diff --git a/tests/test_graph_sink_modalities.py b/tests/test_graph_sink_modalities.py new file mode 100644 index 000000000..25eceee21 --- /dev/null +++ b/tests/test_graph_sink_modalities.py @@ -0,0 +1,142 @@ +"""Tests for ``GraphConfig.get_sink_modalities`` — the consolidated check +used by webrtc/mcp_router to decide what tracks a graph emits. +""" + +from __future__ import annotations + +from scope.server.graph_schema import GraphConfig + + +def _build(nodes: list[dict], edges: list[dict]) -> GraphConfig: + return GraphConfig.model_validate({"nodes": nodes, "edges": edges}) + + +def test_video_only_graph() -> None: + g = _build( + nodes=[ + {"id": "in", "type": "source"}, + {"id": "p", "type": "pipeline", "pipeline_id": "passthrough"}, + {"id": "out", "type": "sink"}, + ], + edges=[ + { + "from": "in", + "from_port": "video", + "to_node": "p", + "to_port": "video", + "kind": "stream", + }, + { + "from": "p", + "from_port": "video", + "to_node": "out", + "to_port": "video", + "kind": "stream", + }, + ], + ) + assert g.get_sink_modalities() == (True, False) + + +def test_audio_only_graph() -> None: + g = _build( + nodes=[ + {"id": "src", "type": "source"}, + {"id": "out", "type": "sink"}, + ], + edges=[ + { + "from": "src", + "from_port": "audio", + "to_node": "out", + "to_port": "audio", + "kind": "stream", + } + ], + ) + assert g.get_sink_modalities() == (False, True) + + +def test_mixed_video_and_audio_sinks() -> None: + g = _build( + nodes=[ + {"id": "src", "type": "source"}, + {"id": "video_out", "type": "sink"}, + {"id": "audio_out", "type": "sink"}, + ], + edges=[ + { + "from": "src", + "from_port": "video", + "to_node": "video_out", + "to_port": "video", + "kind": "stream", + }, + { + "from": "src", + "from_port": "audio", + "to_node": "audio_out", + "to_port": "audio", + "kind": "stream", + }, + ], + ) + assert g.get_sink_modalities() == (True, True) + + 
+def test_no_sink_returns_false_false() -> None: + g = _build( + nodes=[{"id": "src", "type": "source"}], + edges=[], + ) + assert g.get_sink_modalities() == (False, False) + + +def test_non_stream_edges_ignored() -> None: + # Only "stream" edges count toward sink modalities; "parameter" edges + # carry control data and shouldn't promote a sink to video/audio. + g = _build( + nodes=[ + {"id": "src", "type": "source"}, + {"id": "out", "type": "sink"}, + ], + edges=[ + { + "from": "src", + "from_port": "video", + "to_node": "out", + "to_port": "video", + "kind": "parameter", + } + ], + ) + assert g.get_sink_modalities() == (False, False) + + +def test_edges_not_targeting_sink_ignored() -> None: + # An audio edge between two non-sink nodes must not mark the graph as + # audio-producing — only edges into a sink count. + g = _build( + nodes=[ + {"id": "src", "type": "source"}, + {"id": "encoder", "type": "node", "node_type_id": "audio.Encode"}, + {"id": "out", "type": "sink"}, + ], + edges=[ + { + "from": "src", + "from_port": "audio", + "to_node": "encoder", + "to_port": "audio", + "kind": "stream", + }, + { + "from": "encoder", + "from_port": "video", + "to_node": "out", + "to_port": "video", + "kind": "stream", + }, + ], + ) + assert g.get_sink_modalities() == (True, False) From 6134bad690645c3d7ee931f321a85bbe3234ba97 Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Mon, 18 May 2026 13:30:06 +0000 Subject: [PATCH 5/9] refactor(processor): shrink NodeProcessor diff vs main Trim the changes against ``main`` without altering behavior: - Drop the ``_pending_inputs`` field; drain straight into ``_last_inputs`` and use a local ``fresh`` dict per tick to gate the "skip when nothing changed" check. - Collapse the three-branch ``_process_once`` (source / continuous / non-continuous) into one drain-cache-fire flow. - Revert the class docstring tweak. 
- Trim multi-paragraph comments around ``audio_sink_ports``, ``audio_output_queue``, ``_last_inputs``, ``update_parameters``, and ``_route_outputs`` to one-liners where the code speaks for itself. - Rewrite ``update_parameters`` change-detection as a single ``any(...)``. Net diff on ``processor.py`` vs ``main`` drops from +170/-31 to +105/-22. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Rafal Leszko --- src/scope/core/nodes/processor.py | 178 ++++++++++-------------------- 1 file changed, 60 insertions(+), 118 deletions(-) diff --git a/src/scope/core/nodes/processor.py b/src/scope/core/nodes/processor.py index a895a3c90..facb84368 100644 --- a/src/scope/core/nodes/processor.py +++ b/src/scope/core/nodes/processor.py @@ -21,8 +21,8 @@ class NodeProcessor: output queues fan out its results to downstream nodes. Source nodes (no inputs) execute once by default; nodes marked - ``continuous=True`` in their definition re-execute on every tick, which - is how streaming sources (audio) and sinks (audio loop) stay alive. + ``continuous=True`` in their definition re-execute on every tick so + streaming sources and sinks stay alive. """ def __init__( @@ -43,23 +43,17 @@ def __init__( definition = node.get_definition() - # Output ports wired to a sink node via graph_executor. Only these - # route through ``audio_output_queue`` → FrameProcessor.get_audio(). - # A node whose audio output feeds another node (e.g. AudioSource → - # VAEEncodeAudio) must NOT push to audio_output_queue: with - # maxsize=1 + blocking put, nothing would ever drain it and the - # worker would deadlock after the second emission. + # Output ports wired straight to a sink; populated by graph_executor. + # Values on these ports are routed to ``audio_output_queue`` for + # FrameProcessor.get_audio_packet() to drain. self.audio_sink_ports: set[str] = set() - # Names of parameters this node declares. 
Global param updates - # (no node_id) are broadcast to every processor; this filter - # keeps a graph-level tweak from spuriously marking every custom - # node for re-execution. + # Parameter names this node declares — used to ignore broadcast + # updates aimed at other nodes. self._declared_param_names: set[str] = {p.name for p in definition.params} - # Audio output queue consumed by FrameProcessor.get_audio() on the - # sink. Size 1 + blocking producer (see _route_audio) gives us - # backpressure: the audio decoder stalls until AudioProcessingTrack - # has served enough of the current chunk to pull a new one. + # Consumed by FrameProcessor.get_audio_packet() on the sink feeder. + # maxsize=1 + blocking put (see _route_audio) gives backpressure so + # batch decoders can't outrun real-time playback. self.audio_output_queue: queue.Queue = queue.Queue(maxsize=1) self.worker_thread: threading.Thread | None = None @@ -70,15 +64,10 @@ def __init__( self._source_executed = False self._has_executed = False self._continuous = definition.continuous - # Cached inputs from the last successful run, replayed when - # parameters change or when a static upstream (one-shot model - # handle) never re-emits. + # Latch of last-seen inputs per port, so static upstreams (one-shot + # model/vae/clip handles) survive across param-triggered re-runs. self._last_inputs: dict[str, Any] = {} self._needs_rerun = False - # Fresh values drained from input queues while waiting for every - # port to catch up. Draining unblocks upstream producers without - # committing to a run yet. - self._pending_inputs: dict[str, Any] = {} # PipelineProcessor interface compatibility: graph_executor populates # this for every processor; kept as an empty dict so that write is safe. 
@@ -114,18 +103,12 @@ def stop(self) -> None: logger.info("NodeProcessor stopped: %s", self.node_id) def update_parameters(self, parameters: dict[str, Any]) -> None: - # Only mark the node dirty when the update actually touches a - # parameter this node declares AND the value differs from the - # current one. FrameProcessor.update_parameters broadcasts - # global updates (no node_id) to every processor, so without - # this guard a stream-level tweak would fire _needs_rerun on - # every custom node and cascade through the DAG. - changed = False - for key, value in parameters.items(): - if key not in self._declared_param_names: - continue - if self.parameters.get(key) != value: - changed = True + # FrameProcessor broadcasts node-less updates to every processor; + # only mark ourselves dirty when a value we actually declare changes. + changed = any( + key in self._declared_param_names and self.parameters.get(key) != value + for key, value in parameters.items() + ) self.parameters.update(parameters) if changed: self._needs_rerun = True @@ -159,66 +142,44 @@ def _process_once(self) -> None: all_queues = dict(self.input_queues) is_source_node = not all_queues - inputs: dict[str, Any] = {} - if is_source_node: - # Source nodes run once, then re-run only when params change - # (or every tick when continuous). - if self._source_executed and not self._continuous and not self._needs_rerun: - self.shutdown_event.wait(1.0) - return - elif self._continuous: - # Continuous nodes: pick up whatever's currently in the queues - # and run every tick. Latched values from prior ticks are - # merged in so static upstreams (model/vae/clip handles) - # persist across ticks. + # Source nodes execute once; continuous=True nodes re-execute every + # tick (for streaming I/O). A pending parameter change also re-wakes. 
+ if ( + is_source_node + and self._source_executed + and not self._continuous + and not self._needs_rerun + ): + self.shutdown_event.wait(1.0) + return + + # Drain fresh values into the latch cache; ports whose upstream has + # already gone quiet (e.g. one-shot model handles) replay from cache. + fresh: dict[str, Any] = {} + inputs: dict[str, Any] = {} + if all_queues: for port_name, q in all_queues.items(): try: - inputs[port_name] = q.get_nowait() + fresh[port_name] = q.get_nowait() except queue.Empty: pass - # Wait until every port has been seen at least once. - if not self._has_executed and ( - set(all_queues.keys()) - inputs.keys() - self._last_inputs.keys() - ): - # Cache what arrived so we don't lose it while waiting. - self._last_inputs.update(inputs) + self._last_inputs.update(fresh) + inputs = dict(self._last_inputs) + # First run: wait until every port has been seen at least once. + if not self._has_executed and set(all_queues.keys()) - inputs.keys(): self.shutdown_event.wait(SLEEP_TIME) return - # Merge with latched: fresh values override cache. - merged = {**self._last_inputs, **inputs} - inputs = merged - else: - # Non-continuous node with inputs: - # - First run waits until every port has received data at least - # once so the latch cache (_last_inputs) is populated. - # - Subsequent runs drain fresh values into _pending_inputs (to - # unblock upstream producers) and fire when at least one - # port has fresh data, OR when params change. - # - Static ports — upstreams that never re-emit, e.g. - # MODEL/VAE handles — are latched from _last_inputs. 
- if not self._has_executed: - if any(q.empty() for q in all_queues.values()): - self.shutdown_event.wait(SLEEP_TIME) - return - inputs = {name: q.get_nowait() for name, q in all_queues.items()} - else: - for port_name, q in all_queues.items(): - try: - self._pending_inputs[port_name] = q.get_nowait() - except queue.Empty: - pass - fire = self._needs_rerun or bool(self._pending_inputs) - if not fire: - self.shutdown_event.wait(SLEEP_TIME) - return - inputs = {} - for port_name in all_queues: - if port_name in self._pending_inputs: - inputs[port_name] = self._pending_inputs[port_name] - elif port_name in self._last_inputs: - inputs[port_name] = self._last_inputs[port_name] - self._pending_inputs.clear() + + # Non-continuous nodes skip when nothing changed since last run. + if ( + self._has_executed + and not self._continuous + and not fresh + and not self._needs_rerun + ): + self.shutdown_event.wait(SLEEP_TIME) + return outputs = self.node.execute(inputs, **self.parameters) @@ -231,8 +192,6 @@ def _process_once(self) -> None: return self._has_executed = True - if inputs: - self._last_inputs.update(inputs) self._route_outputs(outputs) def _route_outputs(self, outputs: dict[str, Any]) -> None: @@ -240,20 +199,13 @@ def _route_outputs(self, outputs: dict[str, Any]) -> None: if value is None: continue - # Audio outputs only push to audio_output_queue for ports - # graph_executor wired to a sink — an intermediate audio port - # (e.g. AudioSource → VAEEncodeAudio) has no drainer, and the - # maxsize=1 blocking put would deadlock after the second emit. - # The fan-out below still runs so non-sink consumers receive - # the value via the normal output_queues path; sink consumers - # are not registered in output_queues, so the fan-out is a - # no-op for them. + # Sink-bound audio also goes to audio_output_queue for WebRTC. if port_name in self.audio_sink_ports: self._route_audio(value) - # Fan out to downstream node queues. 
Block briefly when a queue - # is full so producers throttle to consumer pace and GPU tensors - # don't accumulate in memory. + # Fan out to all downstream queues on this port. Block briefly + # when queues are full so producers throttle to consumer pace + # and GPU tensors don't pile up in memory. out_queues = self.output_queues.get(port_name) if out_queues: for oq in out_queues: @@ -266,9 +218,8 @@ def _route_outputs(self, outputs: dict[str, Any]) -> None: def _route_audio(self, value: Any) -> None: """Extract audio tensor and push to audio_output_queue for WebRTC.""" - # Lazy imports: pulling torch/MediaTimestamp at module scope would - # also pull ``scope.server.media_packets`` into ``scope.core``, - # which the project layout disallows. + # Lazy imports keep ``scope.core`` from reaching back into + # ``scope.server`` at module load (disallowed by the project layout). from fractions import Fraction import torch @@ -281,24 +232,17 @@ def _route_audio(self, value: Any) -> None: else: audio_tensor = getattr(value, "waveform", None) audio_sr = getattr(value, "sample_rate", 48000) - # ACEStep StreamVAEDecode tags each window with a ``start_sample`` - # offset (window t_start * sample_rate). Without this, overlapping - # windows produced by ``follow_playhead`` are appended back-to-back - # in AudioProcessingTrack and the overlapped region plays twice — - # heard as "parts repeated". Forward the offset as a PTS so the - # sink can detect and trim duplicates. + # ACEStep StreamVAEDecode tags each window with start_sample so + # AudioProcessingTrack can trim overlapping windows downstream. start_sample = getattr(value, "start_sample", None) if audio_tensor is None: return if isinstance(audio_tensor, torch.Tensor): if audio_tensor.is_cuda: audio_tensor = audio_tensor.detach().cpu() - # VAE decoders (e.g. ACEStep) return (1, C, T); the audio - # track expects (C, T). Drop a leading singleton batch dim. + # VAE decoders return (1, C, T); the audio track expects (C, T). 
if audio_tensor.dim() == 3 and audio_tensor.shape[0] == 1: audio_tensor = audio_tensor.squeeze(0) - # Upcast bfloat16/float16 to float32 before numpy conversion - # (AudioProcessingTrack calls .numpy() on the tensor). if audio_tensor.dtype in (torch.bfloat16, torch.float16): audio_tensor = audio_tensor.float() @@ -310,10 +254,8 @@ def _route_audio(self, value: Any) -> None: packet = AudioPacket( audio=audio_tensor, sample_rate=int(audio_sr), timestamp=timestamp ) - # Blocking-with-retry put. Stalls the worker thread when the audio - # track hasn't finished serving the previous chunk — this is the - # backpressure that rate-limits batch generators to real-time - # playback instead of silently dropping audio. + # Blocking put with retry: stalls the worker when the audio track + # hasn't drained the previous chunk — this is the backpressure. while not self.shutdown_event.is_set(): try: self.audio_output_queue.put(packet, timeout=0.1) From abf46864e1fe5cd2b32646c6e017f56bf96c1c50 Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Mon, 18 May 2026 13:54:31 +0000 Subject: [PATCH 6/9] refactor(audio): move AudioPacket construction into media_packets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``NodeProcessor._route_audio`` was bridging two layers — pulling tensor fields out of a node output, normalizing device/shape/dtype, and building an ``AudioPacket`` with a PTS-bearing ``MediaTimestamp``. All of that is server-domain work; only the queue push is core-domain. Extract the construction into a free function ``audio_packet_from_node_output`` next to ``AudioPacket`` itself, and have ``NodeProcessor._route_audio`` call it via the same lazy import that the original code already needed for ``AudioPacket``/``torch``. ``_route_audio`` shrinks from 47 lines to 14 — it now just converts and applies the backpressured put — and the tensor-shape knowledge lives beside the types it produces. 
Four focused tests cover the new helper (tuple input, waveform-object PTS, shape/dtype normalization, missing waveform). Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Rafal Leszko --- src/scope/core/nodes/processor.py | 41 +++++----------------------- src/scope/server/media_packets.py | 44 +++++++++++++++++++++++++++++++ tests/test_audio_packets.py | 37 +++++++++++++++++++++++++- 3 files changed, 86 insertions(+), 36 deletions(-) diff --git a/src/scope/core/nodes/processor.py b/src/scope/core/nodes/processor.py index facb84368..649177f0e 100644 --- a/src/scope/core/nodes/processor.py +++ b/src/scope/core/nodes/processor.py @@ -217,43 +217,14 @@ def _route_outputs(self, outputs: dict[str, Any]) -> None: continue def _route_audio(self, value: Any) -> None: - """Extract audio tensor and push to audio_output_queue for WebRTC.""" - # Lazy imports keep ``scope.core`` from reaching back into + """Convert a node output value and push it to audio_output_queue.""" + # Lazy import keeps ``scope.core`` from reaching back into # ``scope.server`` at module load (disallowed by the project layout). - from fractions import Fraction - - import torch - - from scope.server.media_packets import AudioPacket, MediaTimestamp - - start_sample: int | None = None - if isinstance(value, tuple) and len(value) == 2: - audio_tensor, audio_sr = value - else: - audio_tensor = getattr(value, "waveform", None) - audio_sr = getattr(value, "sample_rate", 48000) - # ACEStep StreamVAEDecode tags each window with start_sample so - # AudioProcessingTrack can trim overlapping windows downstream. 
- start_sample = getattr(value, "start_sample", None) - if audio_tensor is None: + from scope.server.media_packets import audio_packet_from_node_output + + packet = audio_packet_from_node_output(value) + if packet is None: return - if isinstance(audio_tensor, torch.Tensor): - if audio_tensor.is_cuda: - audio_tensor = audio_tensor.detach().cpu() - # VAE decoders return (1, C, T); the audio track expects (C, T). - if audio_tensor.dim() == 3 and audio_tensor.shape[0] == 1: - audio_tensor = audio_tensor.squeeze(0) - if audio_tensor.dtype in (torch.bfloat16, torch.float16): - audio_tensor = audio_tensor.float() - - timestamp = ( - MediaTimestamp(pts=int(start_sample), time_base=Fraction(1, int(audio_sr))) - if start_sample is not None and audio_sr - else MediaTimestamp() - ) - packet = AudioPacket( - audio=audio_tensor, sample_rate=int(audio_sr), timestamp=timestamp - ) # Blocking put with retry: stalls the worker when the audio track # hasn't drained the previous chunk — this is the backpressure. while not self.shutdown_event.is_set(): diff --git a/src/scope/server/media_packets.py b/src/scope/server/media_packets.py index 66eb3a13c..c311d6815 100644 --- a/src/scope/server/media_packets.py +++ b/src/scope/server/media_packets.py @@ -60,3 +60,47 @@ def ensure_audio_packet( return item audio, sample_rate = item return AudioPacket(audio=audio, sample_rate=sample_rate) + + +def audio_packet_from_node_output(value: Any) -> AudioPacket | None: + """Build an ``AudioPacket`` from a graph node's audio output port value. + + Accepts the two shapes that built-in and plugin nodes emit: + * ``(tensor, sample_rate)`` tuples (e.g. ``AudioSourceNode``). + * Objects with ``.waveform`` / ``.sample_rate`` / optional + ``.start_sample`` attributes (e.g. ACEStep ``StreamVAEDecode``, + whose ``start_sample`` is forwarded as PTS so ``AudioProcessingTrack`` + can trim overlapping windows downstream). 
+ + Normalizes the tensor for ``AudioProcessingTrack`` consumption: moves + to CPU, drops a leading ``(1, C, T)`` batch dim, and upcasts + bfloat16/float16 to float32 so the subsequent ``.numpy()`` call works. + Returns ``None`` when no audio tensor can be extracted. + """ + start_sample: int | None = None + if isinstance(value, tuple) and len(value) == 2: + audio_tensor, audio_sr = value + else: + audio_tensor = getattr(value, "waveform", None) + audio_sr = getattr(value, "sample_rate", 48000) + start_sample = getattr(value, "start_sample", None) + + if audio_tensor is None: + return None + + if isinstance(audio_tensor, torch.Tensor): + if audio_tensor.is_cuda: + audio_tensor = audio_tensor.detach().cpu() + if audio_tensor.dim() == 3 and audio_tensor.shape[0] == 1: + audio_tensor = audio_tensor.squeeze(0) + if audio_tensor.dtype in (torch.bfloat16, torch.float16): + audio_tensor = audio_tensor.float() + + timestamp = ( + MediaTimestamp(pts=int(start_sample), time_base=Fraction(1, int(audio_sr))) + if start_sample is not None and audio_sr + else MediaTimestamp() + ) + return AudioPacket( + audio=audio_tensor, sample_rate=int(audio_sr), timestamp=timestamp + ) diff --git a/tests/test_audio_packets.py b/tests/test_audio_packets.py index 6ed87a1b1..eaa164b75 100644 --- a/tests/test_audio_packets.py +++ b/tests/test_audio_packets.py @@ -8,7 +8,11 @@ from scope.server.cloud_relay import CloudRelay from scope.server.frame_processor import FrameProcessor -from scope.server.media_packets import AudioPacket, MediaTimestamp +from scope.server.media_packets import ( + AudioPacket, + MediaTimestamp, + audio_packet_from_node_output, +) def _make_frame_processor_with_audio_queue(items): @@ -82,3 +86,34 @@ def remove_audio_callback(self, callback): # pragma: no cover - unused in test assert packet is not None assert packet.sample_rate == 48_000 assert packet.timestamp == MediaTimestamp(pts=321, time_base=Fraction(1, 48_000)) + + +def test_audio_packet_from_tuple(): + audio = 
torch.ones((2, 16)) + packet = audio_packet_from_node_output((audio, 48_000)) + assert packet == AudioPacket(audio=audio, sample_rate=48_000) + + +def test_audio_packet_from_waveform_object_carries_pts(): + """``start_sample`` flows through as PTS so AudioProcessingTrack can trim.""" + audio = torch.zeros((2, 32)) + value = SimpleNamespace(waveform=audio, sample_rate=48_000, start_sample=500) + packet = audio_packet_from_node_output(value) + assert packet is not None + assert packet.sample_rate == 48_000 + assert packet.timestamp == MediaTimestamp(pts=500, time_base=Fraction(1, 48_000)) + + +def test_audio_packet_drops_cuda_batch_dim_and_upcasts_bfloat16(): + # Skip CUDA-only steps; cover shape + dtype normalization (bf16 → fp32 + + # ``(1, C, T)`` squeeze) which AudioProcessingTrack.numpy() requires. + audio = torch.zeros((1, 2, 16), dtype=torch.bfloat16) + packet = audio_packet_from_node_output((audio, 48_000)) + assert packet is not None + assert packet.audio.shape == (2, 16) + assert packet.audio.dtype == torch.float32 + + +def test_audio_packet_from_missing_waveform_is_none(): + value = SimpleNamespace(something_else=True) + assert audio_packet_from_node_output(value) is None From 0283766071dd0e73abcc8f4f50e985edce91a6cf Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Mon, 18 May 2026 13:59:19 +0000 Subject: [PATCH 7/9] chore(audio): drop demon_workflow.json from the PR The demo workflow lives in the PR description instead so it doesn't ship in the repo as committed product. Anyone wanting to try the DEMON graph can copy the JSON from there. 
Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Rafal Leszko --- demon_workflow.json | 223 -------------------------------------------- 1 file changed, 223 deletions(-) delete mode 100644 demon_workflow.json diff --git a/demon_workflow.json b/demon_workflow.json deleted file mode 100644 index 1fce450fc..000000000 --- a/demon_workflow.json +++ /dev/null @@ -1,223 +0,0 @@ -{ - "format": "scope-workflow", - "format_version": "1.0", - "metadata": { - "name": "DEMON Realtime Cover", - "created_at": "2026-05-13T00:00:00.000Z", - "scope_version": "0.2.4" - }, - "pipelines": [ - { - "pipeline_id": "audio.AudioSource", - "source": { "type": "builtin" }, - "loras": [], - "params": {} - }, - { - "pipeline_id": "acestep.LoadModel", - "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, - "loras": [], - "params": {} - }, - { - "pipeline_id": "acestep.VAEEncodeAudio", - "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, - "loras": [], - "params": {} - }, - { - "pipeline_id": "acestep.SemanticExtract", - "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, - "loras": [], - "params": {} - }, - { - "pipeline_id": "acestep.EncodeText", - "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, - "loras": [], - "params": {} - }, - { - "pipeline_id": "acestep.EncodeConditioning", - "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, - "loras": [], - "params": {} - }, - { - "pipeline_id": "acestep.OdeSolver", - "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, - "loras": [], - "params": {} - }, - { - "pipeline_id": "acestep.StreamDenoise", - "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, - "loras": [], - "params": {} - }, - { - "pipeline_id": "acestep.StreamVAEDecode", - "source": { "type": "local", "plugin_name": "demon", "package_spec": "demon" }, - "loras": [], - "params": {} - 
} - ], - "graph": { - "nodes": [ - { - "id": "audio_model", - "type": "node", - "node_type_id": "acestep.LoadModel", - "params": { - "config_path": "acestep-v15-turbo", - "device": "cuda", - "use_flash_attention": true, - "decoder_backend": "eager", - "vae_backend": "eager", - "project_root": "" - }, - "x": 50, - "y": 100, - "w": 360, - "h": 220 - }, - { - "id": "audio_source", - "type": "node", - "node_type_id": "audio.AudioSource", - "params": { - "file_id": "/tmp/demon_input.wav", - "duration": 120, - "mode": "full", - "pacing": "realtime" - }, - "x": 50, - "y": 500, - "w": 320, - "h": 200 - }, - { - "id": "audio_encode", - "type": "node", - "node_type_id": "acestep.VAEEncodeAudio", - "params": {}, - "x": 470, - "y": 380, - "w": 240, - "h": 140 - }, - { - "id": "audio_semantic", - "type": "node", - "node_type_id": "acestep.SemanticExtract", - "params": {}, - "x": 800, - "y": 100, - "w": 240, - "h": 140 - }, - { - "id": "audio_text", - "type": "node", - "node_type_id": "acestep.EncodeText", - "params": { - "tags": "dubstep heavy bass", - "lyrics": "", - "task_type": "cover", - "bpm": 134, - "duration": 120, - "key": "G minor", - "language": "en" - }, - "x": 800, - "y": 600, - "w": 280, - "h": 280 - }, - { - "id": "audio_cond", - "type": "node", - "node_type_id": "acestep.EncodeConditioning", - "params": {}, - "x": 1140, - "y": 420, - "w": 240, - "h": 160 - }, - { - "id": "audio_solver", - "type": "node", - "node_type_id": "acestep.OdeSolver", - "params": {}, - "x": 1140, - "y": 200, - "w": 240, - "h": 120 - }, - { - "id": "audio_stream", - "type": "node", - "node_type_id": "acestep.StreamDenoise", - "params": { - "denoise": 0.6, - "seed": 42, - "shift": 3, - "pipeline_depth": 4, - "steps": 8, - "drain": false, - "duration": 120 - }, - "x": 1500, - "y": 380, - "w": 320, - "h": 360 - }, - { - "id": "audio_decode", - "type": "node", - "node_type_id": "acestep.StreamVAEDecode", - "params": { - "vae_window": 5, - "vae_overlap": 0.5, - "t_start": 0, - 
"follow_playhead": true, - "mse_skip_threshold": 0.001, - "prefetch_seconds": 1 - }, - "x": 1880, - "y": 380, - "w": 320, - "h": 280 - }, - { - "id": "output", - "type": "sink", - "x": 2260, - "y": 420, - "w": 240, - "h": 200 - } - ], - "edges": [ - {"from": "audio_model", "from_port": "vae", "to_node": "audio_encode", "to_port": "vae", "kind": "stream"}, - {"from": "audio_source", "from_port": "audio", "to_node": "audio_encode", "to_port": "audio", "kind": "stream"}, - {"from": "audio_model", "from_port": "model", "to_node": "audio_semantic", "to_port": "model", "kind": "stream"}, - {"from": "audio_encode", "from_port": "latent", "to_node": "audio_semantic", "to_port": "latent", "kind": "stream"}, - {"from": "audio_model", "from_port": "clip", "to_node": "audio_text", "to_port": "clip", "kind": "stream"}, - {"from": "audio_model", "from_port": "model", "to_node": "audio_cond", "to_port": "model", "kind": "stream"}, - {"from": "audio_text", "from_port": "text_embed", "to_node": "audio_cond", "to_port": "text_embed", "kind": "stream"}, - {"from": "audio_encode", "from_port": "latent", "to_node": "audio_cond", "to_port": "timbre_ref", "kind": "stream"}, - {"from": "audio_model", "from_port": "model", "to_node": "audio_stream", "to_port": "model", "kind": "stream"}, - {"from": "audio_solver", "from_port": "solver", "to_node": "audio_stream", "to_port": "solver", "kind": "stream"}, - {"from": "audio_cond", "from_port": "conditioning", "to_node": "audio_stream", "to_port": "positive", "kind": "stream"}, - {"from": "audio_encode", "from_port": "latent", "to_node": "audio_stream", "to_port": "source_latent", "kind": "stream"}, - {"from": "audio_semantic", "from_port": "latent", "to_node": "audio_stream", "to_port": "context_latent", "kind": "stream"}, - {"from": "audio_model", "from_port": "vae", "to_node": "audio_decode", "to_port": "vae", "kind": "stream"}, - {"from": "audio_stream", "from_port": "latent", "to_node": "audio_decode", "to_port": "latent", "kind": 
"stream"}, - {"from": "audio_decode", "from_port": "audio", "to_node": "output", "to_port": "audio", "kind": "stream"} - ], - "ui_state": { - "node_params": {} - } - } -} From 6ca421350d0fb6d52e9d084d2bc7e3d687e5b68c Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Mon, 18 May 2026 14:10:40 +0000 Subject: [PATCH 8/9] refactor(audio_track): drop multi-track registry, use single ref Scope serves at most one session at a time, so the ``WeakSet`` + lock + iteration in the playhead registry are over-engineered. Replace them with a single module-level optional reference that each new ``AudioProcessingTrack`` overwrites in ``__init__``. The ``readyState != "live"`` check still guards the brief gap between a session ending and the next one starting. Drops the ``threading`` and ``weakref`` imports added by this PR. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Rafal Leszko --- src/scope/server/audio_track.py | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/src/scope/server/audio_track.py b/src/scope/server/audio_track.py index 7e6e4632c..81730a2cd 100644 --- a/src/scope/server/audio_track.py +++ b/src/scope/server/audio_track.py @@ -2,9 +2,7 @@ import collections import fractions import logging -import threading import time -import weakref import numpy as np from aiortc import MediaStreamTrack @@ -27,25 +25,23 @@ AUDIO_MAX_BUFFER_SAMPLES = AUDIO_CLOCK_RATE * 60 -# Registry of live audio tracks so graph nodes that need the playhead (e.g. -# DEMON's StreamVAEDecode skip gate, which mirrors the realtime demo's -# ``audio_eng.position / SAMPLE_RATE``) can query it without a hard -# dependency on FrameProcessor. Weak refs so closed tracks drop out. -_PLAYHEAD_LOCK = threading.Lock() -_PLAYHEAD_TRACKS: "weakref.WeakSet[AudioProcessingTrack]" = weakref.WeakSet() +# Playhead handle for graph nodes that need the current audio position +# (e.g. 
DEMON's StreamVAEDecode skip gate, which mirrors the realtime +# demo's ``audio_eng.position / SAMPLE_RATE``). Scope serves one session +# at a time, so a single optional reference is enough — each new +# AudioProcessingTrack overwrites it during __init__. +_current_track: "AudioProcessingTrack | None" = None def get_current_playhead_seconds() -> float | None: - """Return the playhead position of the first live audio track, in seconds. + """Playhead of the live audio track in seconds, or None if none is live. - Returns None if no track is registered yet or none are live. Callers - should treat None as "skip gate disabled this tick". + Callers should treat None as "skip gate disabled this tick". """ - with _PLAYHEAD_LOCK: - for track in _PLAYHEAD_TRACKS: - if track.readyState == "live": - return track.playhead_seconds - return None + track = _current_track + if track is None or track.readyState != "live": + return None + return track.playhead_seconds class AudioProcessingTrack(MediaStreamTrack): @@ -86,8 +82,8 @@ def __init__( # means "no PTS reference yet" — the next valid PTS sets it. self._next_expected_pts: int | None = None - with _PLAYHEAD_LOCK: - _PLAYHEAD_TRACKS.add(self) + global _current_track + _current_track = self @property def playhead_seconds(self) -> float: From 3693ee3da7d070e53470c75b696b0eaf7938301d Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Mon, 18 May 2026 14:24:57 +0000 Subject: [PATCH 9/9] fix(webrtc): keep registry audio when a graph has no audio port MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The graph-aware modality detection added in this PR was REPLACING the registry result, which broke pipelines that produce audio internally without exposing it as a graph port — most notably LTX-2. In graph mode, the graph reports ``(video, no audio)`` because LTX-2's audio output isn't on a port, and the previous code dropped ``produces_audio`` to False, so WebRTC never created an audio track. 
Fix: start from the registry (it knows about internal audio), then OR in whatever the graph adds. Only drop video when the graph is *explicitly* audio-only — that's the DEMON case where ``pipeline_ids`` can be stale ``longlive`` from a previous workflow load. This mirrors the logic already used in mcp_router.py. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Rafal Leszko --- src/scope/server/webrtc.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/scope/server/webrtc.py b/src/scope/server/webrtc.py index 49f2b4b02..0f021cff2 100644 --- a/src/scope/server/webrtc.py +++ b/src/scope/server/webrtc.py @@ -431,11 +431,15 @@ async def handle_offer( # Create NotificationSender for this session to send notifications to the frontend notification_sender = NotificationSender() - # Determine media modalities. When a graph is present with at least - # one sink, the graph's sink edges are authoritative — pipeline_ids - # may be stale from a previous workflow load (e.g. lingering - # ``longlive`` while the current graph is audio-only DEMON). + # Determine media modalities. Start from the registry (pipelines + # like LTX-2 produce audio internally without exposing a graph + # port). When a graph is present, add anything the graph declares + # via sink edges, and drop video only if the graph is explicitly + # audio-only — that's the DEMON case where ``pipeline_ids`` can + # be a stale ``longlive`` from a previous workflow load. 
pipeline_ids = initial_parameters.get("pipeline_ids", []) + produces_video = NodeRegistry.chain_produces_video(pipeline_ids) + produces_audio = NodeRegistry.chain_produces_audio(pipeline_ids) graph_data = initial_parameters.get("graph") graph_config = ( GraphConfig.model_validate(graph_data) @@ -443,10 +447,12 @@ async def handle_offer( else None ) if graph_config is not None and graph_config.get_sink_node_ids(): - produces_video, produces_audio = graph_config.get_sink_modalities() - else: - produces_video = NodeRegistry.chain_produces_video(pipeline_ids) - produces_audio = NodeRegistry.chain_produces_audio(pipeline_ids) + graph_video, graph_audio = graph_config.get_sink_modalities() + produces_audio = produces_audio or graph_audio + if graph_audio and not graph_video: + produces_video = False + else: + produces_video = produces_video or graph_video # Parse graph from initial parameters to find sink/source/record node IDs (