diff --git a/apps/backend/internal/agentctl/server/adapter/transport/acp/adapter.go b/apps/backend/internal/agentctl/server/adapter/transport/acp/adapter.go index dae954906..0e4c52ed2 100644 --- a/apps/backend/internal/agentctl/server/adapter/transport/acp/adapter.go +++ b/apps/backend/internal/agentctl/server/adapter/transport/acp/adapter.go @@ -164,6 +164,18 @@ type Adapter struct { mu sync.RWMutex closed bool + // promptGate is a 1-slot semaphore that serializes session/prompt calls so + // at most one is in flight against the bridge at a time. The ScheduleWakeup + // path injects a synthetic prompt via fireWakeup; without this gate it can + // race a user prompt, and the claude-agent-acp bridge then returns each + // prompt's stop_reason against the wrong turn — shifting chat turns one + // prompt behind. A queued synthetic prompt waits here and drains the wakeup + // turn once the in-flight prompt finishes. It is a channel rather than a + // sync.Mutex so the wait honours the caller's context (a wakeup whose + // timeout/lifetime context is cancelled while queued aborts instead of + // blocking on a stuck turn). + promptGate chan struct{} + // lifetimeCtx is cancelled by Close. Background work that may outlive // the call site (e.g. the synthetic wakeup prompt goroutine) derives its // context from this one so it aborts when the adapter shuts down rather @@ -189,6 +201,7 @@ func NewAdapter(cfg *shared.Config, log *logger.Logger) *Adapter { pendingWakeups: make(map[string]*pendingWakeup), usageBySession: make(map[string]*usageTracker), attachMgr: shared.NewAttachmentManager(cfg.WorkDir, l.Zap()), + promptGate: make(chan struct{}, 1), lifetimeCtx: ctx, lifetimeCancel: cancel, } diff --git a/apps/backend/internal/agentctl/server/adapter/transport/acp/adapter_prompt.go b/apps/backend/internal/agentctl/server/adapter/transport/acp/adapter_prompt.go index 2528821bb..0bb4b5ce6 100644 --- a/apps/backend/internal/agentctl/server/adapter/transport/acp/adapter_prompt.go +++ b/apps/backend/internal/agentctl/server/adapter/transport/acp/adapter_prompt.go @@ -16,16 +16,54 @@ import ( // If pending context is set (from SetPendingContext), it will be prepended to the message. // Attachments (images) are converted to ACP ImageBlocks and included in the prompt. // When the prompt completes, a complete event is emitted via the updates channel. +func (a *Adapter) Prompt(ctx context.Context, message string, attachments []v1.MessageAttachment) error { + // A user prompt always targets the current session, so it is not pinned. + return a.sendPrompt(ctx, message, attachments, "") +} + +// sendPrompt serializes session/prompt calls through promptGate and sends one +// prompt to the agent. When expectSession is non-empty the prompt is pinned to +// that session: if the adapter's active session changed (or the adapter closed) +// while this call waited on the gate, the prompt is dropped instead of being +// sent to whatever session is now current. This is the ScheduleWakeup path — +// a wakeup must reach the session it was scheduled for or not at all. // //nolint:cyclop,funlen // pre-existing complexity preserved from adapter.go file split -func (a *Adapter) Prompt(ctx context.Context, message string, attachments []v1.MessageAttachment) error { +func (a *Adapter) sendPrompt(ctx context.Context, message string, attachments []v1.MessageAttachment, expectSession string) error { + // Acquire the prompt gate, honouring ctx so a queued wakeup whose context + // is cancelled (timeout / adapter Close) aborts instead of blocking on a + // stuck in-flight turn. + select { + case a.promptGate <- struct{}{}: + defer func() { <-a.promptGate }() + case <-ctx.Done(): + return ctx.Err() + } + a.mu.Lock() conn := a.acpConn sessionID := a.sessionID - pendingContext := a.pendingContext - a.pendingContext = "" // Clear after use + closed := a.closed + // A pinned prompt that no longer matches the active session (or a closed + // adapter) is dropped. Wakeup-pinned prompts are synthetic turns — they + // must not consume pendingContext reserved for the next user prompt (e.g. + // fork_session resume context). + drop := expectSession != "" && (closed || sessionID != expectSession) + var pendingContext string + if !drop && expectSession == "" { + pendingContext = a.pendingContext + a.pendingContext = "" // Clear after use + } a.mu.Unlock() + if drop { + a.logger.Info("dropping queued wakeup prompt: session changed or adapter closed before it ran", + zap.String("scheduled_for", expectSession), + zap.String("current_session", sessionID), + zap.Bool("closed", closed)) + return nil + } + if conn == nil { return fmt.Errorf("adapter not initialized") } @@ -170,14 +208,15 @@ func (a *Adapter) Prompt(ctx context.Context, message string, attachments []v1.M // turn and emits visible ACP frames. The session must still match (the user // hasn't started a fresh session) and the adapter must not be closed. // -// Concurrent-prompt safety: if a user prompt is already in flight when this -// runs, both end up calling conn.Prompt() on the same ClientSideConnection. -// That's safe at the wire level — the ACP SDK's Connection.sendMessage -// holds a write mutex, so request frames never interleave on stdin, and -// JSON-RPC pairs each response back to its originating request via id — -// but it does mean two prompts can be in flight against the bridge at -// once. The bridge serialises them in the order it receives them, which -// is exactly what we want for a wakeup that races a user message. +// Prompt serialization: the synthetic prompt goes through sendPrompt, which +// gates on promptGate so only one session/prompt is in flight at a time. If a +// user prompt is already running the wakeup waits behind it rather than racing +// it — two concurrent conn.Prompt() calls would let the bridge return each +// prompt's stop_reason against the wrong turn, shifting chat turns one prompt +// behind. Because the wakeup can wait, the session is re-validated inside +// sendPrompt (via the pinned expectSession argument): if a NewSession/LoadSession +// changed the active session while the wakeup queued, it is dropped instead of +// being injected into the new session. func (a *Adapter) fireWakeup(sessionID, prompt string) { a.mu.RLock() closed := a.closed @@ -205,7 +244,9 @@ func (a *Adapter) fireWakeup(sessionID, prompt string) { // prompt instead of letting it run against a dead subprocess. ctx, cancel := context.WithTimeout(a.lifetimeCtx, wakeupPromptTimeout) defer cancel() - if err := a.Prompt(ctx, prompt, nil); err != nil { + // Pin to the scheduled session: if the active session changed while this + // wakeup waited on the prompt gate, sendPrompt drops it. + if err := a.sendPrompt(ctx, prompt, nil, sessionID); err != nil { a.logger.Error("synthetic wakeup prompt failed", zap.String("session_id", sessionID), zap.Error(err)) diff --git a/apps/backend/internal/agentctl/server/adapter/transport/acp/wakeup_concurrency_test.go b/apps/backend/internal/agentctl/server/adapter/transport/acp/wakeup_concurrency_test.go new file mode 100644 index 000000000..77ef21f99 --- /dev/null +++ b/apps/backend/internal/agentctl/server/adapter/transport/acp/wakeup_concurrency_test.go @@ -0,0 +1,259 @@ +package acp + +import ( + "context" + "io" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/coder/acp-go-sdk" + "github.com/kandev/kandev/internal/agentctl/types/streams" +) + +// concurrencyFakeAgent is a minimal acp.Agent whose Prompt handler blocks until +// released, recording how many Prompt calls are in flight simultaneously. It +// lets a test observe whether kandev issues two overlapping conn.Prompt() calls. +type concurrencyFakeAgent struct { + entered chan struct{} // one signal per Prompt entry + release chan struct{} // closed to unblock all parked Prompts + inFlight atomic.Int32 + maxInFlight atomic.Int32 +} + +func (f *concurrencyFakeAgent) Prompt(_ context.Context, _ acp.PromptRequest) (acp.PromptResponse, error) { + cur := f.inFlight.Add(1) + for { + old := f.maxInFlight.Load() + if cur <= old || f.maxInFlight.CompareAndSwap(old, cur) { + break + } + } + f.entered <- struct{}{} + <-f.release + f.inFlight.Add(-1) + return acp.PromptResponse{StopReason: acp.StopReasonEndTurn}, nil +} + +func (f *concurrencyFakeAgent) Initialize(_ context.Context, params acp.InitializeRequest) (acp.InitializeResponse, error) { + return acp.InitializeResponse{ProtocolVersion: params.ProtocolVersion}, nil +} + +func (f *concurrencyFakeAgent) NewSession(_ context.Context, _ acp.NewSessionRequest) (acp.NewSessionResponse, error) { + return acp.NewSessionResponse{SessionId: "sess-concurrency-test"}, nil +} + +func (f *concurrencyFakeAgent) Authenticate(_ context.Context, _ acp.AuthenticateRequest) (acp.AuthenticateResponse, error) { + return acp.AuthenticateResponse{}, nil +} +func (f *concurrencyFakeAgent) Cancel(_ context.Context, _ acp.CancelNotification) error { return nil } +func (f *concurrencyFakeAgent) CloseSession(_ context.Context, _ acp.CloseSessionRequest) (acp.CloseSessionResponse, error) { + return acp.CloseSessionResponse{}, nil +} +func (f *concurrencyFakeAgent) ListSessions(_ context.Context, _ acp.ListSessionsRequest) (acp.ListSessionsResponse, error) { + return acp.ListSessionsResponse{}, nil +} +func (f *concurrencyFakeAgent) ResumeSession(_ context.Context, _ acp.ResumeSessionRequest) (acp.ResumeSessionResponse, error) { + return acp.ResumeSessionResponse{}, nil +} +func (f *concurrencyFakeAgent) SetSessionConfigOption(_ context.Context, _ acp.SetSessionConfigOptionRequest) (acp.SetSessionConfigOptionResponse, error) { + return acp.SetSessionConfigOptionResponse{}, nil +} +func (f *concurrencyFakeAgent) SetSessionMode(_ context.Context, _ acp.SetSessionModeRequest) (acp.SetSessionModeResponse, error) { + return acp.SetSessionModeResponse{}, nil +} + +// setupConcurrencyFakeAgent wires an adapter to a blocking fake agent and +// registers cleanup immediately so early t.Fatal paths cannot leak pipes or +// background goroutines. +func setupConcurrencyFakeAgent(t *testing.T) (*Adapter, *concurrencyFakeAgent) { + t.Helper() + + a := newTestAdapter() + c2aR, c2aW := io.Pipe() + a2cR, a2cW := io.Pipe() + t.Cleanup(func() { + _ = a.Close() + _ = c2aW.Close() + _ = a2cW.Close() + }) + + if err := a.Connect(c2aW, a2cR); err != nil { + t.Fatalf("Connect: %v", err) + } + + fa := &concurrencyFakeAgent{ + entered: make(chan struct{}, 8), + release: make(chan struct{}), + } + _ = acp.NewAgentSideConnection(fa, a2cW, c2aR) + return a, fa +} + +// waitForPromptComplete blocks until sendPrompt emits EventTypeComplete or times out. +func waitForPromptComplete(t *testing.T, a *Adapter) { + t.Helper() + deadline := time.After(100 * time.Millisecond) + for { + select { + case ev := <-a.updatesCh: + if ev.Type == streams.EventTypeComplete { + return + } + case <-deadline: + t.Fatal("timed out waiting for prompt complete event") + } + } +} + +// TestWakeupDoesNotRaceConcurrentPromptWithUserPrompt reproduces the +// turn-misalignment bug: when a ScheduleWakeup timer fires while a user prompt +// is still in flight, fireWakeup must NOT issue a second, concurrent +// conn.Prompt() against the bridge. Two overlapping prompts are what desync the +// bridge's per-prompt stop_reason from the turn it belongs to, shifting chat +// turns one prompt behind. +// +// The fake agent blocks inside Prompt and records peak concurrency. Before this +// fix, the synthetic wakeup prompt overlapped the user prompt (maxInFlight == 2) +// because fireWakeup called conn.Prompt() concurrently. This test verifies the +// fix serializes them so maxInFlight never exceeds 1 during the overlap window. +func TestWakeupDoesNotRaceConcurrentPromptWithUserPrompt(t *testing.T) { + a, fa := setupConcurrencyFakeAgent(t) + + ctx := context.Background() + if err := a.Initialize(ctx); err != nil { + t.Fatalf("Initialize: %v", err) + } + sid, err := a.NewSession(ctx, nil) + if err != nil { + t.Fatalf("NewSession: %v", err) + } + + var userWG sync.WaitGroup + userWG.Add(1) + go func() { + defer userWG.Done() + _ = a.Prompt(ctx, "user message", nil) + }() + + // Wait until the user prompt is parked inside the agent's Prompt handler. + select { + case <-fa.entered: + case <-time.After(5 * time.Second): + t.Fatal("user prompt never reached the agent") + } + + // Fire the wakeup while the user prompt is still in flight. + a.fireWakeup(sid, "synthetic wakeup prompt") + + // Wakeup must not enter the agent while the user prompt is still parked. + select { + case <-fa.entered: + t.Fatal("wakeup entered agent while user prompt was in flight") + case <-time.After(100 * time.Millisecond): + } + + if fa.maxInFlight.Load() > 1 { + t.Fatalf("maxInFlight=%d during overlap window; prompts must be serialized so the bridge's stop_reason stays aligned with its turn", fa.maxInFlight.Load()) + } + + // Release the user prompt; the queued wakeup should run serially afterwards. + close(fa.release) + userWG.Wait() + waitForPromptComplete(t, a) + + select { + case <-fa.entered: + case <-time.After(100 * time.Millisecond): + t.Fatal("wakeup never ran after user prompt completed") + } + waitForPromptComplete(t, a) +} + +// TestWakeupDroppedWhenSessionChangesWhileQueued covers the case where a wakeup +// prompt queues behind an in-flight user prompt and the adapter's active session +// changes (NewSession/LoadSession/reset) before the wakeup gets the gate. The +// queued wakeup must target the session it was scheduled for; if that session is +// no longer current it must be dropped, not injected into the new session. +func TestWakeupDroppedWhenSessionChangesWhileQueued(t *testing.T) { + a, fa := setupConcurrencyFakeAgent(t) + + ctx := context.Background() + if err := a.Initialize(ctx); err != nil { + t.Fatalf("Initialize: %v", err) + } + origSession, err := a.NewSession(ctx, nil) + if err != nil { + t.Fatalf("NewSession: %v", err) + } + + var userWG sync.WaitGroup + userWG.Add(1) + go func() { + defer userWG.Done() + _ = a.Prompt(ctx, "user message", nil) + }() + select { + case <-fa.entered: + case <-time.After(5 * time.Second): + t.Fatal("user prompt never reached the agent") + } + + // Wakeup fires for the original session and queues behind the gate. + a.fireWakeup(origSession, "scheduled wakeup for original session") + + // The active session changes while the wakeup waits (e.g. reset/resume). + a.mu.Lock() + a.sessionID = "different-session-after-reset" + a.mu.Unlock() + + // Release the in-flight user prompt so the queued wakeup can run. + close(fa.release) + userWG.Wait() + + select { + case <-fa.entered: + t.Fatal("queued wakeup was sent after the session changed; it must be dropped to avoid injecting into an unrelated session") + case <-time.After(100 * time.Millisecond): + } + waitForPromptComplete(t, a) +} + +// TestWakeupDoesNotConsumePendingContext verifies that a synthetic wakeup prompt +// does not read-and-clear pendingContext, which is reserved for the next user +// prompt (e.g. fork_session resume context). +func TestWakeupDoesNotConsumePendingContext(t *testing.T) { + a, fa := setupConcurrencyFakeAgent(t) + + ctx := context.Background() + if err := a.Initialize(ctx); err != nil { + t.Fatalf("Initialize: %v", err) + } + sid, err := a.NewSession(ctx, nil) + if err != nil { + t.Fatalf("NewSession: %v", err) + } + + const canary = "resume-context-for-next-user-prompt" + a.mu.Lock() + a.pendingContext = canary + a.mu.Unlock() + + a.fireWakeup(sid, "scheduled wakeup prompt") + + select { + case <-fa.entered: + case <-time.After(100 * time.Millisecond): + t.Fatal("wakeup never reached the agent") + } + close(fa.release) + waitForPromptComplete(t, a) + + a.mu.Lock() + got := a.pendingContext + a.mu.Unlock() + if got != canary { + t.Fatalf("pendingContext=%q, want %q — wakeup must not consume resume context", got, canary) + } +}