Skip to content

fix(agentctl): order workspace stream wg.Add before publish to fix Close race#1272

Merged
jcfs merged 2 commits into
mainfrom
feature/main-backedn-tests-f-kle
Jun 4, 2026
Merged

fix(agentctl): order workspace stream wg.Add before publish to fix Close race#1272
jcfs merged 2 commits into
mainfrom
feature/main-backedn-tests-f-kle

Conversation

@jcfs
Copy link
Copy Markdown
Contributor

@jcfs jcfs commented Jun 4, 2026

Summary

  • Fix the WARNING: DATA RACE on WorkspaceStream.wg (Add(2) vs Wait()) that has been failing Backend Tests on main for the last several pushes (TestInitializeAndPrompt_StreamBeforeInitialize, TestManager_RestartAgentProcess_StopErrorIsNonFatal).
  • Client.StreamWorkspace published c.workspaceStream = stream under the lock, then released the lock and ran stream.wg.Add(2) afterwards. A concurrent Client.Close could capture the new pointer under the same lock and call ws.Wait() outside it — racing the not-yet-executed Add(2) on a counter still at 0.
  • Move stream.wg.Add(2) under the same lock that publishes the pointer. The unlock/lock pair now carries the Add into Close's view, so any caller that observes c.workspaceStream already sees the counter bumped before invoking Wait.
  • Add TestClientClose_StreamWorkspaceAddWaitRace — waits for the mock server to observe the WS upgrade, then fires Close in parallel with StreamWorkspace. Pre-fix it deterministically triggers the same race in under 10 iterations; post-fix it stays quiet across 200+ iterations under -race.

Race trace from CI (run 26925964563)

Write at 0x00c0005ccff0 by goroutine 1302:
  WorkspaceStream.Wait()  workspace_stream.go:228
  Client.Close()           client.go:693
  deferred test cleanup

Previous read at 0x00c0005ccff0 by goroutine 1305:
  Client.StreamWorkspace() workspace_stream.go:98  ← stream.wg.Add(2)
  StreamManager.connectWorkspaceStream  streams.go:442
  StreamManager.start.func1             streams.go:203

Test plan

  • go test -race -count=1 ./internal/agent/runtime/... — green
  • go test -race -run TestClientClose_StreamWorkspaceAddWaitRace -count=20 -cpu=8 ./internal/agent/runtime/agentctl/ — green
  • Revert fix locally → new regression test reliably fails under -race
  • make fmt lint clean

🤖 Generated with Claude Code

…ose race

Client.StreamWorkspace published the new WorkspaceStream pointer to
c.workspaceStream under the lock, then released the lock and called
stream.wg.Add(2) afterwards. A concurrent Client.Close captured
c.workspaceStream under the same lock and then called ws.Wait() outside
of it, racing the subsequent Add(2) on a WaitGroup whose counter was
still 0. The race detector flagged this as Add-after-Wait on the same
counter address — reproduced in CI in
TestInitializeAndPrompt_StreamBeforeInitialize and
TestManager_RestartAgentProcess_StopErrorIsNonFatal, both of which use a
deferred client.Close after the StreamManager spawns the workspace
stream goroutine.

Move stream.wg.Add(2) under the same lock that publishes the pointer.
The unlock/lock pair now carries the Add into Close's view, so any
caller that observes c.workspaceStream is guaranteed to see the counter
already bumped before invoking Wait.

Adds TestClientClose_StreamWorkspaceAddWaitRace as a deterministic
regression: it waits for the server to observe the WS upgrade, then
fires Close in parallel with StreamWorkspace — pre-fix this triggers
the race within a handful of iterations under -race.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@claude
Copy link
Copy Markdown

claude Bot commented Jun 4, 2026

Claude finished @jcfs's task in 3m 6s —— View job


Code Review

  • Gather context — reading changed files and surrounding code
  • Analyze the race fix in workspace_stream.go
  • Analyze the new regression test in client_close_test.go
  • Post inline findings
  • Final summary

Summary

