Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ type Adapter struct {
mu sync.RWMutex
closed bool

// promptGate is a 1-slot semaphore that serializes session/prompt calls so
// at most one is in flight against the bridge at a time. The ScheduleWakeup
// path injects a synthetic prompt via fireWakeup; without this gate it can
// race a user prompt, and the claude-agent-acp bridge then returns each
// prompt's stop_reason against the wrong turn — shifting chat turns one
// prompt behind. A queued synthetic prompt waits here and drains the wakeup
// turn once the in-flight prompt finishes. It is a channel rather than a
// sync.Mutex so the wait honours the caller's context (a wakeup whose
// timeout/lifetime context is cancelled while queued aborts instead of
// blocking on a stuck turn).
promptGate chan struct{}

// lifetimeCtx is cancelled by Close. Background work that may outlive
// the call site (e.g. the synthetic wakeup prompt goroutine) derives its
// context from this one so it aborts when the adapter shuts down rather
Expand All @@ -189,6 +201,7 @@ func NewAdapter(cfg *shared.Config, log *logger.Logger) *Adapter {
pendingWakeups: make(map[string]*pendingWakeup),
usageBySession: make(map[string]*usageTracker),
attachMgr: shared.NewAttachmentManager(cfg.WorkDir, l.Zap()),
promptGate: make(chan struct{}, 1),
lifetimeCtx: ctx,
lifetimeCancel: cancel,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,54 @@ import (
// If pending context is set (from SetPendingContext), it will be prepended to the message.
// Attachments (images) are converted to ACP ImageBlocks and included in the prompt.
// When the prompt completes, a complete event is emitted via the updates channel.
func (a *Adapter) Prompt(ctx context.Context, message string, attachments []v1.MessageAttachment) error {
	// User prompts are never pinned to a session: an empty expectSession
	// disables sendPrompt's drop-on-session-change check, so the prompt goes
	// to whichever session is active once the prompt gate is acquired.
	const unpinned = ""
	return a.sendPrompt(ctx, message, attachments, unpinned)
}

// sendPrompt serializes session/prompt calls through promptGate and sends one
// prompt to the agent. When expectSession is non-empty the prompt is pinned to
// that session: if the adapter's active session changed (or the adapter closed)
// while this call waited on the gate, the prompt is dropped instead of being
// sent to whatever session is now current. This is the ScheduleWakeup path —
// a wakeup must reach the session it was scheduled for or not at all.
//
//nolint:cyclop,funlen // pre-existing complexity preserved from adapter.go file split
func (a *Adapter) Prompt(ctx context.Context, message string, attachments []v1.MessageAttachment) error {
func (a *Adapter) sendPrompt(ctx context.Context, message string, attachments []v1.MessageAttachment, expectSession string) error {
// Acquire the prompt gate, honouring ctx so a queued wakeup whose context
// is cancelled (timeout / adapter Close) aborts instead of blocking on a
// stuck in-flight turn.
select {
case a.promptGate <- struct{}{}:
defer func() { <-a.promptGate }()
case <-ctx.Done():
return ctx.Err()
}

a.mu.Lock()
conn := a.acpConn
sessionID := a.sessionID
pendingContext := a.pendingContext
a.pendingContext = "" // Clear after use
closed := a.closed
// A pinned prompt that no longer matches the active session (or a closed
// adapter) is dropped. Wakeup-pinned prompts are synthetic turns — they
// must not consume pendingContext reserved for the next user prompt (e.g.
// fork_session resume context).
drop := expectSession != "" && (closed || sessionID != expectSession)
Comment thread
carlosflorencio marked this conversation as resolved.
var pendingContext string
if !drop && expectSession == "" {
pendingContext = a.pendingContext
a.pendingContext = "" // Clear after use
}
Comment thread
carlosflorencio marked this conversation as resolved.
a.mu.Unlock()

if drop {
a.logger.Info("dropping queued wakeup prompt: session changed or adapter closed before it ran",
zap.String("scheduled_for", expectSession),
zap.String("current_session", sessionID),
zap.Bool("closed", closed))
return nil
}

if conn == nil {
return fmt.Errorf("adapter not initialized")
}
Expand Down Expand Up @@ -170,14 +208,15 @@ func (a *Adapter) Prompt(ctx context.Context, message string, attachments []v1.M
// turn and emits visible ACP frames. The session must still match (the user
// hasn't started a fresh session) and the adapter must not be closed.
//
// Concurrent-prompt safety: if a user prompt is already in flight when this
// runs, both end up calling conn.Prompt() on the same ClientSideConnection.
// That's safe at the wire level — the ACP SDK's Connection.sendMessage
// holds a write mutex, so request frames never interleave on stdin, and
// JSON-RPC pairs each response back to its originating request via id —
// but it does mean two prompts can be in flight against the bridge at
// once. The bridge serialises them in the order it receives them, which
// is exactly what we want for a wakeup that races a user message.
// Prompt serialization: the synthetic prompt goes through sendPrompt, which
// gates on promptGate so only one session/prompt is in flight at a time. If a
// user prompt is already running the wakeup waits behind it rather than racing
// it — two concurrent conn.Prompt() calls would let the bridge return each
// prompt's stop_reason against the wrong turn, shifting chat turns one prompt
// behind. Because the wakeup can wait, the session is re-validated inside
// sendPrompt (via the pinned expectSession argument): if a NewSession/LoadSession
// changed the active session while the wakeup queued, it is dropped instead of
// being injected into the new session.
func (a *Adapter) fireWakeup(sessionID, prompt string) {
a.mu.RLock()
closed := a.closed
Expand Down Expand Up @@ -205,7 +244,9 @@ func (a *Adapter) fireWakeup(sessionID, prompt string) {
// prompt instead of letting it run against a dead subprocess.
ctx, cancel := context.WithTimeout(a.lifetimeCtx, wakeupPromptTimeout)
defer cancel()
if err := a.Prompt(ctx, prompt, nil); err != nil {
// Pin to the scheduled session: if the active session changed while this
// wakeup waited on the prompt gate, sendPrompt drops it.
if err := a.sendPrompt(ctx, prompt, nil, sessionID); err != nil {
a.logger.Error("synthetic wakeup prompt failed",
zap.String("session_id", sessionID),
zap.Error(err))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
package acp

import (
"context"
"io"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/coder/acp-go-sdk"
"github.com/kandev/kandev/internal/agentctl/types/streams"
)

// concurrencyFakeAgent is a stub acp.Agent for detecting overlapping prompts.
// Its Prompt handler parks until release is closed and tracks both the current
// and the peak number of Prompt calls inside the handler, so a test can tell
// whether two conn.Prompt() calls were ever in flight at the same time.
type concurrencyFakeAgent struct {
	// entered receives one value each time a Prompt call reaches the handler.
	entered chan struct{}
	// release is closed by the test to let every parked Prompt call return.
	release chan struct{}

	// inFlight is the number of Prompt calls currently inside the handler.
	inFlight atomic.Int32
	// maxInFlight is the highest inFlight value observed so far.
	maxInFlight atomic.Int32
}

// Prompt is the session/prompt handler. It counts the call as in flight,
// raises maxInFlight when a new peak is reached, signals entered, and then
// parks until release is closed before answering with StopReasonEndTurn.
//
// Both blocking steps also watch ctx: if the request context is cancelled the
// handler returns ctx.Err() instead of staying parked, so a test that fails
// before closing release does not strand this goroutine.
// NOTE(review): this only helps if the SDK cancels the handler context when
// the connection is torn down — confirm against the acp-go-sdk version in use.
func (f *concurrencyFakeAgent) Prompt(ctx context.Context, _ acp.PromptRequest) (acp.PromptResponse, error) {
	cur := f.inFlight.Add(1)
	// Decrement on every exit path, including the cancellation returns below.
	defer f.inFlight.Add(-1)

	// Lock-free "store max": retry the CAS until either our value has been
	// published or another call has already recorded a peak at least as high.
	for {
		old := f.maxInFlight.Load()
		if cur <= old || f.maxInFlight.CompareAndSwap(old, cur) {
			break
		}
	}

	// Tell the test that a prompt has reached the agent.
	select {
	case f.entered <- struct{}{}:
	case <-ctx.Done():
		return acp.PromptResponse{}, ctx.Err()
	}

	// Park until the test releases all prompts.
	select {
	case <-f.release:
	case <-ctx.Done():
		return acp.PromptResponse{}, ctx.Err()
	}
	return acp.PromptResponse{StopReason: acp.StopReasonEndTurn}, nil
}

// Initialize answers the handshake by echoing back the protocol version the
// client asked for.
func (f *concurrencyFakeAgent) Initialize(_ context.Context, params acp.InitializeRequest) (acp.InitializeResponse, error) {
	resp := acp.InitializeResponse{ProtocolVersion: params.ProtocolVersion}
	return resp, nil
}

// NewSession ignores the request and always reports the same fixed session id.
func (f *concurrencyFakeAgent) NewSession(_ context.Context, _ acp.NewSessionRequest) (acp.NewSessionResponse, error) {
	const fixedSessionID = "sess-concurrency-test"
	return acp.NewSessionResponse{SessionId: fixedSessionID}, nil
}

// Authenticate is a no-op stub: it returns a zero-value response and no error.
func (f *concurrencyFakeAgent) Authenticate(_ context.Context, _ acp.AuthenticateRequest) (acp.AuthenticateResponse, error) {
	var resp acp.AuthenticateResponse
	return resp, nil
}
// Cancel is a no-op stub: the notification is ignored and nil is returned.
func (f *concurrencyFakeAgent) Cancel(_ context.Context, _ acp.CancelNotification) error {
	return nil
}
// CloseSession is a no-op stub: it returns a zero-value response and no error.
func (f *concurrencyFakeAgent) CloseSession(_ context.Context, _ acp.CloseSessionRequest) (acp.CloseSessionResponse, error) {
	var resp acp.CloseSessionResponse
	return resp, nil
}
// ListSessions is a no-op stub: it returns a zero-value response and no error.
func (f *concurrencyFakeAgent) ListSessions(_ context.Context, _ acp.ListSessionsRequest) (acp.ListSessionsResponse, error) {
	var resp acp.ListSessionsResponse
	return resp, nil
}
// ResumeSession is a no-op stub: it returns a zero-value response and no error.
func (f *concurrencyFakeAgent) ResumeSession(_ context.Context, _ acp.ResumeSessionRequest) (acp.ResumeSessionResponse, error) {
	var resp acp.ResumeSessionResponse
	return resp, nil
}
// SetSessionConfigOption is a no-op stub: it returns a zero-value response and
// no error.
func (f *concurrencyFakeAgent) SetSessionConfigOption(_ context.Context, _ acp.SetSessionConfigOptionRequest) (acp.SetSessionConfigOptionResponse, error) {
	var resp acp.SetSessionConfigOptionResponse
	return resp, nil
}
// SetSessionMode is a no-op stub: it returns a zero-value response and no
// error.
func (f *concurrencyFakeAgent) SetSessionMode(_ context.Context, _ acp.SetSessionModeRequest) (acp.SetSessionModeResponse, error) {
	var resp acp.SetSessionModeResponse
	return resp, nil
}

// setupConcurrencyFakeAgent builds a test adapter connected, over a pair of
// in-memory pipes, to a concurrencyFakeAgent whose Prompt handler blocks until
// released. The cleanup that closes the adapter and both pipe writers is
// registered before Connect is attempted, so a t.Fatal on any later step still
// closes them.
func setupConcurrencyFakeAgent(t *testing.T) (*Adapter, *concurrencyFakeAgent) {
	t.Helper()

	adapter := newTestAdapter()

	// Pipe names are from the adapter's point of view: toAgent* carries
	// adapter -> agent traffic, fromAgent* carries agent -> adapter traffic.
	toAgentR, toAgentW := io.Pipe()
	fromAgentR, fromAgentW := io.Pipe()
	t.Cleanup(func() {
		_ = adapter.Close()
		_ = toAgentW.Close()
		_ = fromAgentW.Close()
	})

	if err := adapter.Connect(toAgentW, fromAgentR); err != nil {
		t.Fatalf("Connect: %v", err)
	}

	agent := &concurrencyFakeAgent{
		// Buffered so the handler's entry signal does not wait on the test.
		entered: make(chan struct{}, 8),
		release: make(chan struct{}),
	}

	// The agent side reads what the adapter writes and vice versa. The
	// returned connection handle is not needed by the tests.
	_ = acp.NewAgentSideConnection(agent, fromAgentW, toAgentR)
	return adapter, agent
}

// waitForPromptComplete drains the adapter's updates channel until it sees an
// event of type EventTypeComplete, discarding every other event on the way.
// It fails the test if no such event arrives within the timeout or if the
// channel is closed first.
//
// The completion event is something that must happen, so the timeout is
// generous: it costs nothing on the success path and avoids spurious failures
// on slow or race-instrumented runs.
func waitForPromptComplete(t *testing.T, a *Adapter) {
	t.Helper()

	const completeTimeout = 5 * time.Second
	timer := time.NewTimer(completeTimeout)
	defer timer.Stop()

	for {
		select {
		case ev, ok := <-a.updatesCh:
			if !ok {
				// A closed channel would otherwise yield zero values in a
				// tight loop until the timer fires.
				t.Fatal("updates channel closed before prompt complete event")
			}
			if ev.Type == streams.EventTypeComplete {
				return
			}
		case <-timer.C:
			t.Fatal("timed out waiting for prompt complete event")
		}
	}
}

// TestWakeupDoesNotRaceConcurrentPromptWithUserPrompt reproduces the
// turn-misalignment bug: when a ScheduleWakeup timer fires while a user prompt
// is still in flight, fireWakeup must NOT issue a second, concurrent
// conn.Prompt() against the bridge. Two overlapping prompts are what desync the
// bridge's per-prompt stop_reason from the turn it belongs to, shifting chat
// turns one prompt behind.
//
// The fake agent blocks inside Prompt and records peak concurrency. The test
// parks a user prompt in the agent, fires a wakeup, checks that the wakeup does
// not reach the agent while the user prompt is parked, then releases the user
// prompt and checks that the wakeup runs afterwards. Peak concurrency must
// never exceed 1.
func TestWakeupDoesNotRaceConcurrentPromptWithUserPrompt(t *testing.T) {
	// Waits for something that must happen get a generous budget so slow or
	// race-instrumented runs do not flake; the wait for something that must
	// NOT happen stays short because it is always paid in full.
	const (
		mustHappen    = 5 * time.Second
		mustNotHappen = 100 * time.Millisecond
	)

	a, fa := setupConcurrencyFakeAgent(t)

	ctx := context.Background()
	if err := a.Initialize(ctx); err != nil {
		t.Fatalf("Initialize: %v", err)
	}
	sid, err := a.NewSession(ctx, nil)
	if err != nil {
		t.Fatalf("NewSession: %v", err)
	}

	var userWG sync.WaitGroup
	userWG.Add(1)
	go func() {
		defer userWG.Done()
		// The result is observed through the complete event awaited below.
		_ = a.Prompt(ctx, "user message", nil)
	}()

	// Wait until the user prompt is parked inside the agent's Prompt handler.
	select {
	case <-fa.entered:
	case <-time.After(mustHappen):
		t.Fatal("user prompt never reached the agent")
	}

	// Fire the wakeup while the user prompt is still in flight.
	a.fireWakeup(sid, "synthetic wakeup prompt")

	// Wakeup must not enter the agent while the user prompt is still parked.
	select {
	case <-fa.entered:
		t.Fatal("wakeup entered agent while user prompt was in flight")
	case <-time.After(mustNotHappen):
	}

	if peak := fa.maxInFlight.Load(); peak > 1 {
		t.Fatalf("maxInFlight=%d during overlap window; prompts must be serialized so the bridge's stop_reason stays aligned with its turn", peak)
	}

	// Release the user prompt; the queued wakeup should run serially afterwards.
	close(fa.release)
	userWG.Wait()
	waitForPromptComplete(t, a)

	select {
	case <-fa.entered:
	case <-time.After(mustHappen):
		t.Fatal("wakeup never ran after user prompt completed")
	}
	waitForPromptComplete(t, a)

	// Both prompts have now run end to end; they must never have overlapped.
	if peak := fa.maxInFlight.Load(); peak > 1 {
		t.Fatalf("maxInFlight=%d after both prompts completed; prompts must be serialized", peak)
	}
}

// TestWakeupDroppedWhenSessionChangesWhileQueued covers the case where a wakeup
// prompt queues behind an in-flight user prompt and the adapter's active session
// changes (NewSession/LoadSession/reset) before the wakeup gets the gate. The
// queued wakeup must target the session it was scheduled for; if that session is
// no longer current it must be dropped, not injected into the new session.
func TestWakeupDroppedWhenSessionChangesWhileQueued(t *testing.T) {
a, fa := setupConcurrencyFakeAgent(t)

ctx := context.Background()
if err := a.Initialize(ctx); err != nil {
t.Fatalf("Initialize: %v", err)
}
origSession, err := a.NewSession(ctx, nil)
if err != nil {
t.Fatalf("NewSession: %v", err)
}

var userWG sync.WaitGroup
userWG.Add(1)
go func() {
defer userWG.Done()
_ = a.Prompt(ctx, "user message", nil)
}()
select {
case <-fa.entered:
case <-time.After(5 * time.Second):
t.Fatal("user prompt never reached the agent")
}

// Wakeup fires for the original session and queues behind the gate.
a.fireWakeup(origSession, "scheduled wakeup for original session")

// The active session changes while the wakeup waits (e.g. reset/resume).
a.mu.Lock()
a.sessionID = "different-session-after-reset"
a.mu.Unlock()

// Release the in-flight user prompt so the queued wakeup can run.
close(fa.release)
userWG.Wait()

select {
case <-fa.entered:
Comment thread
carlosflorencio marked this conversation as resolved.
t.Fatal("queued wakeup was sent after the session changed; it must be dropped to avoid injecting into an unrelated session")
case <-time.After(100 * time.Millisecond):
}
waitForPromptComplete(t, a)
}

// TestWakeupDoesNotConsumePendingContext verifies that a synthetic wakeup prompt
// does not read-and-clear pendingContext, which is reserved for the next user
// prompt (e.g. fork_session resume context).
func TestWakeupDoesNotConsumePendingContext(t *testing.T) {
a, fa := setupConcurrencyFakeAgent(t)

ctx := context.Background()
if err := a.Initialize(ctx); err != nil {
t.Fatalf("Initialize: %v", err)
}
sid, err := a.NewSession(ctx, nil)
if err != nil {
t.Fatalf("NewSession: %v", err)
}

const canary = "resume-context-for-next-user-prompt"
a.mu.Lock()
a.pendingContext = canary
a.mu.Unlock()

a.fireWakeup(sid, "scheduled wakeup prompt")

select {
case <-fa.entered:
case <-time.After(100 * time.Millisecond):
t.Fatal("wakeup never reached the agent")
}
close(fa.release)
Comment thread
carlosflorencio marked this conversation as resolved.
waitForPromptComplete(t, a)

a.mu.Lock()
got := a.pendingContext
a.mu.Unlock()
if got != canary {
t.Fatalf("pendingContext=%q, want %q — wakeup must not consume resume context", got, canary)
}
}
Loading