diff --git a/workflow/dynamic_scheduler.go b/workflow/dynamic_scheduler.go index 7baec97a3..cfa41d10b 100644 --- a/workflow/dynamic_scheduler.go +++ b/workflow/dynamic_scheduler.go @@ -147,13 +147,21 @@ func newDynamicSubScheduler(parent agent.Context, parentPath string, emitUp func // resumed orchestrator (which re-runs from the top) serves already // completed children from cache instead of re-executing them. Each // child's terminal event carries its childPath in NodeInfo.Path and a -// non-nil Output; keyed by childPath, so only stable WithRunID calls -// hit (auto-counter ids regenerate per activation and miss). +// non-nil Output; keyed by childPath. +// +// Only events from the current invocation are considered. Auto-counter +// run-ids reset to 1 on every fresh activation, so a later user turn +// reuses the same childPath ("/@1") as a prior turn; +// without the invocation filter those stale outputs would be served +// from cache and the child would never re-run on the new turn. Mirrors +// adk-python, which gates rehydration on event.invocation_id (see +// _reconstruct_node_states / _scan_parent_child_sequence). func (s *dynamicSubScheduler) rehydrateCache() { sess := s.parentCtx.Session() if sess == nil { return } + invocationID := s.parentCtx.InvocationID() prefix := s.parentPath + "/" s.mu.Lock() defer s.mu.Unlock() @@ -161,6 +169,9 @@ func (s *dynamicSubScheduler) rehydrateCache() { if ev == nil || ev.Output == nil || ev.NodeInfo == nil { continue } + if invocationID != "" && ev.InvocationID != invocationID { + continue + } if !strings.HasPrefix(ev.NodeInfo.Path, prefix) { continue } diff --git a/workflow/dynamic_scheduler_test.go b/workflow/dynamic_scheduler_test.go index 97ef5bedb..cd207f3ad 100644 --- a/workflow/dynamic_scheduler_test.go +++ b/workflow/dynamic_scheduler_test.go @@ -21,6 +21,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/google/jsonschema-go/jsonschema" "google.golang.org/genai" @@ -185,12 +186,81 @@ func TestSubScheduler_RunNode_ErrorWinsOverInterrupt(t *testing.T) { } } +// TestSubScheduler_RehydrateCache_InvocationScope verifies that +// rehydrateCache serves a child's output from cache only when the +// terminal event belongs to the current invocation. Auto-counter +// run-ids reset to 1 on every fresh activation, so a later user turn +// reuses the same childPath ("wf/child@1"); without the invocation +// filter the prior turn's output would be replayed and the child would +// never re-run — the regression behind the empty second turn of the +// dynamic workflow example. The current-invocation case must still hit +// to preserve within-invocation resume/dedup. +// +// MockInvocationContext.InvocationID() is "test-invocation-id". +func TestSubScheduler_RehydrateCache_InvocationScope(t *testing.T) { + const childPath = "wf/child@1" + event := func(invocationID, output string) *session.Event { + return &session.Event{ + InvocationID: invocationID, + Output: output, + NodeInfo: &session.NodeInfo{Path: childPath}, + } + } + + tests := []struct { + name string + events sliceEvents + wantHit bool + wantValue any + }{ + { + name: "other invocation ignored", + events: sliceEvents{event("prev-invocation", "stale")}, + wantHit: false, + }, + { + name: "current invocation wins over prior", + events: sliceEvents{event("prev-invocation", "stale"), event("test-invocation-id", "fresh")}, + wantHit: true, + wantValue: "fresh", + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := newMockCtx(t) + ctx.sess = &eventsSession{events: tc.events} + sub := newDynamicSubScheduler(newNodeContext(ctx, nil), "wf", noopEmit).(*dynamicSubScheduler) + + out, ok := sub.lookupCachedOutput(childPath) + if ok != tc.wantHit { + t.Fatalf("lookupCachedOutput(%q) hit = %v, want %v (out=%v)", childPath, ok, tc.wantHit, out) + } + if ok && out != tc.wantValue { + t.Errorf("cached output = %v, want %v", out, tc.wantValue) + } + }) + } +} + // ============================================================================= // Test fixtures and helpers // ============================================================================= func noopEmit(*session.Event) error { return nil } +// eventsSession is a minimal session.Session exposing a fixed event +// history; only Events() is consulted by rehydrateCache. +type eventsSession struct { + events session.Events +} + +func (s *eventsSession) ID() string { return "test-session" } +func (s *eventsSession) AppName() string { return "test-app" } +func (s *eventsSession) UserID() string { return "test-user" } +func (s *eventsSession) State() session.State { return nil } +func (s *eventsSession) Events() session.Events { return s.events } +func (s *eventsSession) LastUpdateTime() time.Time { return time.Time{} } + func newTopLevelCtx(t *testing.T) agent.Context { t.Helper() return newNodeContext(newMockCtx(t), nil)