Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion src/renderer/src/store/thunk/messageThunk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,49 @@ const createSSEReadableStream = (
})
}

/**
* Wraps a parsed stream with abort-signal lifecycle handling.
* In the normal chat pipeline the AI SDK runtime converts abort signals into
* `{ type: 'abort' }` stream parts. The agent pipeline bypasses the AI SDK
* runtime, so this middleware fills that gap — keeping the SSE parser
* (transport) and the chunk adapter (protocol) free of lifecycle concerns.
*/
const withAbortStreamPart = (
source: ReadableStream<TextStreamPart<Record<string, any>>>,
signal: AbortSignal
): ReadableStream<TextStreamPart<Record<string, any>>> => {
const reader = source.getReader()

return new ReadableStream<TextStreamPart<Record<string, any>>>({
async pull(controller) {
try {
const { value, done } = await reader.read()
if (done) {
controller.close()
return
}
controller.enqueue(value)
} catch (error) {
// When the source errors due to abort, emit the abort stream part
// so downstream consumers (AiSdkToChunkAdapter) can fire onError.
if (signal.aborted) {
try {
controller.enqueue({ type: 'abort' } as TextStreamPart<Record<string, any>>)
} catch {
// Controller may already be closed
}
controller.close()
} else {
controller.error(error)
}
}
},
cancel(reason) {
return reader.cancel(reason)
}
})
}

const createAgentMessageStream = async (
apiServer: ApiServerConfig,
agentSession: AgentSessionContext,
Expand Down Expand Up @@ -374,7 +417,8 @@ const createAgentMessageStream = async (
throw new Error('Agent message stream has no body')
}

return createSSEReadableStream(response.body, signal)
const sseStream = createSSEReadableStream(response.body, signal)
return withAbortStreamPart(sseStream, signal)
}
// TODO: 后续可以将db操作移到Listener Middleware中
// export const saveMessageAndBlocksToDB = async (message: Message, blocks: MessageBlock[], messageIndex: number = -1) => {
Expand Down
Loading