What the backend actually does today (why “tiny nodes” still cost ~80–200ms)
Even when you execute a single trivial node (concat/add), the worker goes through the full job infrastructure:
- WS receive + decode + Pydantic validation (small but non-zero)
- Create ProcessingContext
- Create persistent run state in the DB (multiple DB writes)
- Start a new job execution
- In threaded mode, it still creates a new dedicated thread + event loop per job
- Run the workflow runner (your actual node compute; for concat/add this is basically negligible)
- Stream messages back to the client through a polling loop that sleeps for a default 10ms whenever no messages are pending (adds jitter)
You can see the biggest recurring costs directly in code:
- Polling sleep jitter (up to ~10ms per “wait for next message”):
unified_websocket_runner.py, lines 216–237:

```python
async def process_workflow_messages(
    job_execution: JobExecution,
    sleep_interval: float = 0.01,
    explicit_types: bool = False,
) -> AsyncGenerator[dict[str, Any], None]:
    ...
    while job_execution.is_running():
        if job_execution.context.has_messages():
            async for msg in process_message(job_execution.context, explicit_types):
                yield msg
        else:
            await asyncio.sleep(sleep_interval)
```
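The jitter cost of that pattern is easy to measure in isolation. A minimal, standalone sketch (not from the repo) showing how much `asyncio.sleep(0.01)` overshoots the requested interval per wakeup:

```python
import asyncio
import time

async def measure_sleep_overshoot(n: int = 50, interval: float = 0.01) -> float:
    """Average amount by which asyncio.sleep(interval) overshoots the
    requested interval -- the per-wakeup jitter a polling loop pays."""
    total = 0.0
    for _ in range(n):
        t0 = time.perf_counter()
        await asyncio.sleep(interval)
        total += (time.perf_counter() - t0) - interval
    return total / n

avg = asyncio.run(measure_sleep_overshoot())
print(f"average overshoot per 10ms sleep: {avg * 1000:.3f} ms")
```

Even a fraction of a millisecond of overshoot per wakeup compounds when the loop spins many times waiting for the next message, on top of the 0–10ms worst-case wait before the loop notices a message at all.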
- DB writes on every run (RunState.create_run, then mark “running”):
job_execution_manager.py, lines 121–156:

```python
# 1. Create RunState record (DB source of truth)
async with ResourceScope():
    run_state = await RunState.create_run(
        run_id=run_id,
        execution_strategy=request.execution_strategy.value,
        worker_id=worker_id,
    )
    run_state.execution_id = execution_id
    await run_state.save()
...
# Update run state to running
async with ResourceScope():
    run_state = await RunState.get(run_id)
    run_state.status = "running"
    await run_state.save()
```
- Thread/event-loop creation per job (thread start cost + OS scheduling variance):
threaded_job_execution.py, lines 210–234:

```python
runner = WorkflowRunner(job_id=job_id)

# Create persistent event loop for this job
event_loop = ThreadedEventLoop()
event_loop.start()

# Create the job record in database
job_model = Job(...)
async with ResourceScope():
    await job_model.save()
```
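The fixed price of that per-job thread can also be measured directly. A standalone sketch (independent of the repo) timing thread creation, start, and join, which every job pays under the per-job-thread strategy:

```python
import threading
import time

def avg_thread_start_cost(n: int = 50) -> float:
    """Average wall-clock cost of creating, starting, and joining a
    fresh thread -- the fixed overhead a per-job thread strategy pays
    on every run, before any node compute happens."""
    total = 0.0
    for _ in range(n):
        t0 = time.perf_counter()
        t = threading.Thread(target=lambda: None)
        t.start()
        t.join()
        total += time.perf_counter() - t0
    return total / n

print(f"avg thread start+join: {avg_thread_start_cost() * 1000:.3f} ms")
```

The absolute number varies with OS load and scheduler behavior, which is part of why identical runs land at different latencies.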
Why 0.08s sometimes, 0.2s other times?
That variance is exactly what you’d expect from:
- SQLite/file-backed DB commits + ResourceScope acquisition (can vary with disk/cache)
- Thread creation/scheduling (can vary with OS load)
- 10ms polling interval jitter in the message streamer
- Occasional GC / logging overhead / background tasks
Theoretical improvements (future ideas) + “ideal” latency floors
If you keep the same architecture (WS + job records + per-job threads), you won’t get “1ms” runs—most time isn’t the node, it’s the framework around it.
What would move the needle most:
- Fast path for ephemeral node runs (skip persistence): avoid creating RunState/Job rows for "fire-and-forget" local runs. This is likely the single biggest latency win.
- Reuse execution infrastructure: instead of one thread + event loop per job, use a shared event loop / pool for lightweight jobs.
- Event-driven message streaming: replace polling (sleep_interval=0.01) with "await next message" semantics, which removes the ~0–10ms jitter and, more importantly, the extra wakeups.
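The event-driven streaming idea can be sketched with a plain `asyncio.Queue`: the consumer blocks on `get()` and wakes the moment a message is published, instead of sleeping and re-checking. This is an illustrative design sketch, not the repo's API; `MessageChannel`, `publish`, and the message strings are hypothetical names:

```python
import asyncio
from typing import Any, AsyncGenerator

class MessageChannel:
    """Event-driven alternative to a 10ms polling loop: consumers
    await the queue and wake immediately when a message arrives."""

    _DONE = object()  # sentinel marking end of stream

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def publish(self, msg: Any) -> None:
        self._queue.put_nowait(msg)

    def close(self) -> None:
        self._queue.put_nowait(self._DONE)

    async def stream(self) -> AsyncGenerator[Any, None]:
        while True:
            msg = await self._queue.get()  # no sleep, no re-check jitter
            if msg is self._DONE:
                return
            yield msg

async def demo() -> list:
    ch = MessageChannel()
    ch.publish("node_update")
    ch.publish("job_completed")
    ch.close()
    return [m async for m in ch.stream()]

print(asyncio.run(demo()))  # → ['node_update', 'job_completed']
```

The design choice that matters here is that latency becomes purely event-driven: a message published mid-wait is delivered on the next scheduler tick rather than after up to one full `sleep_interval`.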
Rough “best case” numbers (order-of-magnitude)
- Ideal architecture (in-process direct call, no DB, no per-job thread, no polling): ~2–15ms overhead + node compute
- Still WS but optimized (no DB + shared runner + event-driven streaming): ~10–40ms typical
- Keep DB persistence and per-job thread creation: hard to get below ~50–150ms typical on older hardware, and you’ll still see spikes
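To make the gap concrete: a direct in-process call to a trivial node body finishes in microseconds, so nearly all of the observed 80–200ms is framework overhead. A minimal baseline sketch, where `concat` is a stand-in for a trivial node's compute, not the repo's node implementation:

```python
import time

def concat(a: str, b: str) -> str:
    # Stand-in for a trivial "concat" node's actual compute.
    return a + b

t0 = time.perf_counter()
out = concat("hello, ", "world")
elapsed_us = (time.perf_counter() - t0) * 1_000_000
print(f"{out!r} computed in {elapsed_us:.1f} µs")
```

Against a microsecond-scale node body, even the optimized-WS floor of ~10–40ms is four orders of magnitude of pure framework cost.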