The fix is correct and well-reasoned. Moving stream.wg.Add(2) inside the c.mu lock before publishing c.workspaceStream is the minimal and precise fix for this race. The happen-before relationship is now properly established: any goroutine that captures the stream pointer from c.mu is guaranteed to see the WaitGroup counter already at 2, so ws.Wait() in Close() can never observe a spurious 0.

Correctness walkthrough:

  • Add(2) is only reached on the success path (after both c.closed and existing-conn checks), so there's no risk of incrementing the counter without starting goroutines.
  • Close() acquires c.mu, captures ws, releases the lock, then calls CloseWorkspaceStream() + ws.Close() before ws.Wait(). The goroutines (started after the unlock) see a closed conn/closeCh and exit promptly — no deadlock possible.
  • The readWorkspaceStream deferred cleanup acquires c.mu and releases it before calling wg.Done(), so it doesn't conflict with Close()'s lock pattern.

Two minor test observations (see inline comments):

  1. ctx, cancel in the loop body — cancel() should be deferred to cover all exit paths including t.Fatalf.
  2. newCloseBarrierMockServer(t) called per-iteration registers 100 t.Cleanup callbacks, keeping all 100 httptest.Server instances alive until test end. Fine at 100 iterations; worth being aware of if the count grows.

Verdict: Approve. The race fix is minimal, correct, and explained clearly. The regression test directly targets the failing CI window and is a good addition.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Jun 4, 2026

Review Change Stack

📝 Walkthrough

Walkthrough

This PR fixes a race condition in the StreamWorkspace goroutine synchronization by reordering when the wait-group is incremented, ensuring Close can safely wait on goroutine completion. A regression test validates the fix by repeatedly calling Close immediately after startup.

Changes

StreamWorkspace synchronization race fix

Layer / File(s) Summary
Goroutine wait-group ordering
apps/backend/internal/agent/runtime/agentctl/workspace_stream.go
stream.wg.Add(2) is moved before publishing workspaceStreamConn and workspaceStream to establish a happens-before relationship that prevents a race between publishing and the Close/Wait shutdown path.
Regression test for the race window
apps/backend/internal/agent/runtime/agentctl/client_close_test.go
TestClientClose_StreamWorkspaceAddWaitRace runs 100 iterations to exercise the timing window where Close() is invoked immediately after the mock observes the WebSocket upgrade, validating that Close returns promptly without hanging.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

Possibly related PRs

  • kdlbs/kandev#1227: Related synchronization work on agentctl's StreamWorkspace/Client.Close race condition using a workspace-stream drain guard and adding close-barrier test coverage for the same timing window.

Poem

🐰 Goroutines aligned with grace,
wg.Add before we race,
Close comes fast, no hanging here,
Tests confirm the path is clear!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main fix: reordering workspace stream wg.Add before publishing to resolve a Close race condition.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description check ✅ Passed The PR description is comprehensive and well-structured, exceeding the template requirements with detailed context, root cause analysis, test results, and a race trace from CI.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/main-backedn-tests-f-kle

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@greptile-apps
Copy link
Copy Markdown

greptile-apps Bot commented Jun 4, 2026

Greptile Summary

This PR fixes a data race between Client.StreamWorkspace and Client.Close in the agentctl package. Moving stream.wg.Add(2) from after the publication unlock to inside the same critical section as c.workspaceStream = stream establishes the necessary happens-before relationship, ensuring any goroutine that reads the stream pointer also sees the WaitGroup counter already incremented.

  • Race fix (workspace_stream.go): wg.Add(2) is now called under the same mutex that publishes c.workspaceStream, so Client.Close's ws.Wait() can never observe a counter of 0 on a stream it retrieved from the pointer.
  • Regression test (client_close_test.go): TestClientClose_StreamWorkspaceAddWaitRace creates a mock server, waits for the WS upgrade signal (the exact timing window shown in the CI trace), then fires Close concurrently. Pre-fix this reliably triggers the race detector; post-fix it runs clean at 100+ iterations under -race.

Confidence Score: 5/5

The change is a one-line reorder within an existing critical section; the goroutines still start after the unlock and correctly call wg.Done() on exit, so Wait() will always drain cleanly.

