Skip to content

fix(agent): stop thinking timer on abort by emitting abort stream part#13568

Open
DeJeune wants to merge 3 commits intomainfrom
DeJeune/fix-thinking-timer
Open

fix(agent): stop thinking timer on abort by emitting abort stream part#13568
DeJeune wants to merge 3 commits intomainfrom
DeJeune/fix-thinking-timer

Conversation

@DeJeune
Copy link
Collaborator

@DeJeune DeJeune commented Mar 17, 2026

What this PR does

Before this PR:

After force-stopping an Agent while it is thinking, the thinking time counter continues running indefinitely because callbacks.onError is never called for agent streams on abort.

After this PR:

The thinking timer stops correctly when the user force-stops an Agent, because the agent SSE stream now emits an 'abort' stream part before erroring — matching AI SDK stream behavior — so callbacks.onError fires and updates block statuses.

Fixes #13566

image

Why we need it and why it was done in this way

The root cause is in createSSEReadableStream (used only for agent streams). Its abort handler called controller.error() directly without first enqueuing an 'abort' TextStreamPart. The standard AI SDK streams emit an 'abort' part before erroring, which AiSdkToChunkAdapter.convertAndEmitChunk converts to a ChunkType.ERROR chunk, triggering callbacks.onError to update block statuses (STREAMING → PAUSED). Without that stream part, the adapter's processStream catch silently swallowed the AbortError, and onError was never invoked.

The following tradeoffs were made:

A try/catch wraps the controller.enqueue() in case the controller is already closed. This is defensive but necessary since the abort handler can race with a natural stream close.

The following alternatives were considered:

  • Patching AiSdkToChunkAdapter.processStream to emit the ERROR chunk in the catch block when no error chunk was emitted. Rejected because fixing the upstream source (createSSEReadableStream) is cleaner and makes agent streams consistent with AI SDK streams.

Breaking changes

None.

Special notes for your reviewer

Single-line root cause: createSSEReadableStream's abort handler → controller.error() without prior controller.enqueue({ type: 'abort' })AiSdkToChunkAdapter never emits ERROR chunk → onError never called → thinking block stays STREAMING.

Checklist

  • PR: The PR description is expressive enough and will help future contributors
  • Code: Write code that humans can understand and Keep it simple
  • Refactor: You have left the code cleaner than you found it (Boy Scout Rule)
  • Upgrade: Impact of this change on upgrade flows was considered and addressed if required
  • Documentation: Not required — bug fix, no user-facing behavior change
  • Self-review: I have reviewed my own code before requesting review from others

Release note

fix: Agent thinking timer no longer continues counting after force-stopping the Agent

The agent SSE stream's abort handler directly called controller.error()
without first enqueuing an 'abort' stream part. This meant
AiSdkToChunkAdapter never emitted the ERROR chunk, so callbacks.onError
was never called and the thinking block stayed in STREAMING status,
causing the thinking timer to run indefinitely after force-stopping.

Enqueue { type: 'abort' } before controller.error() to match AI SDK
stream behavior, ensuring the error callback fires and block statuses
are properly updated on abort.

Fixes #13566

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: suyao <sy20010504@gmail.com>
@DeJeune DeJeune requested a review from 0xfullex as a code owner March 17, 2026 17:23
@DeJeune DeJeune requested a review from EurFelux March 18, 2026 02:39
Copy link
Collaborator

@EurFelux EurFelux left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bug diagnosis is correct — the thinking timer not stopping on abort is a real issue. However, the fix is applied at the wrong layer.

createSSEReadableStream is transport (SSE parsing), and shouldn't emit synthetic { type: 'abort' } parts to simulate AI SDK runtime behavior. In the normal chat pipeline, AI SDK's runtime handles abort signal → abort chunk translation. In the Agent pipeline, AI SDK's chunk types are used as a protocol but the runtime is absent — so nobody owns stream lifecycle management (abort, error, cleanup).

The proper fix is to introduce a middleware layer (between SSE stream and AiSdkToChunkAdapter) that takes over the stream lifecycle responsibilities that AI SDK normally handles. Without it, we'll continue patching AI SDK runtime behavior into unrelated layers.

The agent SSE stream's abort handler in createSSEReadableStream called
controller.error() without first emitting an 'abort' stream part. This
meant AiSdkToChunkAdapter never fired callbacks.onError, leaving the
thinking block in STREAMING status and the timer running indefinitely.

Instead of patching the transport layer (createSSEReadableStream) or
the protocol adapter (AiSdkToChunkAdapter), introduce a dedicated
stream lifecycle middleware — withAbortStreamPart — that sits between
them. It intercepts source stream errors on abort and emits the
{ type: 'abort' } stream part that the AI SDK runtime would normally
produce, keeping each layer's responsibilities clean.

Fixes #13566

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: suyao <sy20010504@gmail.com>
Copy link
Collaborator

@EurFelux EurFelux left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The layering is much better now — withAbortStreamPart as a dedicated lifecycle middleware is the right approach. One minor issue with silent error swallowing in the non-abort path, see inline comment.

Comment on lines +361 to +371
} catch {
// When the source errors due to abort, emit the abort stream part
// so downstream consumers (AiSdkToChunkAdapter) can fire onError.
if (signal.aborted) {
try {
controller.enqueue({ type: 'abort' } as TextStreamPart<Record<string, any>>)
} catch {
// Controller may already be closed
}
}
controller.close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The middleware extraction looks great — clean separation of concerns 👍

One issue: the catch block silently swallows non-abort errors. If reader.read() throws for a reason other than abort (e.g. network failure, decoding error), the stream just closes normally via controller.close() and downstream has no idea an error occurred.

Consider propagating the error in the non-abort path:

} catch (error) {
  if (signal.aborted) {
    try {
      controller.enqueue({ type: 'abort' } as TextStreamPart<Record<string, any>>)
    } catch {
      // Controller may already be closed
    }
    controller.close()
  } else {
    controller.error(error)
  }
}

This way abort gets the lifecycle treatment, and genuine errors still propagate.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — fixed in 63c7612. Non-abort errors now propagate via controller.error(error), only abort gets the lifecycle treatment (enqueue abort part + close).

The catch block in withAbortStreamPart was silently closing the stream
for all errors. Now only abort errors get the lifecycle treatment
(enqueue abort part + close), while genuine errors (network failure,
decoding error) propagate via controller.error() so downstream
consumers are properly notified.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: suyao <sy20010504@gmail.com>
@DeJeune DeJeune requested a review from EurFelux March 19, 2026 02:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]: Endless Thinking Time

2 participants