Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion apps/backend/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ BUILD_TIME := $(shell date -u '+%Y-%m-%dT%H:%M:%SZ' 2>/dev/null || powershell -c

LDFLAGS += -X main.Version=$(VERSION) -X main.Commit=$(COMMIT) -X main.BuildTime=$(BUILD_TIME)

.PHONY: all build build-all build-agentctl build-agentctl-linux build-acpdbg acpdbg build-mock-agent build-mock-agent-linux build-preview build-winjob clean run dev start-debug test test-e2e test-sprites-e2e lint fmt vet help
.PHONY: all build build-all build-agentctl build-agentctl-linux build-acpdbg acpdbg build-mock-agent build-mock-agent-linux build-preview build-winjob clean run dev start-debug test test-e2e test-sprites-e2e test-lifecycle-goleak lint fmt vet help

## Default target
all: build
Expand Down Expand Up @@ -167,6 +167,18 @@ test-sprites-e2e: build-agentctl-linux
KANDEV_AGENTCTL_LINUX_BINARY=$(CURDIR)/$(BUILD_DIR)/agentctl-linux-amd64 \
$(GO) test -v -count=1 -tags sprites_e2e -run TestSpritesE2E -timeout 10m ./internal/agent/runtime/lifecycle/

## Stress-run the lifecycle goleak suite — repro target for the CI-only flake
## where StreamManager/WorkspaceStream goroutines linger past TestMain on
## slow runners. Local hardware almost never reproduces the leak under a
## single pass; -count=20 (configurable) plus -race is the smallest cadence
## that consistently surfaces the race during development.
##
## Override the loop count with `make test-lifecycle-goleak LIFECYCLE_GOLEAK_COUNT=N`.
## Default repetition count handed to `go test -count`; `?=` keeps any value
## already supplied on the command line or in the environment.
LIFECYCLE_GOLEAK_COUNT ?= 20
## Runs the lifecycle and agentctl runtime packages with the race detector,
## the fts5 build tag, and a 600s overall timeout.
test-lifecycle-goleak:
@echo "Stress-running lifecycle goleak suite ($(LIFECYCLE_GOLEAK_COUNT)x)..."
$(CGO_PREFIX) $(GO) test -tags fts5 -race -count=$(LIFECYCLE_GOLEAK_COUNT) -timeout 600s ./internal/agent/runtime/lifecycle/ ./internal/agent/runtime/agentctl/

## Run tests with coverage
test-coverage:
@echo "Running tests with coverage..."
Expand Down Expand Up @@ -227,6 +239,7 @@ help:
@echo " test Run tests (full suite — fails on Windows pending fixture cleanup)"
@echo " test-windows Run only the Windows-clean subset (matches CI windows-latest job)"
@echo " test-e2e Run E2E adapter tests (real agents, costs money)"
@echo " test-lifecycle-goleak Stress-run lifecycle goleak suite (override LIFECYCLE_GOLEAK_COUNT=N)"
@echo " test-coverage Run tests with coverage report"
@echo " lint Run golangci-lint"
@echo " fmt Format code"
Expand Down
37 changes: 35 additions & 2 deletions apps/backend/internal/agent/runtime/agentctl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,18 @@ type Client struct {
// WebSocket connections for streaming
agentStreamConn *websocket.Conn
workspaceStreamConn *websocket.Conn
mu sync.RWMutex
// workspaceStream is the most-recent workspace stream returned by
// StreamWorkspace, retained so Client.Close can wait for its read/write
// goroutines to drain. Cleared by readWorkspaceStream's defer once the
// stream tears down.
workspaceStream *WorkspaceStream
// closed flips to true on Client.Close and prevents new StreamWorkspace
// dials from leaking goroutines past the close barrier. Agent (updates)
// stream is not gated on this flag because the cascade flow legitimately
// stops + restarts the agent stream on the same client; gating it would
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Restarting the agent stream can clobber the live connection pointer. The old read goroutine sets agentStreamConn = nil after the new stream has already started. Guard the cleanup with a connection-identity check.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At apps/backend/internal/agent/runtime/agentctl/client.go, line 49:

<comment>Agent stream restart can clobber live connection pointer. Old read goroutine sets `agentStreamConn=nil` after new stream starts. Guard cleanup by connection identity.</comment>

<file context>
@@ -43,19 +43,17 @@ type Client struct {
+	// closed flips to true on Client.Close and prevents new StreamWorkspace
+	// dials from leaking goroutines past the close barrier. Agent (updates)
+	// stream is not gated on this flag because the cascade flow legitimately
+	// stops + restarts the agent stream on the same client; gating it would
+	// strand workflow step transitions on a closed client.
 	closed bool
</file context>

// strand workflow step transitions on a closed client.
closed bool
mu sync.RWMutex

// Shared write mutex for agent stream (used by StreamUpdates and sendStreamRequest)
streamWriteMu sync.Mutex
Expand Down Expand Up @@ -656,10 +667,32 @@ type (
ProcessStatusUpdate = types.ProcessStatusUpdate
)

// Close closes all connections and releases resources
// Close closes all connections and releases resources. It is a drain
// barrier for workspace stream goroutines: when Close returns, the workspace
// read/write loops have fully exited and future StreamWorkspace calls return
// immediately with an error. The agent (updates) stream is closed but not
// drained synchronously — the cascade flow legitimately calls Close on a
// client whose updates stream is still mid-event, and blocking would stall
// workflow step transitions.
func (c *Client) Close() {
c.mu.Lock()
c.closed = true
ws := c.workspaceStream
c.mu.Unlock()

c.CloseUpdatesStream()
// CloseWorkspaceStream closes the raw conn to wake the blocked read loop.
// ws.Close (below) is needed to close the writeLoop's closeCh; closeOnce
// makes ws.Close idempotent so the duplicate conn.Close it issues just
// logs at Debug. Both calls together wake both goroutines deterministically.
c.CloseWorkspaceStream()

// Wait for the workspace stream's read/write goroutines to fully unwind.
if ws != nil {
ws.Close()
ws.Wait()
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agentStreamConn identity guard still missing in readUpdatesStream's defer

This note applies to readUpdatesStream's defer in agent.go (which has no net diff in this PR and can't be commented on directly).

The readWorkspaceStream defer was fixed in commit 2 to guard both workspaceStreamConn and workspaceStream resets with identity checks (if c.workspaceStreamConn == conn). The analogous guard is still absent in readUpdatesStream's defer, which unconditionally writes c.agentStreamConn = nil.

Commit 3 intentionally removed the synchronous drain barrier for the agent stream — meaning the old goroutine can outlive CloseUpdatesStream() and the subsequent StreamUpdates dial in RestartAgentProcess. The race window is now:

  1. CloseUpdatesStream() → closes old conn, sets agentStreamConn = nil
  2. StreamUpdates → new dial succeeds, sets agentStreamConn = newConn
  3. Old goroutine exits → defer fires: agentStreamConn = nil overwrites newConn
  4. Next CloseUpdatesStream() sees nil, silently skips close → active conn orphaned

The fix is one extra if-guard in readUpdatesStream's defer in agent.go:

c.mu.Lock()
if c.agentStreamConn == conn {
    c.agentStreamConn = nil
}
c.mu.Unlock()

Greptile flagged this as P1 in their review. The response indicated it would be fixed, but the fix landed only in workspace_stream.go. This is a low-risk follow-up, but it should be tracked before the next restart-path regression.

if c.httpClient != nil {
c.httpClient.CloseIdleConnections()
}
Expand Down
126 changes: 126 additions & 0 deletions apps/backend/internal/agent/runtime/agentctl/client_close_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package client

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"

"github.com/gorilla/websocket"
"github.com/kandev/kandev/internal/common/logger"
)

// closeBarrierMockServer is a minimal agentctl mock that exposes the two
// WebSocket endpoints needed for Close/drain regression coverage. Handlers
// stay open until the client tears down, mirroring the behaviour real
// agentctl exhibits when the manager hasn't asked it to exit.
type closeBarrierMockServer struct {
// server is the httptest server hosting the mock WebSocket endpoints.
server *httptest.Server

// mu guards wsConns, which handlers append to and test cleanup iterates.
mu sync.Mutex
// wsConns records every upgraded connection so cleanup can close them.
wsConns []*websocket.Conn
// connected is closed once the first WebSocket upgrade succeeds.
connected chan struct{}
// once ensures connected is closed at most once across handlers.
once sync.Once
}

// newCloseBarrierMockServer starts an httptest server that upgrades requests
// on the agent and workspace stream paths to WebSockets and then parks each
// handler in a read loop until a read fails. Every upgraded connection is
// recorded so the registered test cleanup can close them before shutting the
// server down.
func newCloseBarrierMockServer(t *testing.T) *closeBarrierMockServer {
	t.Helper()

	mock := &closeBarrierMockServer{connected: make(chan struct{})}
	upgrader := websocket.Upgrader{
		CheckOrigin: func(*http.Request) bool { return true },
	}

	serveWS := func(w http.ResponseWriter, r *http.Request) {
		wsConn, upgradeErr := upgrader.Upgrade(w, r, nil)
		if upgradeErr != nil {
			return
		}

		// Record the connection for cleanup, then signal the first arrival.
		mock.mu.Lock()
		mock.wsConns = append(mock.wsConns, wsConn)
		mock.mu.Unlock()
		mock.once.Do(func() { close(mock.connected) })

		// Discard inbound frames; any read error ends the handler.
		for {
			_, _, readErr := wsConn.ReadMessage()
			if readErr == nil {
				continue
			}
			_ = wsConn.Close()
			return
		}
	}

	routes := http.NewServeMux()
	for _, path := range []string{"/api/v1/agent/stream", "/api/v1/workspace/stream"} {
		routes.HandleFunc(path, serveWS)
	}
	mock.server = httptest.NewServer(routes)

	t.Cleanup(func() {
		// Close tracked connections first, then stop the server itself.
		mock.mu.Lock()
		for i := range mock.wsConns {
			_ = mock.wsConns[i].Close()
		}
		mock.mu.Unlock()
		mock.server.Close()
	})
	return mock
}

// newCloseBarrierTestClient builds a Client pointed at the mock server.
//
// serverURL is expected to look like "http://host:port" (the form of
// httptest.Server.URL). The host/port split happens at the last colon so a
// bracketed IPv6 host such as "[::1]" stays in one piece. A missing port, an
// unparsable port, or a logger construction failure aborts the test
// immediately instead of surfacing later as an index panic or an unrelated
// dial error.
func newCloseBarrierTestClient(t *testing.T, serverURL string) *Client {
	t.Helper()

	addr := strings.TrimPrefix(serverURL, "http://")
	sep := strings.LastIndex(addr, ":")
	if sep < 0 {
		t.Fatalf("server URL %q has no port", serverURL)
	}
	host := addr[:sep]

	var port int
	if _, err := fmt.Sscanf(addr[sep+1:], "%d", &port); err != nil {
		t.Fatalf("parse port from server URL %q: %v", serverURL, err)
	}

	testLog, err := logger.NewLogger(logger.LoggingConfig{Level: "error", Format: "json"})
	if err != nil {
		t.Fatalf("create test logger: %v", err)
	}
	return NewClient(host, port, testLog)
}

// TestClientClose_DrainsWorkspaceStream is the regression test for the
// CI-only goleak flake around StreamManager and WorkspaceStream goroutines
// surviving Close. After Close returns, the workspace read/write loops must
// have fully unwound — otherwise tests with `defer client.Close()` see
// lingering goroutines and goleak.VerifyTestMain fails. The agent (updates)
// stream is closed but not drained synchronously: the cascade flow legitimately
// stops + restarts the updates stream on the same client.
func TestClientClose_DrainsWorkspaceStream(t *testing.T) {
mock := newCloseBarrierMockServer(t)
client := newCloseBarrierTestClient(t, mock.server.URL)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

ws, err := client.StreamWorkspace(ctx, WorkspaceStreamCallbacks{})
if err != nil {
t.Fatalf("StreamWorkspace failed: %v", err)
}
if ws == nil {
t.Fatal("nil WorkspaceStream")
}

select {
case <-mock.connected:
case <-time.After(2 * time.Second):
t.Fatal("mock server never observed a WS connection")
}

// Close must return promptly and have drained the workspace stream. A hung
// goroutine here would block Close forever (or, pre-fix, return early and
// leave the goroutine running past goleak's check).
done := make(chan struct{})
go func() {
Comment thread
jcfs marked this conversation as resolved.
client.Close()
close(done)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("Client.Close did not return within 2s — workspace drain is stuck")
}

// Post-Close, StreamWorkspace must error so a racy second close path
// doesn't strand a new dial past the barrier.
if _, err := client.StreamWorkspace(context.Background(), WorkspaceStreamCallbacks{}); err == nil {
t.Error("StreamWorkspace after Close should return error, got nil")
}
}
42 changes: 35 additions & 7 deletions apps/backend/internal/agent/runtime/agentctl/workspace_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type WorkspaceStream struct {
// StreamWorkspace opens a unified WebSocket connection for all workspace events
func (c *Client) StreamWorkspace(ctx context.Context, callbacks WorkspaceStreamCallbacks) (*WorkspaceStream, error) {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return nil, fmt.Errorf("agentctl client closed")
}
if c.workspaceStreamConn != nil {
c.mu.Unlock()
return nil, fmt.Errorf("workspace stream already connected")
Expand All @@ -57,19 +61,36 @@ func (c *Client) StreamWorkspace(ctx context.Context, callbacks WorkspaceStreamC
return nil, fmt.Errorf("failed to connect to workspace stream: %w", err)
}

c.mu.Lock()
c.workspaceStreamConn = conn
c.mu.Unlock()

c.logger.Info("connected to workspace stream", zap.String("url", wsURL))

stream := &WorkspaceStream{
conn: conn,
inputCh: make(chan types.WorkspaceStreamMessage, 64),
closeCh: make(chan struct{}),
logger: c.logger,
}

// Race: Close may have fired between the dial returning and us re-acquiring
// the lock. Drop the new conn + stream instead of leaking the read/write
// goroutines past Client.Close's drain barrier.
c.mu.Lock()
Comment thread
jcfs marked this conversation as resolved.
if c.closed {
c.mu.Unlock()
_ = conn.Close()
return nil, fmt.Errorf("agentctl client closed during workspace stream dial")
}
// Re-check after dial: two concurrent StreamWorkspace callers can both pass
// the pre-dial guard and race here. The later one would orphan the first
// conn and its goroutines without this check.
if c.workspaceStreamConn != nil {
c.mu.Unlock()
_ = conn.Close()
return nil, fmt.Errorf("workspace stream already connected")
}
c.workspaceStreamConn = conn
c.workspaceStream = stream
Comment thread
jcfs marked this conversation as resolved.
c.mu.Unlock()

c.logger.Info("connected to workspace stream", zap.String("url", wsURL))

// Track both goroutines on the per-stream wg so WorkspaceStream.Wait can
// block until they have fully unwound. The workspace read loop only invokes
// data callbacks (shell/git/process) and self-closes on exit — it never
Expand Down Expand Up @@ -104,7 +125,14 @@ var workspaceTracedTypes = map[types.WorkspaceMessageType]bool{
func (c *Client) readWorkspaceStream(conn *websocket.Conn, stream *WorkspaceStream, callbacks WorkspaceStreamCallbacks) {
defer func() {
c.mu.Lock()
c.workspaceStreamConn = nil
// Guard both resets by identity — a concurrent StreamWorkspace caller
// may have replaced the conn/stream pointers since this read loop started.
if c.workspaceStreamConn == conn {
c.workspaceStreamConn = nil
}
if c.workspaceStream == stream {
Comment thread
jcfs marked this conversation as resolved.
c.workspaceStream = nil
}
c.mu.Unlock()
stream.Close()
}()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,13 @@ func (m *Manager) RestartAgentProcess(ctx context.Context, executionID string) e
return fmt.Errorf("failed to get agent config for restart: %w", err)
}

// 1. Close WebSocket streams (updates + workspace)
execution.agentctl.Close()
// 1. Close WebSocket streams (updates + workspace). Use per-stream Close
// methods rather than client.Close — the latter is a terminal drain
// barrier that flips the client into a closed state and would block
// every StreamUpdates/StreamWorkspace call that this same restart path
// makes a few lines below.
execution.agentctl.CloseUpdatesStream()
execution.agentctl.CloseWorkspaceStream()

// 2. Stop the agent subprocess via agentctl (keeps agentctl server alive)
if err := execution.agentctl.Stop(ctx); err != nil {
Expand Down
Loading
Loading