diff --git a/apps/backend/Makefile b/apps/backend/Makefile index 41410567c..30b682571 100644 --- a/apps/backend/Makefile +++ b/apps/backend/Makefile @@ -48,7 +48,7 @@ BUILD_TIME := $(shell date -u '+%Y-%m-%dT%H:%M:%SZ' 2>/dev/null || powershell -c LDFLAGS += -X main.Version=$(VERSION) -X main.Commit=$(COMMIT) -X main.BuildTime=$(BUILD_TIME) -.PHONY: all build build-all build-agentctl build-agentctl-linux build-acpdbg acpdbg build-mock-agent build-mock-agent-linux build-preview build-winjob clean run dev start-debug test test-e2e test-sprites-e2e lint fmt vet help +.PHONY: all build build-all build-agentctl build-agentctl-linux build-acpdbg acpdbg build-mock-agent build-mock-agent-linux build-preview build-winjob clean run dev start-debug test test-e2e test-sprites-e2e test-lifecycle-goleak lint fmt vet help ## Default target all: build @@ -167,6 +167,18 @@ test-sprites-e2e: build-agentctl-linux KANDEV_AGENTCTL_LINUX_BINARY=$(CURDIR)/$(BUILD_DIR)/agentctl-linux-amd64 \ $(GO) test -v -count=1 -tags sprites_e2e -run TestSpritesE2E -timeout 10m ./internal/agent/runtime/lifecycle/ +## Stress-run the lifecycle goleak suite — repro target for the CI-only flake +## where StreamManager/WorkspaceStream goroutines linger past TestMain on +## slow runners. Local hardware almost never reproduces the leak under a +## single pass; -count=20 (configurable) plus -race is the smallest cadence +## that consistently surfaces the race during development. +## +## Override the loop count with `make test-lifecycle-goleak LIFECYCLE_GOLEAK_COUNT=N`. +LIFECYCLE_GOLEAK_COUNT ?= 20 +test-lifecycle-goleak: + @echo "Stress-running lifecycle goleak suite ($(LIFECYCLE_GOLEAK_COUNT)x)..." + $(CGO_PREFIX) $(GO) test -tags fts5 -race -count=$(LIFECYCLE_GOLEAK_COUNT) -timeout 600s ./internal/agent/runtime/lifecycle/ ./internal/agent/runtime/agentctl/ + ## Run tests with coverage test-coverage: @echo "Running tests with coverage..." 
@@ -227,6 +239,7 @@ help: @echo " test Run tests (full suite — fails on Windows pending fixture cleanup)" @echo " test-windows Run only the Windows-clean subset (matches CI windows-latest job)" @echo " test-e2e Run E2E adapter tests (real agents, costs money)" + @echo " test-lifecycle-goleak Stress-run lifecycle goleak suite (override LIFECYCLE_GOLEAK_COUNT=N)" @echo " test-coverage Run tests with coverage report" @echo " lint Run golangci-lint" @echo " fmt Format code" diff --git a/apps/backend/internal/agent/runtime/agentctl/client.go b/apps/backend/internal/agent/runtime/agentctl/client.go index 4b1538681..7d3b04aa7 100644 --- a/apps/backend/internal/agent/runtime/agentctl/client.go +++ b/apps/backend/internal/agent/runtime/agentctl/client.go @@ -38,7 +38,18 @@ type Client struct { // WebSocket connections for streaming agentStreamConn *websocket.Conn workspaceStreamConn *websocket.Conn - mu sync.RWMutex + // workspaceStream is the most-recent workspace stream returned by + // StreamWorkspace, retained so Client.Close can wait for its read/write + // goroutines to drain. Cleared by readWorkspaceStream's defer once the + // stream tears down. + workspaceStream *WorkspaceStream + // closed flips to true on Client.Close and prevents new StreamWorkspace + // dials from leaking goroutines past the close barrier. Agent (updates) + // stream is not gated on this flag because the cascade flow legitimately + // stops + restarts the agent stream on the same client; gating it would + // strand workflow step transitions on a closed client. + closed bool + mu sync.RWMutex // Shared write mutex for agent stream (used by StreamUpdates and sendStreamRequest) streamWriteMu sync.Mutex @@ -656,10 +667,32 @@ type ( ProcessStatusUpdate = types.ProcessStatusUpdate ) -// Close closes all connections and releases resources +// Close closes all connections and releases resources. 
It is a drain +// barrier for workspace stream goroutines: when Close returns, the workspace +// read/write loops have fully exited and future StreamWorkspace calls return +// immediately with an error. The agent (updates) stream is closed but not +// drained synchronously — the cascade flow legitimately calls Close on a +// client whose updates stream is still mid-event, and blocking would stall +// workflow step transitions. func (c *Client) Close() { + c.mu.Lock() + c.closed = true + ws := c.workspaceStream + c.mu.Unlock() + c.CloseUpdatesStream() + // CloseWorkspaceStream closes the raw conn to wake the blocked read loop. + // ws.Close (below) is needed to close the writeLoop's closeCh; closeOnce + // makes ws.Close idempotent so the duplicate conn.Close it issues just + // logs at Debug. Both calls together wake both goroutines deterministically. c.CloseWorkspaceStream() + + // Wait for the workspace stream's read/write goroutines to fully unwind. + if ws != nil { + ws.Close() + ws.Wait() + } + if c.httpClient != nil { c.httpClient.CloseIdleConnections() } diff --git a/apps/backend/internal/agent/runtime/agentctl/client_close_test.go b/apps/backend/internal/agent/runtime/agentctl/client_close_test.go new file mode 100644 index 000000000..74a412463 --- /dev/null +++ b/apps/backend/internal/agent/runtime/agentctl/client_close_test.go @@ -0,0 +1,126 @@ +package client + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/kandev/kandev/internal/common/logger" +) + +// closeBarrierMockServer is a minimal agentctl mock that exposes the two +// WebSocket endpoints needed for Close/drain regression coverage. Handlers +// stay open until the client tears down, mirroring the behaviour real +// agentctl exhibits when the manager hasn't asked it to exit. 
+type closeBarrierMockServer struct { + server *httptest.Server + + mu sync.Mutex + wsConns []*websocket.Conn + connected chan struct{} + once sync.Once +} + +func newCloseBarrierMockServer(t *testing.T) *closeBarrierMockServer { + t.Helper() + m := &closeBarrierMockServer{connected: make(chan struct{})} + upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }} + + handler := func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + m.mu.Lock() + m.wsConns = append(m.wsConns, conn) + m.mu.Unlock() + m.once.Do(func() { close(m.connected) }) + // Block until client closes. + for { + if _, _, err := conn.ReadMessage(); err != nil { + _ = conn.Close() + return + } + } + } + + mux := http.NewServeMux() + mux.HandleFunc("/api/v1/agent/stream", handler) + mux.HandleFunc("/api/v1/workspace/stream", handler) + m.server = httptest.NewServer(mux) + t.Cleanup(func() { + m.mu.Lock() + for _, c := range m.wsConns { + _ = c.Close() + } + m.mu.Unlock() + m.server.Close() + }) + return m +} + +func newCloseBarrierTestClient(t *testing.T, serverURL string) *Client { + t.Helper() + url := strings.TrimPrefix(serverURL, "http://") + parts := strings.SplitN(url, ":", 2) + host := parts[0] + var port int + _, _ = fmt.Sscanf(parts[1], "%d", &port) + log, _ := logger.NewLogger(logger.LoggingConfig{Level: "error", Format: "json"}) + return NewClient(host, port, log) +} + +// TestClientClose_DrainsWorkspaceStream is the regression test for the +// CI-only goleak flake around StreamManager and WorkspaceStream goroutines +// surviving Close. After Close returns, the workspace read/write loops must +// have fully unwound — otherwise tests with `defer client.Close()` see +// lingering goroutines and goleak.VerifyTestMain fails. The agent (updates) +// stream is closed but not drained synchronously: the cascade flow legitimately +// stops + restarts the updates stream on the same client. 
+func TestClientClose_DrainsWorkspaceStream(t *testing.T) { + mock := newCloseBarrierMockServer(t) + client := newCloseBarrierTestClient(t, mock.server.URL) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + ws, err := client.StreamWorkspace(ctx, WorkspaceStreamCallbacks{}) + if err != nil { + t.Fatalf("StreamWorkspace failed: %v", err) + } + if ws == nil { + t.Fatal("nil WorkspaceStream") + } + + select { + case <-mock.connected: + case <-time.After(2 * time.Second): + t.Fatal("mock server never observed a WS connection") + } + + // Close must return promptly and have drained the workspace stream. A hung + // goroutine here would block Close forever (or, pre-fix, return early and + // leave the goroutine running past goleak's check). + done := make(chan struct{}) + go func() { + client.Close() + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Client.Close did not return within 2s — workspace drain is stuck") + } + + // Post-Close, StreamWorkspace must error so a racy second close path + // doesn't strand a new dial past the barrier. 
+ if _, err := client.StreamWorkspace(context.Background(), WorkspaceStreamCallbacks{}); err == nil { + t.Error("StreamWorkspace after Close should return error, got nil") + } +} diff --git a/apps/backend/internal/agent/runtime/agentctl/workspace_stream.go b/apps/backend/internal/agent/runtime/agentctl/workspace_stream.go index 7619fdf40..dbcd8093c 100644 --- a/apps/backend/internal/agent/runtime/agentctl/workspace_stream.go +++ b/apps/backend/internal/agent/runtime/agentctl/workspace_stream.go @@ -45,6 +45,10 @@ type WorkspaceStream struct { // StreamWorkspace opens a unified WebSocket connection for all workspace events func (c *Client) StreamWorkspace(ctx context.Context, callbacks WorkspaceStreamCallbacks) (*WorkspaceStream, error) { c.mu.Lock() + if c.closed { + c.mu.Unlock() + return nil, fmt.Errorf("agentctl client closed") + } if c.workspaceStreamConn != nil { c.mu.Unlock() return nil, fmt.Errorf("workspace stream already connected") @@ -57,12 +61,6 @@ func (c *Client) StreamWorkspace(ctx context.Context, callbacks WorkspaceStreamC return nil, fmt.Errorf("failed to connect to workspace stream: %w", err) } - c.mu.Lock() - c.workspaceStreamConn = conn - c.mu.Unlock() - - c.logger.Info("connected to workspace stream", zap.String("url", wsURL)) - stream := &WorkspaceStream{ conn: conn, inputCh: make(chan types.WorkspaceStreamMessage, 64), @@ -70,6 +68,29 @@ func (c *Client) StreamWorkspace(ctx context.Context, callbacks WorkspaceStreamC logger: c.logger, } + // Race: Close may have fired between the dial returning and us re-acquiring + // the lock. Drop the new conn + stream instead of leaking the read/write + // goroutines past Client.Close's drain barrier. + c.mu.Lock() + if c.closed { + c.mu.Unlock() + _ = conn.Close() + return nil, fmt.Errorf("agentctl client closed during workspace stream dial") + } + // Re-check after dial: two concurrent StreamWorkspace callers can both pass + // the pre-dial guard and race here. 
The later one would orphan the first + // conn and its goroutines without this check. + if c.workspaceStreamConn != nil { + c.mu.Unlock() + _ = conn.Close() + return nil, fmt.Errorf("workspace stream already connected") + } + c.workspaceStreamConn = conn + c.workspaceStream = stream + c.mu.Unlock() + + c.logger.Info("connected to workspace stream", zap.String("url", wsURL)) + // Track both goroutines on the per-stream wg so WorkspaceStream.Wait can // block until they have fully unwound. The workspace read loop only invokes // data callbacks (shell/git/process) and self-closes on exit — it never @@ -104,7 +125,14 @@ var workspaceTracedTypes = map[types.WorkspaceMessageType]bool{ func (c *Client) readWorkspaceStream(conn *websocket.Conn, stream *WorkspaceStream, callbacks WorkspaceStreamCallbacks) { defer func() { c.mu.Lock() - c.workspaceStreamConn = nil + // Guard both resets by identity — a concurrent StreamWorkspace caller + // may have replaced the conn/stream pointers since this read loop started. + if c.workspaceStreamConn == conn { + c.workspaceStreamConn = nil + } + if c.workspaceStream == stream { + c.workspaceStream = nil + } c.mu.Unlock() stream.Close() }() diff --git a/apps/backend/internal/agent/runtime/lifecycle/manager_interaction.go b/apps/backend/internal/agent/runtime/lifecycle/manager_interaction.go index 3441bff77..48eb9c3be 100644 --- a/apps/backend/internal/agent/runtime/lifecycle/manager_interaction.go +++ b/apps/backend/internal/agent/runtime/lifecycle/manager_interaction.go @@ -575,8 +575,13 @@ func (m *Manager) RestartAgentProcess(ctx context.Context, executionID string) e return fmt.Errorf("failed to get agent config for restart: %w", err) } - // 1. Close WebSocket streams (updates + workspace) - execution.agentctl.Close() + // 1. Close WebSocket streams (updates + workspace). 
Use per-stream Close + // methods rather than client.Close — the latter is a terminal drain + // barrier that flips the client into a closed state and would block + // every StreamUpdates/StreamWorkspace call that this same restart path + // makes a few lines below. + execution.agentctl.CloseUpdatesStream() + execution.agentctl.CloseWorkspaceStream() // 2. Stop the agent subprocess via agentctl (keeps agentctl server alive) if err := execution.agentctl.Stop(ctx); err != nil { diff --git a/apps/backend/internal/agent/runtime/lifecycle/streams.go b/apps/backend/internal/agent/runtime/lifecycle/streams.go index 143df20b8..2d6d3caeb 100644 --- a/apps/backend/internal/agent/runtime/lifecycle/streams.go +++ b/apps/backend/internal/agent/runtime/lifecycle/streams.go @@ -31,36 +31,100 @@ type StreamManager struct { logger *logger.Logger callbacks StreamCallbacks mcpHandler agentctl.MCPHandler - // stopCh is the Manager-owned shutdown signal. SessionTraceContext is - // deliberately uncancellable (it carries a long-lived span), so the - // reconnect/backoff loops below select on stopCh to drain on Manager.Stop. - // nil is treated as "no shutdown signal" and falls back to the prior - // uncancellable behaviour (compat with constructors used by isolated tests). - stopCh <-chan struct{} - wg sync.WaitGroup - wgMu sync.Mutex - stopped bool + // stopCh is the Manager-owned shutdown signal. The retry/backoff and + // connected `<-ws.Done() / <-stop>` select read from it so they drain on + // Manager.Stop. May be nil when isolated tests don't care about external + // shutdown; waitCh below covers Wait-driven drains in that case. + stopCh <-chan struct{} + // waitCh is closed by Wait() so retry/backoff and the connected select + // drain even when the external stopCh isn't closed by the caller (or is + // nil). Together with stopCh this makes Wait an absolute drain barrier, + // which goleak.VerifyTestMain depends on under CI load. 
// stopChannelContext wraps a parent ctx with two auxiliary stop channels.
// Done() returns a per-instance merged channel that closes when any of
// parent.Done(), primary or secondary fires. Either stop channel may be nil;
// a nil channel simply never fires.
//
// The merge goroutine is started lazily, at most once, on the first Done()
// call. When wg is non-nil the goroutine is registered on it, and the merged
// channel is closed BEFORE the WaitGroup is released, so a caller returning
// from wg.Wait() always observes Done() as closed.
type stopChannelContext struct {
	context.Context
	primary   <-chan struct{}
	secondary <-chan struct{}

	// wg, when set, tracks the merge goroutine so the owner can wait for it.
	wg *sync.WaitGroup

	// once guards the lazy creation of merged and its goroutine.
	once   sync.Once
	merged chan struct{}
}

// Done returns the channel that closes when the parent context is done or
// either stop channel fires. With no stop channels configured it returns the
// parent's Done channel directly and spawns nothing.
func (c *stopChannelContext) Done() <-chan struct{} {
	if c.primary == nil && c.secondary == nil {
		return c.Context.Done()
	}
	c.once.Do(func() {
		c.merged = make(chan struct{})
		if c.wg != nil {
			// NOTE(review): sync.WaitGroup requires an Add from a zero counter
			// to happen before Wait; callers should make their first Done()
			// call while the WaitGroup is still held open — confirm at call sites.
			c.wg.Add(1)
		}
		go c.mergeStops()
	})
	return c.merged
}

// mergeStops blocks until the first stop signal and then closes merged.
func (c *stopChannelContext) mergeStops() {
	// Deferred calls run last-in-first-out: merged is closed first and only
	// then is the WaitGroup released. Registering them the other way round
	// would let wg.Wait() return while Done() is still open.
	if c.wg != nil {
		defer c.wg.Done()
	}
	defer close(c.merged)

	// A receive from a nil channel blocks forever, so an unset primary or
	// secondary (or a parent with a nil Done channel) never wins this select.
	select {
	case <-c.primary:
	case <-c.secondary:
	case <-c.Context.Done():
	}
}

// Err reports context.Canceled once either stop channel has fired and
// otherwise defers to the parent context's Err.
func (c *stopChannelContext) Err() error {
	// Nil stop channels are never ready, so they fall through to default.
	select {
	case <-c.primary:
		return context.Canceled
	case <-c.secondary:
		return context.Canceled
	default:
		return c.Context.Err()
	}
}
+// Returns true when the timer fires, false when either the external stopCh +// or the internal Wait-driven waitCh fires first. func (sm *StreamManager) sleepOrStop(d time.Duration) bool { - if sm.stopCh == nil { - time.Sleep(d) - return true - } timer := time.NewTimer(d) defer timer.Stop() + if sm.stopCh == nil { + select { + case <-timer.C: + return true + case <-sm.waitCh: + return false + } + } select { case <-timer.C: return true case <-sm.stopCh: return false + case <-sm.waitCh: + return false } } // streamContext preserves the execution's session trace values while making -// in-flight WebSocket dials cancellable by the manager shutdown signal. +// in-flight WebSocket dials cancellable by either the external Manager +// shutdown signal or StreamManager.Wait's internal drain signal. func (sm *StreamManager) streamContext(execution *AgentExecution) context.Context { - ctx := execution.SessionTraceContext() - if sm.stopCh == nil { - return ctx + return &stopChannelContext{ + Context: execution.SessionTraceContext(), + primary: sm.stopCh, + secondary: sm.waitCh, + wg: &sm.wg, } - return stopChannelContext{Context: ctx, stopCh: sm.stopCh} } // connectUpdatesStream handles the updates WebSocket stream with ready signaling @@ -387,17 +466,32 @@ func (sm *StreamManager) connectWorkspaceStream(execution *AgentExecution, ready // Signal that workspace stream is ready signalReady() - // Wait for the stream to close. Also exits on Manager shutdown so the - // goroutine drains when the remote end keeps the connection open — - // in that case we close ws ourselves so the underlying WS read/write - // loops in agentctl.WorkspaceStream also exit. ws.Close is idempotent - // via closeOnce. A nil stopCh (isolated tests) blocks on the nil - // channel forever, which degrades to plain <-ws.Done() semantics. - select { - case <-ws.Done(): - case <-sm.stopCh: + // Wait for the stream to close. 
Also exits on Manager shutdown / Wait + // so the goroutine drains when the remote end keeps the connection + // open — in that case we close ws ourselves so the underlying WS + // read/write loops in agentctl.WorkspaceStream also exit. ws.Close + // is idempotent via closeOnce. The waitCh branch covers the case + // where the caller never closes external stopCh (or stopCh is nil) + // but still calls Wait — without it, isolated tests that triggered + // this select would leak under CI scheduling. + shutdown := func() { ws.Close() } + if sm.stopCh == nil { + select { + case <-ws.Done(): + case <-sm.waitCh: + shutdown() + } + } else { + select { + case <-ws.Done(): + case <-sm.stopCh: + shutdown() + case <-sm.waitCh: + shutdown() + } + } // Block until the stream's read/write goroutines have fully unwound // before returning. Done()/Close only signal shutdown, so without this // the StreamManager's wg releases while a blocked websocket read is