From 51a983029d88b6bc1865f1ea6b262cc7c351b9cb Mon Sep 17 00:00:00 2001 From: emranemran Date: Thu, 16 Apr 2026 11:46:03 -0700 Subject: [PATCH 1/3] feat: add Kafka event publishing to livepeer fal wrapper Publish lifecycle events from the fal runner proxy (src/scope/cloud/livepeer_fal_app.py) to Kafka for cloud session observability. - Adds a KafkaPublisher wrapping aiokafka with env-var-based config (KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, KAFKA_SASL_USERNAME, KAFKA_SASL_PASSWORD) and SASL_SSL when credentials are set. - Forwards those env vars to the runner subprocess and installs aiokafka via --extra kafka so scope.server.kafka_publisher in the runner can also emit events. - Publishes websocket_connected on /ws accept and websocket_disconnected on teardown, including GPU type, fal region/runner/log labels, duration, and session start/end timestamps. Initial implementation intercepts the first client message in the proxy to learn manifest_id / user_id; a follow-up commit replaces that with header extraction so no message parsing is needed. Signed-off-by: emranemran Co-Authored-By: Claude Opus 4.7 (1M context) --- src/scope/cloud/livepeer_fal_app.py | 273 +++++++++++++++++++++++++++- 1 file changed, 271 insertions(+), 2 deletions(-) diff --git a/src/scope/cloud/livepeer_fal_app.py b/src/scope/cloud/livepeer_fal_app.py index 3a48ed49d..6c5bc5612 100644 --- a/src/scope/cloud/livepeer_fal_app.py +++ b/src/scope/cloud/livepeer_fal_app.py @@ -28,6 +28,121 @@ RUNNER_FAILURE_WINDOW_SECONDS = 60.0 ASSETS_DIR_PATH = "/tmp/.daydream-scope/assets" + +# --------------------------------------------------------------------------- +# Kafka publisher — matches fal_app.py KafkaPublisher for event parity +# --------------------------------------------------------------------------- + +kafka_publisher: "KafkaPublisher | None" = None + + +class KafkaPublisher: + """Async Kafka event publisher for fal.ai websocket events.""" + + def __init__(self): + self._producer = None + self._started = False + self._topic = None + + async def start(self) -> bool: + """Start the Kafka producer.""" + import json as _json # noqa: F811 + + bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS") + self._topic = os.getenv("KAFKA_TOPIC", "network_events") + sasl_username = os.getenv("KAFKA_SASL_USERNAME") + sasl_password = os.getenv("KAFKA_SASL_PASSWORD") + + print( + f"[Kafka] Starting publisher (KAFKA_BOOTSTRAP_SERVERS={bootstrap_servers})" + ) + if not bootstrap_servers: + print("[Kafka] Not configured, event publishing disabled") + return False + + try: + from aiokafka import AIOKafkaProducer + + config = { + "bootstrap_servers": bootstrap_servers, + "value_serializer": lambda v: _json.dumps(v).encode("utf-8"), + "key_serializer": lambda k: k.encode("utf-8") if k else None, + } + + if sasl_username and sasl_password: + import ssl + + ssl_context = ssl.create_default_context() + config.update( + { + "security_protocol": "SASL_SSL", + "sasl_mechanism": "PLAIN", + "sasl_plain_username": sasl_username, + "sasl_plain_password": sasl_password, + "ssl_context": ssl_context, + } + ) + + self._producer = AIOKafkaProducer(**config) + await self._producer.start() + self._started = True + print(f"[Kafka] Publisher started, topic: {self._topic}") + return True + + except ImportError: + print("[Kafka] aiokafka not installed, Kafka disabled") + return False + except Exception as e: + print(f"[Kafka] Failed to start producer: {e}") + return False + + async def stop(self): + """Stop the Kafka producer.""" + if self._producer and self._started: + try: + await self._producer.stop() + print("[Kafka] Publisher stopped") + except Exception as e: + print(f"[Kafka] Error stopping producer: {e}") + finally: + self._started = False + self._producer = None + + async def publish(self, event_type: str, data: dict) -> bool: + """Publish an event to Kafka.""" + import uuid as _uuid + + if not self._started or not self._producer: + return False + + event_id = str(_uuid.uuid4()) + timestamp_ms = str(int(time.time() * 1000)) + + event = { + "id": event_id, + "type": "stream_trace", + "timestamp": timestamp_ms, + "data": { + "type": event_type, + "client_source": "scope", + "timestamp": timestamp_ms, + **data, + }, + } + + try: + await self._producer.send_and_wait(self._topic, value=event, key=event_id) + print(f"[Kafka] Published event: {event_type}") + return True + except Exception as e: + print(f"[Kafka] Failed to publish event {event_type}: {e}") + return False + + @property + def is_running(self) -> bool: + return self._started + + # Gates startup cleanup so only one cleanup run executes at a time. _cleanup_event: asyncio.Event | None = None @@ -148,6 +263,8 @@ def _build_runner_command() -> list[str]: "run", "--extra", "livepeer", + "--extra", + "kafka", "livepeer-runner", "--host", RUNNER_HOST, @@ -156,18 +273,81 @@ def _build_runner_command() -> list[str]: ] -async def _proxy_ws(client_ws: WebSocket) -> None: +async def _handle_first_client_text(text_data: str, metadata: dict | None) -> None: + """Parse the first client text message, print manifest_id, publish connected. + + Best-effort — any parsing or publishing failure is logged with the + ``[KAFKA-DEBUG]`` prefix but never raises, so the proxy loop is unaffected. + """ + import json + + if metadata is None: + return + + try: + parsed = json.loads(text_data) + except (json.JSONDecodeError, TypeError) as exc: + print(f"[KAFKA-DEBUG] First client text not JSON: {exc}") + return + + manifest_id = parsed.get("manifest_id") if isinstance(parsed, dict) else None + params = parsed.get("params") if isinstance(parsed, dict) else None + user_id = params.get("daydream_user_id") if isinstance(params, dict) else None + + metadata["manifest_id"] = manifest_id + metadata["user_id"] = user_id + + print( + f"[KAFKA-DEBUG] First client message parsed manifest_id={manifest_id} " + f"user_id={user_id}" + ) + + if kafka_publisher is None or not kafka_publisher.is_running: + print("[KAFKA-DEBUG] Skipping websocket_connected: Kafka not running") + return + + connection_info = metadata.get("connection_info") or {} + ok = await kafka_publisher.publish( + "websocket_connected", + { + "user_id": user_id, + "connection_id": manifest_id, + "connection_info": connection_info, + }, + ) + print(f"[KAFKA-DEBUG] websocket_connected publish result: ok={ok}") + + +async def _proxy_ws(client_ws: WebSocket, metadata: dict | None = None) -> None: """Connect to the local runner and proxy traffic bidirectionally. + If *metadata* is provided, the first client→runner text message is + parsed (best-effort) to extract ``manifest_id`` and ``user_id`` and a + ``websocket_connected`` Kafka event is published inline. + Raises WebSocketDisconnect if the client disconnects. Returns normally if the runner connection drops. """ + import websockets from websockets.exceptions import ConnectionClosed + # NOTE: Previously we sniffed the two-message handshake explicitly + # (recv runner ready, then receive client job_info) before starting + # the parallel proxy loop. That is disabled here for debugging — + # keep the proxy transparent and sniff inline in client_to_runner. + # + # async with websockets.connect(RUNNER_LOCAL_WS_URL) as runner_ws: + # ready_msg = await runner_ws.recv() + # ... + # job_msg = await client_ws.receive() + # ... + async with websockets.connect(RUNNER_LOCAL_WS_URL) as runner_ws: + first_client_text_seen = False async def client_to_runner() -> None: + nonlocal first_client_text_seen while True: message = await client_ws.receive() msg_type = message.get("type") @@ -175,6 +355,9 @@ async def client_to_runner() -> None: text_data = message.get("text") bytes_data = message.get("bytes") if text_data is not None: + if not first_client_text_seen: + first_client_text_seen = True + await _handle_first_client_text(text_data, metadata) await runner_ws.send(text_data) elif bytes_data is not None: await runner_ws.send(bytes_data) @@ -228,6 +411,7 @@ class LivepeerScopeApp(fal.App, keep_alive=300): requirements = [ "websockets", "httpx", + "aiokafka", ] def setup(self): @@ -266,6 +450,11 @@ def setup(self): "DAYDREAM_SCOPE_BUNDLED_PLUGINS_FILE", "LIVEPEER_DEBUG", "UV_CACHE_DIR", + # Kafka (for scope.server.kafka_publisher in the runner subprocess) + "KAFKA_BOOTSTRAP_SERVERS", + "KAFKA_TOPIC", + "KAFKA_SASL_USERNAME", + "KAFKA_SASL_PASSWORD", ] runner_env = {k: os.environ[k] for k in env_allowlist if k in os.environ} runner_env.setdefault("UV_CACHE_DIR", "/tmp/uv-cache") @@ -317,6 +506,52 @@ async def websocket_handler(self, client_ws: WebSocket) -> None: await client_ws.accept() + # Initialize Kafka publisher (lazy, once per process) + global kafka_publisher + if kafka_publisher is None: + print("[KAFKA-DEBUG] kafka_publisher is None, initializing") + kafka_publisher = KafkaPublisher() + started = await kafka_publisher.start() + print(f"[KAFKA-DEBUG] KafkaPublisher.start() returned: {started}") + else: + print( + f"[KAFKA-DEBUG] kafka_publisher already initialized " + f"(is_running={kafka_publisher.is_running})" + ) + + # Smoke-test publish: verify basic Kafka write path works end-to-end + # before any real events. Filter logs on "[KAFKA-DEBUG]" to check. + if kafka_publisher is not None and kafka_publisher.is_running: + print("[KAFKA-DEBUG] Attempting smoke-test publish (kafka_smoke_test)") + smoke_ok = await kafka_publisher.publish( + "kafka_smoke_test", + {"note": "livepeer_fal_app websocket_handler startup ping"}, + ) + print(f"[KAFKA-DEBUG] Smoke-test publish result: ok={smoke_ok}") + else: + print("[KAFKA-DEBUG] Skipping smoke-test publish: Kafka not running") + + connection_start_time = time.time() + metadata: dict = {} + + import json + + fal_log_labels_raw = os.getenv("FAL_LOG_LABELS", "unknown") + try: + fal_log_labels = json.loads(fal_log_labels_raw) + except (json.JSONDecodeError, TypeError): + fal_log_labels = fal_log_labels_raw + + connection_info = { + "gpu_type": LivepeerScopeApp.machine_type, + "fal_region": os.getenv("NOMAD_DC", "unknown"), + "fal_runner_id": os.getenv( + "FAL_JOB_ID", os.getenv("FAL_RUNNER_ID", "unknown") + ), + "fal_log_labels": fal_log_labels, + } + metadata["connection_info"] = connection_info + # Ensure any previous session data is cleaned up event = _get_cleanup_event() await event.wait() @@ -328,7 +563,7 @@ async def websocket_handler(self, client_ws: WebSocket) -> None: while True: print(f"Connecting proxy to runner websocket at {RUNNER_LOCAL_WS_URL}") try: - await _proxy_ws(client_ws) + await _proxy_ws(client_ws, metadata=metadata) except ( ConnectionClosed, InvalidStatus, @@ -337,6 +572,10 @@ async def websocket_handler(self, client_ws: WebSocket) -> None: ) as exc: print(f"Livepeer fal ws runner connection failed: {exc}") + # websocket_connected is published inline from + # _handle_first_client_text the first time the client sends a + # text message through the proxy — no post-proxy publish here. + now = time.monotonic() cutoff = now - RUNNER_FAILURE_WINDOW_SECONDS failure_timestamps.append(now) @@ -358,6 +597,36 @@ async def websocket_handler(self, client_ws: WebSocket) -> None: except Exception as exc: print(f"Livepeer fal ws proxy error: {type(exc).__name__}: {exc}") finally: + # Publish websocket_disconnected event + if kafka_publisher and kafka_publisher.is_running: + end_time = time.time() + elapsed_ms = int((end_time - connection_start_time) * 1000) + print( + "[KAFKA-DEBUG] Attempting websocket_disconnected publish " + f"(manifest_id={metadata.get('manifest_id')} " + f"user_id={metadata.get('user_id')} " + f"duration_ms={elapsed_ms})" + ) + disc_ok = await kafka_publisher.publish( + "websocket_disconnected", + { + "user_id": metadata.get("user_id"), + "connection_id": metadata.get("manifest_id"), + "connection_info": connection_info, + "duration_ms": elapsed_ms, + "session_start_time_ms": int(connection_start_time * 1000), + "session_end_time_ms": int(end_time * 1000), + }, + ) + print( + f"[KAFKA-DEBUG] websocket_disconnected publish result: ok={disc_ok}" + ) + else: + print( + "[KAFKA-DEBUG] Skipping websocket_disconnected: " + "Kafka publisher not running" + ) + await run_cleanup() with suppress(Exception): await client_ws.close() From cbfecc635e9a4c2bdf0b0a59deb7b1c8aa6b80c6 Mon Sep 17 00:00:00 2001 From: Josh Allmann Date: Sat, 18 Apr 2026 22:06:42 -0700 Subject: [PATCH 2/3] Extract manifest_id / user_id from websocket headers Use headers instead of sniffing websocket messages. --- src/scope/cloud/livepeer_fal_app.py | 93 +++++++---------------------- 1 file changed, 22 insertions(+), 71 deletions(-) diff --git a/src/scope/cloud/livepeer_fal_app.py b/src/scope/cloud/livepeer_fal_app.py index 6c5bc5612..8cfad9dc6 100644 --- a/src/scope/cloud/livepeer_fal_app.py +++ b/src/scope/cloud/livepeer_fal_app.py @@ -273,58 +273,9 @@ def _build_runner_command() -> list[str]: ] -async def _handle_first_client_text(text_data: str, metadata: dict | None) -> None: - """Parse the first client text message, print manifest_id, publish connected. - - Best-effort — any parsing or publishing failure is logged with the - ``[KAFKA-DEBUG]`` prefix but never raises, so the proxy loop is unaffected. - """ - import json - - if metadata is None: - return - - try: - parsed = json.loads(text_data) - except (json.JSONDecodeError, TypeError) as exc: - print(f"[KAFKA-DEBUG] First client text not JSON: {exc}") - return - - manifest_id = parsed.get("manifest_id") if isinstance(parsed, dict) else None - params = parsed.get("params") if isinstance(parsed, dict) else None - user_id = params.get("daydream_user_id") if isinstance(params, dict) else None - - metadata["manifest_id"] = manifest_id - metadata["user_id"] = user_id - - print( - f"[KAFKA-DEBUG] First client message parsed manifest_id={manifest_id} " - f"user_id={user_id}" - ) - - if kafka_publisher is None or not kafka_publisher.is_running: - print("[KAFKA-DEBUG] Skipping websocket_connected: Kafka not running") - return - - connection_info = metadata.get("connection_info") or {} - ok = await kafka_publisher.publish( - "websocket_connected", - { - "user_id": user_id, - "connection_id": manifest_id, - "connection_info": connection_info, - }, - ) - print(f"[KAFKA-DEBUG] websocket_connected publish result: ok={ok}") - - -async def _proxy_ws(client_ws: WebSocket, metadata: dict | None = None) -> None: +async def _proxy_ws(client_ws: WebSocket) -> None: """Connect to the local runner and proxy traffic bidirectionally. - If *metadata* is provided, the first client→runner text message is - parsed (best-effort) to extract ``manifest_id`` and ``user_id`` and a - ``websocket_connected`` Kafka event is published inline. - Raises WebSocketDisconnect if the client disconnects. Returns normally if the runner connection drops. """ @@ -332,22 +283,9 @@ async def _proxy_ws(client_ws: WebSocket, metadata: dict | None = None) -> None: import websockets from websockets.exceptions import ConnectionClosed - # NOTE: Previously we sniffed the two-message handshake explicitly - # (recv runner ready, then receive client job_info) before starting - # the parallel proxy loop. That is disabled here for debugging — - # keep the proxy transparent and sniff inline in client_to_runner. - # - # async with websockets.connect(RUNNER_LOCAL_WS_URL) as runner_ws: - # ready_msg = await runner_ws.recv() - # ... - # job_msg = await client_ws.receive() - # ... - async with websockets.connect(RUNNER_LOCAL_WS_URL) as runner_ws: - first_client_text_seen = False async def client_to_runner() -> None: - nonlocal first_client_text_seen while True: message = await client_ws.receive() msg_type = message.get("type") @@ -355,9 +293,6 @@ async def client_to_runner() -> None: text_data = message.get("text") bytes_data = message.get("bytes") if text_data is not None: - if not first_client_text_seen: - first_client_text_seen = True - await _handle_first_client_text(text_data, metadata) await runner_ws.send(text_data) elif bytes_data is not None: await runner_ws.send(bytes_data) @@ -533,6 +468,14 @@ async def websocket_handler(self, client_ws: WebSocket) -> None: connection_start_time = time.time() metadata: dict = {} + manifest_id = client_ws.headers.get("manifest-id") + user_id = client_ws.headers.get("daydream-user-id") + metadata["manifest_id"] = manifest_id + metadata["user_id"] = user_id + print( + f"[KAFKA-DEBUG] Handshake headers manifest_id={manifest_id} " + f"user_id={user_id}" + ) import json @@ -551,6 +494,18 @@ async def websocket_handler(self, client_ws: WebSocket) -> None: "fal_log_labels": fal_log_labels, } metadata["connection_info"] = connection_info + if kafka_publisher is None or not kafka_publisher.is_running: + print("[KAFKA-DEBUG] Skipping websocket_connected: Kafka not running") + else: + ok = await kafka_publisher.publish( + "websocket_connected", + { + "user_id": user_id, + "connection_id": manifest_id, + "connection_info": connection_info, + }, + ) + print(f"[KAFKA-DEBUG] websocket_connected publish result: ok={ok}") # Ensure any previous session data is cleaned up event = _get_cleanup_event() @@ -563,7 +518,7 @@ async def websocket_handler(self, client_ws: WebSocket) -> None: while True: print(f"Connecting proxy to runner websocket at {RUNNER_LOCAL_WS_URL}") try: - await _proxy_ws(client_ws, metadata=metadata) + await _proxy_ws(client_ws) except ( ConnectionClosed, InvalidStatus, @@ -572,10 +527,6 @@ async def websocket_handler(self, client_ws: WebSocket) -> None: ) as exc: print(f"Livepeer fal ws runner connection failed: {exc}") - # websocket_connected is published inline from - # _handle_first_client_text the first time the client sends a - # text message through the proxy — no post-proxy publish here. - now = time.monotonic() cutoff = now - RUNNER_FAILURE_WINDOW_SECONDS failure_timestamps.append(now) From af5d47b50e746cae6dcbcdf0cf301af0ef95b9dc Mon Sep 17 00:00:00 2001 From: emranemran Date: Sun, 19 Apr 2026 21:13:32 -0700 Subject: [PATCH 3/3] chore: strip smoke-test and [KAFKA-DEBUG] prints Remove the one-time Kafka smoke-test publish and the print() tracing that were added while bootstrapping the Kafka path. The KafkaPublisher class retains its own [Kafka] structured logs; actual websocket_connected and websocket_disconnected events still publish as before. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: emranemran --- src/scope/cloud/livepeer_fal_app.py | 51 +++-------------------------- 1 file changed, 5 insertions(+), 46 deletions(-) diff --git a/src/scope/cloud/livepeer_fal_app.py b/src/scope/cloud/livepeer_fal_app.py index 8cfad9dc6..d0a57ee4b 100644 --- a/src/scope/cloud/livepeer_fal_app.py +++ b/src/scope/cloud/livepeer_fal_app.py @@ -441,30 +441,11 @@ async def websocket_handler(self, client_ws: WebSocket) -> None: await client_ws.accept() - # Initialize Kafka publisher (lazy, once per process) + # Initialize Kafka publisher (lazy, once per process). global kafka_publisher if kafka_publisher is None: - print("[KAFKA-DEBUG] kafka_publisher is None, initializing") kafka_publisher = KafkaPublisher() - started = await kafka_publisher.start() - print(f"[KAFKA-DEBUG] KafkaPublisher.start() returned: {started}") - else: - print( - f"[KAFKA-DEBUG] kafka_publisher already initialized " - f"(is_running={kafka_publisher.is_running})" - ) - - # Smoke-test publish: verify basic Kafka write path works end-to-end - # before any real events. Filter logs on "[KAFKA-DEBUG]" to check. - if kafka_publisher is not None and kafka_publisher.is_running: - print("[KAFKA-DEBUG] Attempting smoke-test publish (kafka_smoke_test)") - smoke_ok = await kafka_publisher.publish( - "kafka_smoke_test", - {"note": "livepeer_fal_app websocket_handler startup ping"}, - ) - print(f"[KAFKA-DEBUG] Smoke-test publish result: ok={smoke_ok}") - else: - print("[KAFKA-DEBUG] Skipping smoke-test publish: Kafka not running") + await kafka_publisher.start() connection_start_time = time.time() metadata: dict = {} @@ -472,10 +453,6 @@ async def websocket_handler(self, client_ws: WebSocket) -> None: user_id = client_ws.headers.get("daydream-user-id") metadata["manifest_id"] = manifest_id metadata["user_id"] = user_id - print( - f"[KAFKA-DEBUG] Handshake headers manifest_id={manifest_id} " - f"user_id={user_id}" - ) import json @@ -494,10 +471,8 @@ async def websocket_handler(self, client_ws: WebSocket) -> None: "fal_log_labels": fal_log_labels, } metadata["connection_info"] = connection_info - if kafka_publisher is None or not kafka_publisher.is_running: - print("[KAFKA-DEBUG] Skipping websocket_connected: Kafka not running") - else: - ok = await kafka_publisher.publish( + if kafka_publisher is not None and kafka_publisher.is_running: + await kafka_publisher.publish( "websocket_connected", { "user_id": user_id, @@ -505,7 +480,6 @@ async def websocket_handler(self, client_ws: WebSocket) -> None: "connection_info": connection_info, }, ) - print(f"[KAFKA-DEBUG] websocket_connected publish result: ok={ok}") # Ensure any previous session data is cleaned up event = _get_cleanup_event() @@ -548,17 +522,10 @@ async def websocket_handler(self, client_ws: WebSocket) -> None: except Exception as exc: print(f"Livepeer fal ws proxy error: {type(exc).__name__}: {exc}") finally: - # Publish websocket_disconnected event if kafka_publisher and kafka_publisher.is_running: end_time = time.time() elapsed_ms = int((end_time - connection_start_time) * 1000) - print( - "[KAFKA-DEBUG] Attempting websocket_disconnected publish " - f"(manifest_id={metadata.get('manifest_id')} " - f"user_id={metadata.get('user_id')} " - f"duration_ms={elapsed_ms})" - ) - disc_ok = await kafka_publisher.publish( + await kafka_publisher.publish( "websocket_disconnected", { "user_id": metadata.get("user_id"), @@ -569,14 +536,6 @@ async def websocket_handler(self, client_ws: WebSocket) -> None: "session_end_time_ms": int(end_time * 1000), }, ) - print( - f"[KAFKA-DEBUG] websocket_disconnected publish result: ok={disc_ok}" - ) - else: - print( - "[KAFKA-DEBUG] Skipping websocket_disconnected: " - "Kafka publisher not running" - ) await run_cleanup() with suppress(Exception):