diff --git a/workflow/dynamic_node.go b/workflow/dynamic_node.go index 496b30a12..9662071f5 100644 --- a/workflow/dynamic_node.go +++ b/workflow/dynamic_node.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "iter" + "sync" "github.com/google/jsonschema-go/jsonschema" @@ -186,8 +187,16 @@ func (n *dynamicNode[IN, OUT]) composePath(parent NodeContext) string { // When yield returns false without ctx cancellation (no current // consumer triggers this, but the contract must not depend on it), // return context.Canceled as a stand-in. +// +// A single mutex serializes yield: a DynamicFn may run concurrent +// children (see WithUseSubBranch) that all emit through this one +// callback, and calling the same yield from multiple goroutines panics +// the iterator and races the parent runNode's completion accumulator. func makeEmit(yield func(*session.Event, error) bool, parentCtx NodeContext) func(*session.Event) error { + var mu sync.Mutex return func(ev *session.Event) error { + mu.Lock() + defer mu.Unlock() if err := parentCtx.Err(); err != nil { return err } diff --git a/workflow/run_node_test.go b/workflow/run_node_test.go index b8a80691f..0ed8cb724 100644 --- a/workflow/run_node_test.go +++ b/workflow/run_node_test.go @@ -18,6 +18,7 @@ import ( "errors" "iter" "reflect" + "strconv" "strings" "sync" "testing" @@ -633,3 +634,76 @@ func (n *countingStubNode) runCount() int { defer n.mu.Unlock() return n.calls } + +// TestRunNode_ConcurrentChildren_NoRace runs several children from +// separate goroutines (the WithUseSubBranch pattern). They all emit +// through one shared yield, so without serialization this races the +// range-over-func loop state and panics the iterator. A start gate +// releases the goroutines together to maximize the overlap. Each +// goroutine recovers its panic so the failure is reported even without +// -race; -race is the reliable signal. +func TestRunNode_ConcurrentChildren_NoRace(t *testing.T) { + const n = 8 + + var ( + mu sync.Mutex + panics []any + ) + errs := make([]error, n) + + orch := NewDynamicNode[string, string]( + "orch", + func(ctx NodeContext, _ string, _ func(*session.Event) error) (string, error) { + start := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(n) + for i := 0; i < n; i++ { + // Distinct child + run-id per goroutine so the only shared + // mutable state is the parent yield path; a shared run-id + // would instead exercise the idempotency-cache race. + child := newStubNode("child", "out") + go func(i int) { + defer wg.Done() + defer func() { + if r := recover(); r != nil { + mu.Lock() + panics = append(panics, r) + mu.Unlock() + } + }() + <-start + _, errs[i] = RunNode[string](ctx, child, nil, + WithUseSubBranch(), WithRunID("c"+strconv.Itoa(i))) + }(i) + } + close(start) + wg.Wait() + return "done", errors.Join(errs...) + }, + NodeConfig{}, + ) + + events, err := drainDynamicWithErr(t, orch, "") + + mu.Lock() + gotPanics := append([]any(nil), panics...) + mu.Unlock() + if len(gotPanics) > 0 { + t.Fatalf("%d recovered panic(s) from concurrent children sharing one yield; first: %v", + len(gotPanics), gotPanics[0]) + } + if err != nil { + t.Fatalf("orchestrator error: %v", err) + } + + // n child outputs forwarded up + the parent's own terminal output. + outputs := 0 + for _, ev := range events { + if ev.Output != nil { + outputs++ + } + } + if want := n + 1; outputs != want { + t.Errorf("output-bearing events = %d, want %d", outputs, want) + } +}