fix(lifecycle): drain StreamManager goroutines in tests#1227
Conversation
Goroutines spawned by `agentctl.Client.StreamUpdates` / `StreamWorkspace` and by `lifecycle.StreamManager.connectWorkspaceStream` could outlive the tests that created them on slow CI runners, causing intermittent `goleak.VerifyTestMain` failures in `internal/agent/runtime/lifecycle` even though every individual test passed. The leak required the test to race: `client.Close()` could return while a workspace dial was still in flight, leaving the just-spawned WS read/write loops with nobody to drain them, and `StreamManager.Wait()` only fired the drain when the external `stopCh` had been closed first. Make the close paths absolute drain barriers: - `Client.Close()` now tracks every stream goroutine it spawns (`streamWG` + a per-stream `WorkspaceStream` reference) and blocks until all of them have exited. A `closed` flag flipped under the client mutex makes subsequent `StreamUpdates` / `StreamWorkspace` calls reject the dial, so a Close that races a dial in flight cannot leave behind a stranded WS connection. `manager_interaction.go` switches the restart path to per-stream Close helpers so it can keep reusing the client after teardown. - `StreamManager.Wait()` closes a new internal `waitCh` that the retry backoff (`sleepOrStop`) and the connected `<-ws.Done() / <-stop>` select also observe. This makes `Wait` an absolute drain barrier regardless of whether the caller ever closes the external `stopCh`. `streamContext` carries both stops through `stopChannelContext` so in-flight WebSocket dials cancel on either signal. Local stress (`make test-lifecycle-goleak LIFECYCLE_GOLEAK_COUNT=20`) passes 20/20 with `-race` after the fix. The new `TestClientClose_DrainsStreamGoroutines` covers the drain barrier and the post-Close rejection path. Added a `test-lifecycle-goleak` Makefile target as the repro hook for the CI-only flake. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
|
Claude finished @jcfs's task in 6m 43s —— View job Code ReviewTodo:
Overall AssessmentSolid, well-reasoned fix for the CI goleak flake. The two-pronged approach — closing Findings
Minor observations (not inline)
|
📝 WalkthroughSummary by CodeRabbit
WalkthroughThis PR hardens goroutine lifecycle management in agent control and stream orchestration to prevent leaks during client shutdown. It adds dual-level drain barriers (Client and StreamManager), detects and handles dial-time races, and introduces stress-testing infrastructure to validate the fixes. ChangesGoroutine Lifecycle Hardening and Drain Barriers
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
|
||
| // streamWG tracks every stream goroutine spawned by this client | ||
| // (readUpdatesStream and, transitively via WorkspaceStream.wg, the | ||
| // workspace read/write loops). Close waits on it so callers get a true |
There was a problem hiding this comment.
The comment here says streamWG tracks workspace goroutines "transitively via WorkspaceStream.wg", but that's not accurate. streamWG only tracks readUpdatesStream. Workspace goroutines live on the per-stream stream.wg and are drained in Close() via the explicit ws.Wait() call — not through streamWG at all. Suggest wording like:
// streamWG tracks the updates-stream goroutine (readUpdatesStream) spawned
// by this client. Workspace read/write goroutines are tracked on each
// WorkspaceStream's own wg and drained via ws.Wait() in Close().
| c.CloseUpdatesStream() | ||
| c.CloseWorkspaceStream() | ||
|
|
||
| // Wait for the workspace stream's read/write goroutines (closeOnce makes |
There was a problem hiding this comment.
CloseWorkspaceStream() (line 686) closes c.workspaceStreamConn directly — outside closeOnce. Then ws.Close() (line 692) enters closeOnce.Do and tries to close the same underlying conn a second time. The closeOnce only guards against multiple ws.Close() calls; it doesn't prevent CloseWorkspaceStream() from pre-closing the raw conn first.
In practice this is harmless — gorilla websocket's Close on an already-closed connection returns an error that is logged at Debug level — but the comment "ws.Close is idempotent via closeOnce" is slightly misleading here because the first close doesn't go through closeOnce. Consider either:
- Removing the
CloseWorkspaceStream()call fromClose()and relying solely onws.Close()+ws.Wait()to tear down the workspace stream, or - Adding a note that
CloseWorkspaceStreamtriggers the read-loop exit whilews.Closewakes the write-loop, so both are needed.
|
|
||
| func (c stopChannelContext) Done() <-chan struct{} { | ||
| if c.stopCh == nil { | ||
| func (c *stopChannelContext) Done() <-chan struct{} { |
There was a problem hiding this comment.
secondary (sm.waitCh) is always non-nil (set unconditionally in NewStreamManager), so the c.primary == nil && c.secondary == nil early-return path is dead code. mergeStops will now be spawned for every WebSocket dial attempt — including connectUpdatesStream's one-shot dial. Each mergeStops goroutine persists until waitCh (or stopCh) fires, which means a goroutine that lives from the moment the dial completes until sm.Wait() is called.
That's fine in production (always a stopCh) and in well-behaved tests (always a Wait()), but it's worth a comment noting that the goroutine is intentional and bounded, so a future reader doesn't flag it as a leak. Also worth noting that the mergeStops goroutine from connectUpdatesStream's ctx outlives connectUpdatesStream itself (which returns immediately after the dial), pinning the stopChannelContext alive until teardown.
There was a problem hiding this comment.
🧹 Nitpick comments (1)
apps/backend/internal/agent/runtime/lifecycle/streams.go (1)
432-449: 💤 Low valueConsider consolidating the duplicate select branches.
The shutdown select logic duplicates the
ws.Done()andwaitChcases. SincestopChcan be selected on even when nil (a receive on nil channel blocks forever), you could simplify to a single select block:select { case <-ws.Done(): case <-sm.stopCh: shutdown() case <-sm.waitCh: shutdown() }A receive on a nil channel never proceeds, so when
stopChis nil, onlyws.Done()andwaitChare effective. This removes the conditional branching.That said, the current explicit nil-check approach is clearer about intent and avoids relying on nil-channel semantics, so this is purely stylistic.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/backend/internal/agent/runtime/lifecycle/streams.go` around lines 432 - 449, The duplicate select branches can be consolidated: remove the if sm.stopCh == nil conditional and replace both branches with one select that listens for <-ws.Done(), <-sm.stopCh, and <-sm.waitCh, calling the existing shutdown() function in the latter two cases; relying on Go's nil-channel semantics (receive on nil blocks) will make the <-sm.stopCh case inert when sm.stopCh is nil while preserving behavior for ws.Done() and sm.waitCh.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@apps/backend/internal/agent/runtime/lifecycle/streams.go`:
- Around line 432-449: The duplicate select branches can be consolidated: remove
the if sm.stopCh == nil conditional and replace both branches with one select
that listens for <-ws.Done(), <-sm.stopCh, and <-sm.waitCh, calling the existing
shutdown() function in the latter two cases; relying on Go's nil-channel
semantics (receive on nil blocks) will make the <-sm.stopCh case inert when
sm.stopCh is nil while preserving behavior for ws.Done() and sm.waitCh.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 82a94bf9-322b-4b5b-80e6-124e0f5ada93
📒 Files selected for processing (7)
apps/backend/Makefileapps/backend/internal/agent/runtime/agentctl/agent.goapps/backend/internal/agent/runtime/agentctl/client.goapps/backend/internal/agent/runtime/agentctl/client_close_test.goapps/backend/internal/agent/runtime/agentctl/workspace_stream.goapps/backend/internal/agent/runtime/lifecycle/manager_interaction.goapps/backend/internal/agent/runtime/lifecycle/streams.go
| // Race: Close may have fired between the dial returning and us re-acquiring | ||
| // the lock. Drop the new conn + stream instead of leaking the read/write | ||
| // goroutines past Client.Close's drain barrier. | ||
| c.mu.Lock() |
There was a problem hiding this comment.
Good catch on this race. One minor gap: the post-dial check here guards against closed but not against a second concurrent StreamWorkspace call that also passed the pre-dial workspaceStreamConn != nil check (both raced through the unlock on line 56). If two goroutines both dial and both reach this lock, the second one overwrites workspaceStreamConn / workspaceStream without closing the first connection, orphaning the first conn and its goroutines.
This is a pre-existing issue (the original code didn't check post-dial either), and the StreamManager never calls StreamWorkspace concurrently, so it's not a regression. But since this PR is already touching the post-dial section, adding if c.workspaceStreamConn != nil { conn.Close(); return nil, ... } here would seal the gap completely.
| // goroutine here would block Close forever (or, pre-fix, return early | ||
| // and leave the goroutine running past goleak's check). | ||
| done := make(chan struct{}) | ||
| go func() { |
There was a problem hiding this comment.
m.connected fires on the first server-side WS accept (via sync.Once). Since both StreamUpdates and StreamWorkspace return successfully before this wait, both dials are already complete and both client goroutines are live by the time we reach this select. The wait is a valid sanity check but doesn't actually guarantee the second server-side handler is in its blocking loop yet.
Functionally this is fine — streamWG.Add(1) and stream.wg.Add(2) are registered before the goroutines start, so Close() will drain all of them regardless. But a comment explaining the intent ("both client goroutines are already started; waiting on mock.connected is a belt-and-suspenders check that the read side of the socket is live before we call Close") would prevent future readers from misreading the once as "all connections are live".
|
| Filename | Overview |
|---|---|
| apps/backend/internal/agent/runtime/agentctl/client.go | Adds workspaceStream, closed, and streamWG fields; makes Close() a true drain barrier that sets the closed flag, waits on ws.Wait() for workspace goroutines, and then streamWG.Wait() for the updates goroutine. Race guards (double-check under lock) for both StreamUpdates and StreamWorkspace are correct. |
| apps/backend/internal/agent/runtime/agentctl/agent.go | Adds pre-dial and post-dial c.closed checks around StreamUpdates; streamWG.Add(1) is done inside the write lock so Close() cannot race it; goroutine wrapper calls streamWG.Done() on exit. Logic is correct. |
| apps/backend/internal/agent/runtime/agentctl/workspace_stream.go | Adds c.closed guard before and after dial; stores c.workspaceStream = stream inside the post-dial lock; readWorkspaceStream defer conditionally clears c.workspaceStream (identity check) to avoid clobbering a newer stream during restart. |
| apps/backend/internal/agent/runtime/lifecycle/streams.go | Adds waitCh/waitChOnce for an internal drain signal; refactors stopChannelContext to merge two stop channels via a lazily-spawned mergeStops goroutine. The mergeStops goroutine is not tracked in sm.wg, creating a narrow theoretical window after sm.wg.Wait() returns for the connectUpdatesStream path. |
| apps/backend/internal/agent/runtime/agentctl/client_close_test.go | New regression test for the Close drain guarantee. mock.connected sync.Once fires only for the first of two WS connections, so the comment overstates what is being synchronized. |
| apps/backend/internal/agent/runtime/lifecycle/manager_interaction.go | Switches RestartAgentProcess from agentctl.Close() to CloseUpdatesStream() + CloseWorkspaceStream() so the client is reusable after teardown. |
| apps/backend/Makefile | Adds test-lifecycle-goleak target with configurable LIFECYCLE_GOLEAK_COUNT and help entry. |
Reviews (1): Last reviewed commit: "fix(lifecycle): drain StreamManager goro..." | Re-trigger Greptile
| c.once.Do(func() { | ||
| c.merged = make(chan struct{}) | ||
| go c.mergeStops() | ||
| }) | ||
| return c.merged | ||
| } |
There was a problem hiding this comment.
mergeStops goroutine not covered by sm.wg
stopChannelContext.Done() lazily spawns a mergeStops goroutine via sync.Once. For connectWorkspaceStream the wrapper goroutine stays alive long enough (blocked in ws.Wait()) that the merge goroutine always exits before sm.wg.Wait() returns — safe. For connectUpdatesStream though, the wrapper goroutine exits immediately after StreamUpdates returns, so sm.wg.Wait() can return before the merge goroutine finishes closing c.merged. The PR's stated guarantee is that Wait() is an "absolute drain barrier," but this goroutine isn't part of it. goleak's grace period will almost always cover the nanosecond window, but under tight-loop stress tests (-count=20 with -race) a scheduler pause on a slow runner is exactly the scenario this PR is trying to harden against.
| select { | ||
| case <-mock.connected: | ||
| case <-time.After(2 * time.Second): | ||
| t.Fatal("mock server never observed a WS connection") | ||
| } | ||
|
|
||
| // Close must return promptly and have drained both streams. A hung | ||
| // goroutine here would block Close forever (or, pre-fix, return early |
There was a problem hiding this comment.
mock.connected only fires for the first of two WS connections
The closeBarrierMockServer uses sync.Once to close m.connected, so the channel fires after the very first server-side handler starts — not after both the /api/v1/agent/stream and /api/v1/workspace/stream handlers are running. The comment "Wait for both server-side WS handlers to register" is therefore inaccurate. Both client-side goroutines are already live by the time this select is reached (both dials returned successfully), so the drain assertion is still valid, but using a WaitGroup or counting two connections in the mock would make the test's intent match its implementation and guard against future changes that call these two endpoints in a different sequence.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
There was a problem hiding this comment.
4 issues found across 7 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="apps/backend/internal/agent/runtime/lifecycle/streams.go">
<violation number="1" location="apps/backend/internal/agent/runtime/lifecycle/streams.go:73">
P2: Per-stream `Done()` now spawns a goroutine that can outlive the stream. Normal `ws.Done()` exits do not close `stopCh`/`waitCh`, so reconnect cycles can pile idle `mergeStops` goroutines until full manager shutdown. Tie cancellation to stream lifetime or reuse a shared manager-level merged channel.</violation>
</file>
<file name="apps/backend/internal/agent/runtime/agentctl/workspace_stream.go">
<violation number="1" location="apps/backend/internal/agent/runtime/agentctl/workspace_stream.go:80">
P2: Re-check `workspaceStreamConn` after the dial and before assigning it. Two concurrent callers can both pass the pre-dial guard, and the later one will overwrite the first stream reference without closing the first connection.</violation>
<violation number="2" location="apps/backend/internal/agent/runtime/agentctl/workspace_stream.go:121">
P1: Clear `workspaceStreamConn` only for the same stream. Old read goroutine can zero out a newer connection pointer. Guard the conn reset like you guarded `workspaceStream`.</violation>
</file>
<file name="apps/backend/internal/agent/runtime/agentctl/client.go">
<violation number="1" location="apps/backend/internal/agent/runtime/agentctl/client.go:54">
P3: This comment overstates what `streamWG` tracks. It only tracks the updates-stream goroutine directly; workspace read/write loops are tracked on `WorkspaceStream.wg` and drained via `ws.Wait()` in `Close()`.</violation>
</file>
Reply with feedback, questions, or to request a fix.
Re-trigger cubic
| defer func() { | ||
| c.mu.Lock() | ||
| c.workspaceStreamConn = nil | ||
| if c.workspaceStream == stream { |
There was a problem hiding this comment.
P1: Clear workspaceStreamConn only for the same stream. Old read goroutine can zero out a newer connection pointer. Guard the conn reset like you guarded workspaceStream.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At apps/backend/internal/agent/runtime/agentctl/workspace_stream.go, line 121:
<comment>Clear `workspaceStreamConn` only for the same stream. Old read goroutine can zero out a newer connection pointer. Guard the conn reset like you guarded `workspaceStream`.</comment>
<file context>
@@ -105,6 +118,9 @@ func (c *Client) readWorkspaceStream(conn *websocket.Conn, stream *WorkspaceStre
defer func() {
c.mu.Lock()
c.workspaceStreamConn = nil
+ if c.workspaceStream == stream {
+ c.workspaceStream = nil
+ }
</file context>
| return c.stopCh | ||
| c.once.Do(func() { | ||
| c.merged = make(chan struct{}) | ||
| go c.mergeStops() |
There was a problem hiding this comment.
P2: Per-stream Done() now spawns a goroutine that can outlive the stream. Normal ws.Done() exits do not close stopCh/waitCh, so reconnect cycles can pile idle mergeStops goroutines until full manager shutdown. Tie cancellation to stream lifetime or reuse a shared manager-level merged channel.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At apps/backend/internal/agent/runtime/lifecycle/streams.go, line 73:
<comment>Per-stream `Done()` now spawns a goroutine that can outlive the stream. Normal `ws.Done()` exits do not close `stopCh`/`waitCh`, so reconnect cycles can pile idle `mergeStops` goroutines until full manager shutdown. Tie cancellation to stream lifetime or reuse a shared manager-level merged channel.</comment>
<file context>
@@ -31,49 +31,104 @@ type StreamManager struct {
- return c.stopCh
+ c.once.Do(func() {
+ c.merged = make(chan struct{})
+ go c.mergeStops()
+ })
+ return c.merged
</file context>
| c.workspaceStreamConn = conn | ||
| c.workspaceStream = stream |
There was a problem hiding this comment.
P2: Re-check workspaceStreamConn after the dial and before assigning it. Two concurrent callers can both pass the pre-dial guard, and the later one will overwrite the first stream reference without closing the first connection.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At apps/backend/internal/agent/runtime/agentctl/workspace_stream.go, line 80:
<comment>Re-check `workspaceStreamConn` after the dial and before assigning it. Two concurrent callers can both pass the pre-dial guard, and the later one will overwrite the first stream reference without closing the first connection.</comment>
<file context>
@@ -57,19 +61,28 @@ func (c *Client) StreamWorkspace(ctx context.Context, callbacks WorkspaceStreamC
+ _ = conn.Close()
+ return nil, fmt.Errorf("agentctl client closed during workspace stream dial")
+ }
+ c.workspaceStreamConn = conn
+ c.workspaceStream = stream
+ c.mu.Unlock()
</file context>
| c.workspaceStreamConn = conn | |
| c.workspaceStream = stream | |
| if c.workspaceStreamConn != nil { | |
| c.mu.Unlock() | |
| _ = conn.Close() | |
| return nil, fmt.Errorf("workspace stream already connected") | |
| } | |
| c.workspaceStreamConn = conn | |
| c.workspaceStream = stream |
| // streamWG tracks every stream goroutine spawned by this client | ||
| // (readUpdatesStream and, transitively via WorkspaceStream.wg, the | ||
| // workspace read/write loops). Close waits on it so callers get a true | ||
| // drain barrier instead of a fire-and-forget conn.Close. |
There was a problem hiding this comment.
P3: This comment overstates what streamWG tracks. It only tracks the updates-stream goroutine directly; workspace read/write loops are tracked on WorkspaceStream.wg and drained via ws.Wait() in Close().
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At apps/backend/internal/agent/runtime/agentctl/client.go, line 54:
<comment>This comment overstates what `streamWG` tracks. It only tracks the updates-stream goroutine directly; workspace read/write loops are tracked on `WorkspaceStream.wg` and drained via `ws.Wait()` in `Close()`.</comment>
<file context>
@@ -38,11 +38,25 @@ type Client struct {
// Shared write mutex for agent stream (used by StreamUpdates and sendStreamRequest)
streamWriteMu sync.Mutex
+ // streamWG tracks every stream goroutine spawned by this client
+ // (readUpdatesStream and, transitively via WorkspaceStream.wg, the
+ // workspace read/write loops). Close waits on it so callers get a true
</file context>
| // streamWG tracks every stream goroutine spawned by this client | |
| // (readUpdatesStream and, transitively via WorkspaceStream.wg, the | |
| // workspace read/write loops). Close waits on it so callers get a true | |
| // drain barrier instead of a fire-and-forget conn.Close. | |
| // streamWG tracks the updates-stream goroutine (readUpdatesStream) | |
| // spawned by this client. Workspace read/write loops are tracked on | |
| // each WorkspaceStream's own wg and drained via ws.Wait() in Close(). |
Summary
internal/agent/runtime/lifecyclegoleak.VerifyTestMainwould fail on slow runners with leakedStreamManager.connectWorkspaceStream+WorkspaceStream.writeLoop/ read loop goroutines, even though every individual test passed (see PR feat: full GitLab integration — parity with GitHub #1120 run 26745440163 job 78819616867).agentctl.Client.Close()an absolute drain barrier — tracks every stream goroutine it spawns and waits for them, plus aclosedflag that rejects newStreamUpdates/StreamWorkspacedials so a Close racing an in-flight dial cannot strand a fresh WS connection. The restart path inmanager_interaction.goswitches to per-stream Close helpers so it can keep reusing the client after teardown.StreamManager.Wait()an absolute drain barrier — closes a new internalwaitChthat the retry backoff and the connected<-ws.Done() / <-stop>select observe, so drain doesn't depend on the caller closing the externalstopChfirst.streamContextcarries both stops throughstopChannelContextso in-flight WS dials cancel on either signal.make test-lifecycle-goleak LIFECYCLE_GOLEAK_COUNT=Ntarget (defaults to 20) as the repro hook for the flake, plus aTestClientClose_DrainsStreamGoroutinesregression test in the agentctl package.Test plan
make test-lifecycle-goleak LIFECYCLE_GOLEAK_COUNT=20— 20/20 clean under-racego test -race -count=1 ./internal/agent/runtime/lifecycle/... ./internal/agent/runtime/agentctl/...go vet ./...go build ./...Run Backend Testspasses on first attempt🤖 Generated with Claude Code
Preview Environment
0bcdc06