The fix is minimal and mechanically correct: Add(2) now happens under the same lock that publishes the stream pointer, and Client.Close already acquires that lock before reading the pointer. The regression test reproduces the exact CI race window and confirms the fix holds under the race detector across many iterations. No new APIs, no control-flow changes, no side effects introduced.

No files require special attention.

Important Files Changed

Filename Overview
apps/backend/internal/agent/runtime/agentctl/workspace_stream.go Moves wg.Add(2) under the publication lock to fix Add-vs-Wait data race; the fix is minimal, correct, and well-commented.
apps/backend/internal/agent/runtime/agentctl/client_close_test.go Adds a targeted regression test that reproduces the exact CI race window; test structure, timeouts, and iteration count are well-suited for the -race detector.

Sequence Diagram

sequenceDiagram
    participant SW as StreamWorkspace goroutine
    participant Lock as c.mu (lock)
    participant Close as Client.Close goroutine

    Note over SW: Dial WebSocket (outside lock)
    SW->>Lock: c.mu.Lock()
    SW->>SW: stream.wg.Add(2)  ← now under lock
    SW->>SW: "c.workspaceStream = stream"
    SW->>Lock: c.mu.Unlock()

    par Goroutines launched
        SW->>SW: go readWorkspaceStream → defer wg.Done()
        SW->>SW: go writeLoop → defer wg.Done()
    and Close fires
        Close->>Lock: c.mu.Lock()
        Close->>Close: "ws = c.workspaceStream (sees counter=2)"
        Close->>Lock: c.mu.Unlock()
        Close->>Close: CloseWorkspaceStream() / ws.Close()
        Close->>Close: ws.Wait() — blocks until both Done()
    end

    SW-->>Close: wg.Done() x2 → Wait() returns
Loading

Reviews (2): Last reviewed commit: "test(agentctl): defer ctx cancel in Stre..." | Re-trigger Greptile

Comment thread apps/backend/internal/agent/runtime/agentctl/client_close_test.go
Comment thread apps/backend/internal/agent/runtime/agentctl/client_close_test.go
Comment thread apps/backend/internal/agent/runtime/agentctl/workspace_stream.go
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No issues found across 2 files

Re-trigger cubic

Wrap the per-iteration body in a runIter closure so `defer cancel()`
covers every exit path (timeout fatals as well as the normal flow),
replacing the explicit cancel() calls. Pure test hygiene — addresses
Claude review comment on PR #1272.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@claude
Copy link
Copy Markdown

claude Bot commented Jun 4, 2026

Claude finished @jcfs's task in 1m 30s —— View job


Code Review

  • Gather context — reading changed files and surrounding code
  • Analyze the race fix in workspace_stream.go
  • Analyze the new regression test in client_close_test.go
  • Verify previous review comments addressed
  • Final summary

Summary

The PR is correct and ready to merge. The previous review comment about defer cancel() has been addressed in commit 9b4df0c by wrapping the iteration body in a runIter closure — defer cancel() now covers all exit paths including t.Fatalf.

Race fix verified correct (workspace_stream.go:95):

  • stream.wg.Add(2) is now inside the c.mu lock, before c.workspaceStream = stream is published
  • Any goroutine that observes c.workspaceStream via c.mu is guaranteed to see counter=2
  • Close() acquires c.mu, captures ws, then calls CloseWorkspaceStream() + ws.Close() before ws.Wait() — the two goroutines see a closed conn/closeCh and exit promptly, so no deadlock is possible
  • The readWorkspaceStream deferred cleanup acquires and releases c.mu before calling wg.Done(), no conflict with Close()'s lock pattern

Regression test verified (client_close_test.go:140):

  • runIter closure correctly scopes defer cancel() to each iteration ✓
  • Server accumulation (100 httptest.Server instances via t.Cleanup) is acceptable at this count — previously noted as a minor observation ✓
  • <-streamErr at the end of each iteration ensures the StreamWorkspace goroutine is fully joined before proceeding ✓

No new findings. The fix is minimal, targeted, and the regression test directly covers the CI failure window shown in the race trace.

@jcfs jcfs merged commit 3652f0f into main Jun 4, 2026
47 checks passed
@jcfs jcfs deleted the feature/main-backedn-tests-f-kle branch June 4, 2026 11:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant