From 3ec858145ef2fba8436c60d313c5c1b898e59d1f Mon Sep 17 00:00:00 2001 From: wolo Date: Mon, 15 Jun 2026 20:50:21 +0000 Subject: [PATCH 1/2] fix(workflow): serialize dynamic-node emit across concurrent children MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A DynamicFn may run children concurrently (the documented WithUseSubBranch pattern), and every child forwards its events up through one shared emit callback (makeEmit, wrapping the parent's single yield). With no synchronization, concurrent children call the same yield at once, which panics the range-over-func iterator and races the parent runNode's completion accumulator. Guard emit with a per-activation mutex so all yields — from the DynamicFn's own emit and from RunNode via the sub-scheduler — are serialized. Add a -race regression test that fans children out across goroutines; it panics/races on the unpatched code. --- workflow/dynamic_node.go | 16 +++++++-- workflow/run_node_test.go | 74 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 2 deletions(-) diff --git a/workflow/dynamic_node.go b/workflow/dynamic_node.go index 496b30a12..163d51e46 100644 --- a/workflow/dynamic_node.go +++ b/workflow/dynamic_node.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "iter" + "sync" "github.com/google/jsonschema-go/jsonschema" @@ -104,7 +105,11 @@ func (n *dynamicNode[IN, OUT]) Run(ctx agent.Context, input any) iter.Seq2[*sess return } - emit := makeEmit(yield, ctx) + // One mutex serializes every yield for this activation: the emit + // passed to the DynamicFn and the same emit driven by RunNode via + // the sub-scheduler. Concurrent children must not yield at once. 
+ var emitMu sync.Mutex + emit := makeEmit(yield, ctx, &emitMu) sub := newDynamicSubScheduler(ctx, n.composePath(ctx), emit) orchestratorCtx := newDynamicNodeContext(ctx, sub.ParentPath(), "", sub, sub.OutputForAncestors()) @@ -186,8 +191,15 @@ func (n *dynamicNode[IN, OUT]) composePath(parent NodeContext) string { // When yield returns false without ctx cancellation (no current // consumer triggers this, but the contract must not depend on it), // return context.Canceled as a stand-in. -func makeEmit(yield func(*session.Event, error) bool, parentCtx NodeContext) func(*session.Event) error { +// +// mu serializes yield: a DynamicFn may run concurrent children (see +// WithUseSubBranch) that all emit through this one callback, and calling +// the same yield from multiple goroutines panics the iterator and races +// the parent runNode's completion accumulator. +func makeEmit(yield func(*session.Event, error) bool, parentCtx NodeContext, mu *sync.Mutex) func(*session.Event) error { return func(ev *session.Event) error { + mu.Lock() + defer mu.Unlock() if err := parentCtx.Err(); err != nil { return err } diff --git a/workflow/run_node_test.go b/workflow/run_node_test.go index b8a80691f..0ed8cb724 100644 --- a/workflow/run_node_test.go +++ b/workflow/run_node_test.go @@ -18,6 +18,7 @@ import ( "errors" "iter" "reflect" + "strconv" "strings" "sync" "testing" @@ -633,3 +634,76 @@ func (n *countingStubNode) runCount() int { defer n.mu.Unlock() return n.calls } + +// TestRunNode_ConcurrentChildren_NoRace runs several children from +// separate goroutines (the WithUseSubBranch pattern). They all emit +// through one shared yield, so without serialization this races the +// range-over-func loop state and panics the iterator. A start gate +// releases the goroutines together to maximize the overlap. Each +// goroutine recovers its panic so the failure is reported even without +// -race; -race is the reliable signal. 
+func TestRunNode_ConcurrentChildren_NoRace(t *testing.T) { + const n = 8 + + var ( + mu sync.Mutex + panics []any + ) + errs := make([]error, n) + + orch := NewDynamicNode[string, string]( + "orch", + func(ctx NodeContext, _ string, _ func(*session.Event) error) (string, error) { + start := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(n) + for i := 0; i < n; i++ { + // Distinct child + run-id per goroutine so the only shared + // mutable state is the parent yield path; a shared run-id + // would instead exercise the idempotency-cache race. + child := newStubNode("child", "out") + go func(i int) { + defer wg.Done() + defer func() { + if r := recover(); r != nil { + mu.Lock() + panics = append(panics, r) + mu.Unlock() + } + }() + <-start + _, errs[i] = RunNode[string](ctx, child, nil, + WithUseSubBranch(), WithRunID("c"+strconv.Itoa(i))) + }(i) + } + close(start) + wg.Wait() + return "done", errors.Join(errs...) + }, + NodeConfig{}, + ) + + events, err := drainDynamicWithErr(t, orch, "") + + mu.Lock() + gotPanics := append([]any(nil), panics...) + mu.Unlock() + if len(gotPanics) > 0 { + t.Fatalf("%d recovered panic(s) from concurrent children sharing one yield; first: %v", + len(gotPanics), gotPanics[0]) + } + if err != nil { + t.Fatalf("orchestrator error: %v", err) + } + + // n child outputs forwarded up + the parent's own terminal output. + outputs := 0 + for _, ev := range events { + if ev.Output != nil { + outputs++ + } + } + if want := n + 1; outputs != want { + t.Errorf("output-bearing events = %d, want %d", outputs, want) + } +} From 636f217356a3645f006923561c9f1b34564bafb9 Mon Sep 17 00:00:00 2001 From: wolo Date: Tue, 16 Jun 2026 11:43:33 +0000 Subject: [PATCH 2/2] fix(workflow): move emit mutex into makeEmit Address review nit: the per-activation mutex is owned solely by the emit closure, so create it inside makeEmit instead of passing it in. Simplifies the makeEmit prototype and removes the caller-side mutex plumbing. No behavior change. 
--- workflow/dynamic_node.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/workflow/dynamic_node.go b/workflow/dynamic_node.go index 163d51e46..9662071f5 100644 --- a/workflow/dynamic_node.go +++ b/workflow/dynamic_node.go @@ -105,11 +105,7 @@ func (n *dynamicNode[IN, OUT]) Run(ctx agent.Context, input any) iter.Seq2[*sess return } - // One mutex serializes every yield for this activation: the emit - // passed to the DynamicFn and the same emit driven by RunNode via - // the sub-scheduler. Concurrent children must not yield at once. - var emitMu sync.Mutex - emit := makeEmit(yield, ctx, &emitMu) + emit := makeEmit(yield, ctx) sub := newDynamicSubScheduler(ctx, n.composePath(ctx), emit) orchestratorCtx := newDynamicNodeContext(ctx, sub.ParentPath(), "", sub, sub.OutputForAncestors()) @@ -192,11 +188,12 @@ func (n *dynamicNode[IN, OUT]) composePath(parent NodeContext) string { // consumer triggers this, but the contract must not depend on it), // return context.Canceled as a stand-in. // -// mu serializes yield: a DynamicFn may run concurrent children (see -// WithUseSubBranch) that all emit through this one callback, and calling -// the same yield from multiple goroutines panics the iterator and races -// the parent runNode's completion accumulator. -func makeEmit(yield func(*session.Event, error) bool, parentCtx NodeContext, mu *sync.Mutex) func(*session.Event) error { +// One mutex per makeEmit call (i.e. per activation) serializes yield: +// concurrent children of a DynamicFn (see WithUseSubBranch) all emit +// through this one callback, and concurrent calls to the same yield +// panic the iterator and race the parent runNode's completion accumulator. +func makeEmit(yield func(*session.Event, error) bool, parentCtx NodeContext) func(*session.Event) error { + var mu sync.Mutex return func(ev *session.Event) error { mu.Lock() defer mu.Unlock()