diff --git a/src/client.ts b/src/client.ts index 5418a25d..eeb2a6a9 100644 --- a/src/client.ts +++ b/src/client.ts @@ -199,6 +199,22 @@ export interface ClientOptions { * @unit milliseconds */ timeout?: number | undefined; + + /** + * The maximum amount of time (in milliseconds) to wait between SSE events during streaming. + * If no event is received within this period, the stream will be aborted with a + * `StreamIdleTimeoutError`. + * + * This is distinct from `timeout` which applies to the overall request. `idleTimeout` + * specifically detects stalled streams where the connection is alive but no data is flowing. + * + * Can be overridden per-request via `RequestOptions.idleTimeout`. + * + * @unit milliseconds + * @default undefined (no idle timeout) + */ + idleTimeout?: number | undefined; + /** * Additional `RequestInit` options to be passed to `fetch` calls. * Properties will be overridden by per-request `fetchOptions`. @@ -270,6 +286,7 @@ export class BaseAnthropic { baseURL: string; maxRetries: number; timeout: number; + idleTimeout: number | undefined; logger: Logger; logLevel: LogLevel | undefined; fetchOptions: MergedRequestInit | undefined; @@ -314,6 +331,7 @@ export class BaseAnthropic { this.baseURL = options.baseURL!; this.timeout = options.timeout ?? BaseAnthropic.DEFAULT_TIMEOUT /* 10 minutes */; + this.idleTimeout = options.idleTimeout; this.logger = options.logger ?? console; const defaultLogLevel = 'warn'; // Set default logLevel early so that we can log a warning in parseLogLevel. @@ -342,6 +360,7 @@ export class BaseAnthropic { baseURL: this.baseURL, maxRetries: this.maxRetries, timeout: this.timeout, + idleTimeout: this.idleTimeout, logger: this.logger, logLevel: this.logLevel, fetch: this.fetch, @@ -954,6 +973,7 @@ export class BaseAnthropic { static APIConnectionError = Errors.APIConnectionError; static APIConnectionTimeoutError = Errors.APIConnectionTimeoutError; static APIUserAbortError = Errors.APIUserAbortError; + static StreamIdleTimeoutError = Errors.StreamIdleTimeoutError; static NotFoundError = Errors.NotFoundError; static ConflictError = Errors.ConflictError; static RateLimitError = Errors.RateLimitError; diff --git a/src/core/error.ts b/src/core/error.ts index 89ffec90..16bc9cd4 100644 --- a/src/core/error.ts +++ b/src/core/error.ts @@ -116,6 +116,39 @@ export class APIConnectionTimeoutError extends APIConnectionError { } } +/** + * Error thrown when a streaming response stalls - no SSE events received + * within the configured idle timeout period. + */ +export class StreamIdleTimeoutError extends APIConnectionError { + /** The configured idle timeout in milliseconds */ + readonly idleTimeoutMs: number; + /** When the last SSE event was received */ + readonly lastEventTime: Date; + /** Number of events received before the timeout */ + readonly eventCount: number; + + constructor({ + idleTimeoutMs, + lastEventTime, + eventCount, + message, + }: { + idleTimeoutMs: number; + lastEventTime: Date; + eventCount: number; + message?: string; + }) { + super({ + message: + message ?? `Stream stalled: no SSE event received for ${idleTimeoutMs}ms after ${eventCount} events`, + }); + this.idleTimeoutMs = idleTimeoutMs; + this.lastEventTime = lastEventTime; + this.eventCount = eventCount; + } +} + export class BadRequestError extends APIError<400, Headers> {} export class AuthenticationError extends APIError<401, Headers> {} diff --git a/src/core/streaming.ts b/src/core/streaming.ts index d9cad674..276edf77 100644 --- a/src/core/streaming.ts +++ b/src/core/streaming.ts @@ -1,4 +1,4 @@ -import { AnthropicError } from './error'; +import { AnthropicError, StreamIdleTimeoutError } from './error'; import { type ReadableStream } from '../internal/shim-types'; import { makeReadableStream } from '../internal/shims'; import { findDoubleNewlineIndex, LineDecoder } from '../internal/decoders/line'; @@ -11,6 +11,14 @@ import type { BaseAnthropic } from '../client'; import { APIError } from './error'; +export type StreamOptions = { + /** + * Maximum time in milliseconds to wait between SSE events before considering + * the stream stalled and aborting. + */ + idleTimeout?: number | undefined; +}; + type Bytes = string | ArrayBuffer | Uint8Array | null | undefined; export type ServerSentEvent = { @@ -22,23 +30,28 @@ export type ServerSentEvent = { export class Stream implements AsyncIterable { controller: AbortController; #client: BaseAnthropic | undefined; + #options: StreamOptions | undefined; constructor( private iterator: () => AsyncIterator, controller: AbortController, client?: BaseAnthropic, + options?: StreamOptions, ) { this.controller = controller; this.#client = client; + this.#options = options; } static fromSSEResponse( response: Response, controller: AbortController, client?: BaseAnthropic, + options?: StreamOptions, ): Stream { let consumed = false; const logger = client ? loggerFor(client) : console; + const idleTimeout = options?.idleTimeout ?? client?.idleTimeout; async function* iterator(): AsyncIterator { if (consumed) { @@ -47,7 +60,7 @@ export class Stream implements AsyncIterable { consumed = true; let done = false; try { - for await (const sse of _iterSSEMessages(response, controller)) { + for await (const sse of _iterSSEMessages(response, controller, { idleTimeout })) { if (sse.event === 'completion') { try { yield JSON.parse(sse.data); @@ -94,7 +107,7 @@ export class Stream implements AsyncIterable { } } - return new Stream(iterator, controller, client); + return new Stream(iterator, controller, client, options); } /** @@ -175,8 +188,8 @@ export class Stream implements AsyncIterable { }; return [ - new Stream(() => teeIterator(left), this.controller, this.#client), - new Stream(() => teeIterator(right), this.controller, this.#client), + new Stream(() => teeIterator(left), this.controller, this.#client, this.#options), + new Stream(() => teeIterator(right), this.controller, this.#client, this.#options), ]; } @@ -215,6 +228,7 @@ export class Stream implements AsyncIterable { export async function* _iterSSEMessages( response: Response, controller: AbortController, + options?: StreamOptions, ): AsyncGenerator { if (!response.body) { controller.abort(); @@ -229,17 +243,67 @@ export async function* _iterSSEMessages( throw new AnthropicError(`Attempted to iterate over a response with no body`); } + const idleTimeout = options?.idleTimeout; + let lastEventTime = Date.now(); + let eventCount = 0; + const sseDecoder = new SSEDecoder(); const lineDecoder = new LineDecoder(); const iter = ReadableStreamToAsyncIterable(response.body); - for await (const sseChunk of iterSSEChunks(iter)) { - for (const line of lineDecoder.decode(sseChunk)) { - const sse = sseDecoder.decode(line); - if (sse) yield sse; + const chunksIter = iterSSEChunks(iter)[Symbol.asyncIterator](); + + while (true) { + // Create a promise that rejects after the idle timeout + let timeoutId: ReturnType | undefined; + const timeoutPromise = + idleTimeout ? + new Promise((_, reject) => { + timeoutId = setTimeout(() => { + const error = new StreamIdleTimeoutError({ + idleTimeoutMs: idleTimeout, + lastEventTime: new Date(lastEventTime), + eventCount, + }); + controller.abort(); + reject(error); + }, idleTimeout); + }) + : null; + + try { + // Race between the next chunk and the timeout + const nextChunk = chunksIter.next(); + const result = timeoutPromise ? await Promise.race([nextChunk, timeoutPromise]) : await nextChunk; + + // Clear the timeout since we got data + if (timeoutId) { + clearTimeout(timeoutId); + } + + if (result.done) { + break; + } + + // Update tracking for idle timeout + lastEventTime = Date.now(); + eventCount++; + + // Process the chunk + for (const line of lineDecoder.decode(result.value)) { + const sse = sseDecoder.decode(line); + if (sse) yield sse; + } + } catch (e) { + // Clear timeout on error + if (timeoutId) { + clearTimeout(timeoutId); + } + throw e; } } + // Flush any remaining data for (const line of lineDecoder.flush()) { const sse = sseDecoder.decode(line); if (sse) yield sse; diff --git a/src/index.ts b/src/index.ts index 8594f41b..119b33ae 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,6 +12,7 @@ export { APIConnectionError, APIConnectionTimeoutError, APIUserAbortError, + StreamIdleTimeoutError, NotFoundError, ConflictError, RateLimitError, diff --git a/src/internal/parse.ts b/src/internal/parse.ts index f5b8e527..dfb80e6b 100644 --- a/src/internal/parse.ts +++ b/src/internal/parse.ts @@ -27,11 +27,21 @@ export async function defaultParseResponse( // Note: there is an invariant here that isn't represented in the type system // that if you set `stream: true` the response type must also be `Stream` + // Pass through idle timeout from request options or client defaults + const streamOptions = { + idleTimeout: props.options.idleTimeout ?? client.idleTimeout, + }; + if (props.options.__streamClass) { - return props.options.__streamClass.fromSSEResponse(response, props.controller) as any; + return props.options.__streamClass.fromSSEResponse( + response, + props.controller, + client, + streamOptions, + ) as any; } - return Stream.fromSSEResponse(response, props.controller) as any; + return Stream.fromSSEResponse(response, props.controller, client, streamOptions) as any; } // fetch refuses to read the body when the status code is 204. diff --git a/src/internal/request-options.ts b/src/internal/request-options.ts index 56765e5a..f10a78ea 100644 --- a/src/internal/request-options.ts +++ b/src/internal/request-options.ts @@ -55,6 +55,19 @@ export type RequestOptions = { */ timeout?: number; + /** + * The maximum amount of time (in milliseconds) to wait between SSE events during streaming. + * If no event is received within this period, the stream will be aborted with a + * `StreamIdleTimeoutError`. + * + * This is distinct from `timeout` which applies to the overall request. `idleTimeout` + * specifically detects stalled streams where the connection is alive but no data is flowing. + * + * @unit milliseconds + * @default undefined (no idle timeout) + */ + idleTimeout?: number; + /** * Additional `RequestInit` options to be passed to the underlying `fetch` call. * These options will be merged with the client's default fetch options. diff --git a/tests/api-resources/MessageStream.test.ts b/tests/api-resources/MessageStream.test.ts index a7ec7379..162a1df4 100644 --- a/tests/api-resources/MessageStream.test.ts +++ b/tests/api-resources/MessageStream.test.ts @@ -1,4 +1,4 @@ -import Anthropic, { APIConnectionError, APIUserAbortError } from '@anthropic-ai/sdk'; +import Anthropic, { APIConnectionError, APIUserAbortError, StreamIdleTimeoutError } from '@anthropic-ai/sdk'; import { Message, MessageStreamEvent } from '@anthropic-ai/sdk/resources/messages'; import { mockFetch } from '../lib/mock-fetch'; import { loadFixture, parseSSEFixture } from '../lib/sse-helpers'; @@ -251,4 +251,146 @@ describe('MessageStream class', () => { }); await expect(stream).rejects.toThrow(APIConnectionError); }); + + describe('idle timeout integration', () => { + it('throws StreamIdleTimeoutError when stream stalls via request options', async () => { + const { fetch, handleRequest } = mockFetch(); + const anthropic = new Anthropic({ apiKey: 'test-key', fetch }); + + // Create a stream that sends one event then stalls + handleRequest(async () => { + const { PassThrough } = await import('stream'); + const stream = new PassThrough(); + + // Send one event then stall + stream.write('event: message_start\n'); + stream.write( + 'data: {"type":"message_start","message":{"id":"msg_1","type":"message","role":"assistant","content":[],"model":"claude","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":0,"output_tokens":0}}}\n\n', + ); + // Don't end the stream - it will stall + + return new Response(stream, { + headers: { 'Content-Type': 'text/event-stream' }, + }); + }); + + const stream = anthropic.messages.stream( + { + max_tokens: 1024, + model: 'claude-opus-4-20250514', + messages: [{ role: 'user', content: 'Hello' }], + }, + { idleTimeout: 50 }, // 50ms timeout + ); + + await expect(stream.finalMessage()).rejects.toThrow(StreamIdleTimeoutError); + }); + + it('throws StreamIdleTimeoutError when stream stalls via client default', async () => { + const { fetch, handleRequest } = mockFetch(); + const anthropic = new Anthropic({ + apiKey: 'test-key', + fetch, + idleTimeout: 50, // Default idle timeout for all requests + }); + + // Create a stream that sends one event then stalls + handleRequest(async () => { + const { PassThrough } = await import('stream'); + const stream = new PassThrough(); + + stream.write('event: message_start\n'); + stream.write( + 'data: {"type":"message_start","message":{"id":"msg_1","type":"message","role":"assistant","content":[],"model":"claude","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":0,"output_tokens":0}}}\n\n', + ); + // Stall + + return new Response(stream, { + headers: { 'Content-Type': 'text/event-stream' }, + }); + }); + + const stream = anthropic.messages.stream({ + max_tokens: 1024, + model: 'claude-opus-4-20250514', + messages: [{ role: 'user', content: 'Hello' }], + }); + + await expect(stream.finalMessage()).rejects.toThrow(StreamIdleTimeoutError); + }); + + it('request idleTimeout overrides client default', async () => { + const { fetch, handleRequest } = mockFetch(); + const anthropic = new Anthropic({ + apiKey: 'test-key', + fetch, + idleTimeout: 10, // Very short default that would timeout + }); + + // Create a slow but completing stream + handleRequest(async () => { + const { PassThrough } = await import('stream'); + const stream = new PassThrough(); + + (async () => { + stream.write('event: message_start\n'); + stream.write( + 'data: {"type":"message_start","message":{"id":"msg_1","type":"message","role":"assistant","content":[],"model":"claude","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":0,"output_tokens":0}}}\n\n', + ); + await new Promise((r) => setTimeout(r, 30)); // 30ms delay + stream.write('event: message_stop\n'); + stream.write('data: {"type":"message_stop"}\n\n'); + stream.end(); + })(); + + return new Response(stream, { + headers: { 'Content-Type': 'text/event-stream' }, + }); + }); + + // Override with longer timeout + const stream = anthropic.messages.stream( + { + max_tokens: 1024, + model: 'claude-opus-4-20250514', + messages: [{ role: 'user', content: 'Hello' }], + }, + { idleTimeout: 100 }, // Override with longer timeout + ); + + // Should complete successfully despite client's short default + const events: MessageStreamEvent[] = []; + for await (const event of stream) { + events.push(event); + } + expect(events.length).toBe(2); + }); + + it('stream completes normally when data arrives before timeout', async () => { + const { fetch, handleStreamEvents } = mockFetch(); + const anthropic = new Anthropic({ + apiKey: 'test-key', + fetch, + idleTimeout: 1000, // Long timeout + }); + + const fixtureContent = loadFixture('basic_response.txt'); + const streamEvents = await parseSSEFixture(fixtureContent); + handleStreamEvents(streamEvents); + + const stream = anthropic.messages.stream({ + max_tokens: 1024, + model: 'claude-opus-4-20250514', + messages: [{ role: 'user', content: 'Say hello there!' }], + }); + + const events: MessageStreamEvent[] = []; + for await (const event of stream) { + events.push(event); + } + + const message = await stream.finalMessage(); + assertBasicResponse(events, message); + }); + }); }); diff --git a/tests/streaming.test.ts b/tests/streaming.test.ts index 0977884e..24dbcbbf 100644 --- a/tests/streaming.test.ts +++ b/tests/streaming.test.ts @@ -1,6 +1,6 @@ import assert from 'assert'; import { Stream, _iterSSEMessages } from '@anthropic-ai/sdk/core/streaming'; -import { APIError } from '@anthropic-ai/sdk/core/error'; +import { APIError, StreamIdleTimeoutError } from '@anthropic-ai/sdk/core/error'; import { ReadableStreamFrom } from '@anthropic-ai/sdk/internal/shims'; describe('streaming decoding', () => { @@ -243,3 +243,386 @@ test('error handling', async () => { ); await err.toBeInstanceOf(APIError); }); + +describe('idle timeout', () => { + test('throws StreamIdleTimeoutError when stream stalls', async () => { + // Create a stream that sends one event then stalls forever + async function* stalledBody(): AsyncGenerator { + yield Buffer.from('event: message_start\n'); + yield Buffer.from('data: {"type":"message_start"}\n'); + yield Buffer.from('\n'); + // Stream stalls here - never sends more data + await new Promise(() => {}); // Never resolves + } + + const controller = new AbortController(); + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(stalledBody())), controller, { + idleTimeout: 50, // 50ms timeout for test speed + })[Symbol.asyncIterator](); + + // First event should come through + const event = await stream.next(); + assert(event.value); + expect(event.value.event).toEqual('message_start'); + + // Second call should timeout + const startTime = Date.now(); + await expect(stream.next()).rejects.toThrow(StreamIdleTimeoutError); + const elapsed = Date.now() - startTime; + + // Should have waited approximately the idle timeout + expect(elapsed).toBeGreaterThanOrEqual(45); // Allow some tolerance + expect(elapsed).toBeLessThan(200); // But not too long + }); + + test('timeout resets on each chunk received', async () => { + // Create a stream that sends events slowly but consistently + async function* slowBody(): AsyncGenerator { + yield Buffer.from('event: message_start\n'); + yield Buffer.from('data: {"type":"message_start"}\n'); + yield Buffer.from('\n'); + await new Promise((resolve) => setTimeout(resolve, 30)); + yield Buffer.from('event: content_block_start\n'); + yield Buffer.from('data: {"type":"content_block_start"}\n'); + yield Buffer.from('\n'); + await new Promise((resolve) => setTimeout(resolve, 30)); + yield Buffer.from('event: message_stop\n'); + yield Buffer.from('data: {"type":"message_stop"}\n'); + yield Buffer.from('\n'); + } + + const controller = new AbortController(); + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(slowBody())), controller, { + idleTimeout: 100, // 100ms timeout - longer than each individual delay + })[Symbol.asyncIterator](); + + // All events should come through since we reset timeout on each + let event = await stream.next(); + assert(event.value); + expect(event.value.event).toEqual('message_start'); + + event = await stream.next(); + assert(event.value); + expect(event.value.event).toEqual('content_block_start'); + + event = await stream.next(); + assert(event.value); + expect(event.value.event).toEqual('message_stop'); + + event = await stream.next(); + expect(event.done).toBeTruthy(); + }); + + test('StreamIdleTimeoutError contains diagnostic information', async () => { + async function* stalledBody(): AsyncGenerator { + yield Buffer.from('event: message_start\n'); + yield Buffer.from('data: {"type":"message_start"}\n'); + yield Buffer.from('\n'); + yield Buffer.from('event: content_block_start\n'); + yield Buffer.from('data: {"type":"content_block_start"}\n'); + yield Buffer.from('\n'); + // Stall after 2 events + await new Promise(() => {}); + } + + const controller = new AbortController(); + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(stalledBody())), controller, { + idleTimeout: 50, + })[Symbol.asyncIterator](); + + // Consume the two events + await stream.next(); + await stream.next(); + + // Third call should timeout + try { + await stream.next(); + fail('Expected StreamIdleTimeoutError'); + } catch (err) { + expect(err).toBeInstanceOf(StreamIdleTimeoutError); + const timeoutErr = err as StreamIdleTimeoutError; + expect(timeoutErr.idleTimeoutMs).toBe(50); + expect(timeoutErr.eventCount).toBe(2); // We received 2 chunks before timeout + expect(timeoutErr.lastEventTime).toBeInstanceOf(Date); + expect(timeoutErr.message).toContain('50ms'); + } + }); + + test('no timeout when idleTimeout is not set', async () => { + // This test verifies the feature doesn't break existing behavior + async function* body(): AsyncGenerator { + yield Buffer.from('event: completion\n'); + yield Buffer.from('data: {"foo":true}\n'); + yield Buffer.from('\n'); + } + + const controller = new AbortController(); + // No idleTimeout option passed + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), controller)[ + Symbol.asyncIterator + ](); + + const event = await stream.next(); + assert(event.value); + expect(JSON.parse(event.value.data)).toEqual({ foo: true }); + + const done = await stream.next(); + expect(done.done).toBeTruthy(); + }); + + test('Stream.fromSSEResponse passes idleTimeout through', async () => { + async function* stalledBody(): AsyncGenerator { + yield Buffer.from('event: message_start\n'); + yield Buffer.from( + 'data: {"type":"message_start","message":{"id":"msg_1","type":"message","role":"assistant","content":[],"model":"claude","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":0,"output_tokens":0}}}\n', + ); + yield Buffer.from('\n'); + await new Promise(() => {}); // Stall + } + + const controller = new AbortController(); + const stream = Stream.fromSSEResponse( + new Response(ReadableStreamFrom(stalledBody())), + controller, + undefined, + { idleTimeout: 50 }, + ); + + const iterator = stream[Symbol.asyncIterator](); + // First event comes through + const first = await iterator.next(); + expect(first.done).toBeFalsy(); + + // Second call should timeout + await expect(iterator.next()).rejects.toThrow(StreamIdleTimeoutError); + }); + + test('cleans up timer on normal stream completion', async () => { + const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout'); + + async function* body(): AsyncGenerator { + yield Buffer.from('event: message_start\n'); + yield Buffer.from('data: {"type":"message_start"}\n'); + yield Buffer.from('\n'); + yield Buffer.from('event: message_stop\n'); + yield Buffer.from('data: {"type":"message_stop"}\n'); + yield Buffer.from('\n'); + } + + const controller = new AbortController(); + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), controller, { + idleTimeout: 1000, + }); + + const events = []; + for await (const event of stream) { + events.push(event); + } + + expect(events.length).toBe(2); + // Timer should have been cleared (once per chunk + final) + expect(clearTimeoutSpy).toHaveBeenCalled(); + clearTimeoutSpy.mockRestore(); + }); + + test('cleans up timer when user breaks out of loop', async () => { + const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout'); + + async function* body(): AsyncGenerator { + yield Buffer.from('event: event1\n\n'); + yield Buffer.from('event: event2\n\n'); + yield Buffer.from('event: event3\n\n'); + // More events that won't be consumed + yield Buffer.from('event: event4\n\n'); + } + + const controller = new AbortController(); + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), controller, { + idleTimeout: 1000, + }); + + let count = 0; + for await (const _event of stream) { + count++; + if (count >= 2) break; // Break early + } + + expect(count).toBe(2); + // Allow microtask queue to flush + await new Promise((resolve) => setTimeout(resolve, 10)); + expect(clearTimeoutSpy).toHaveBeenCalled(); + clearTimeoutSpy.mockRestore(); + }); + + test('cleans up timer on manual abort', async () => { + const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout'); + + async function* body(): AsyncGenerator { + yield Buffer.from('event: message_start\n\n'); + await new Promise((resolve) => setTimeout(resolve, 100)); + yield Buffer.from('event: message_stop\n\n'); + } + + const controller = new AbortController(); + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), controller, { + idleTimeout: 1000, + }); + + const iterator = stream[Symbol.asyncIterator](); + + // Get first event + await iterator.next(); + + // Abort while waiting for second + setTimeout(() => controller.abort(), 20); + + // The next call should handle the abort + try { + await iterator.next(); + } catch (e) { + // Abort error is expected + } + + expect(clearTimeoutSpy).toHaveBeenCalled(); + clearTimeoutSpy.mockRestore(); + }); + + test('handles very short idleTimeout', async () => { + // Test that a very short timeout (1ms) still works correctly + // Note: setTimeout(fn, 0) behavior allows synchronously-available data through, + // so we use 1ms and add a small delay to ensure timeout fires + async function* body(): AsyncGenerator { + yield Buffer.from('event: message_start\n\n'); + await new Promise((resolve) => setTimeout(resolve, 20)); // Delay longer than timeout + yield Buffer.from('event: message_stop\n\n'); + } + + const controller = new AbortController(); + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), controller, { + idleTimeout: 1, // Very short timeout + }); + + const events = []; + try { + for await (const event of stream) { + events.push(event); + } + fail('Should have thrown'); + } catch (err) { + expect(err).toBeInstanceOf(StreamIdleTimeoutError); + } + + // First event should have come through before timeout + expect(events.length).toBe(1); + }); + + test('works correctly with multiple sequential streams', async () => { + // Ensure no cross-contamination between streams + async function* fastBody(): AsyncGenerator { + yield Buffer.from('event: fast\n\n'); + } + + async function* slowBody(): AsyncGenerator { + yield Buffer.from('event: slow\n\n'); + await new Promise((resolve) => setTimeout(resolve, 30)); + yield Buffer.from('event: slow2\n\n'); + } + + // First stream with short timeout + const stream1 = _iterSSEMessages(new Response(ReadableStreamFrom(fastBody())), new AbortController(), { + idleTimeout: 100, + }); + + const events1 = []; + for await (const event of stream1) { + events1.push(event); + } + expect(events1.length).toBe(1); + + // Second stream with different timeout - should not be affected by first + const stream2 = _iterSSEMessages(new Response(ReadableStreamFrom(slowBody())), new AbortController(), { + idleTimeout: 100, + }); + + const events2 = []; + for await (const event of stream2) { + events2.push(event); + } + expect(events2.length).toBe(2); + }); + + test('timeout error includes accurate event count across multiple chunks', async () => { + async function* body(): AsyncGenerator { + // Send 5 chunks before stalling + for (let i = 0; i < 5; i++) { + yield Buffer.from(`event: event${i}\n\n`); + } + await new Promise(() => {}); // Stall + } + + const controller = new AbortController(); + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), controller, { + idleTimeout: 50, + }); + + const events = []; + try { + for await (const event of stream) { + events.push(event); + } + fail('Should have thrown'); + } catch (err) { + expect(err).toBeInstanceOf(StreamIdleTimeoutError); + const timeoutErr = err as StreamIdleTimeoutError; + // We received 5 chunks before timeout + expect(timeoutErr.eventCount).toBe(5); + } + + expect(events.length).toBe(5); + }); + + test('does not leak timers when error occurs during chunk processing', async () => { + const setTimeoutSpy = jest.spyOn(global, 'setTimeout'); + const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout'); + + async function* body(): AsyncGenerator { + yield Buffer.from('event: good\n\n'); + yield Buffer.from('event: bad\n\n'); + } + + const controller = new AbortController(); + const stream = _iterSSEMessages(new Response(ReadableStreamFrom(body())), controller, { + idleTimeout: 1000, + }); + + let errorThrown = false; + try { + let count = 0; + for await (const _event of stream) { + count++; + if (count === 2) { + throw new Error('Processing error'); + } + } + } catch (e) { + if ((e as Error).message === 'Processing error') { + errorThrown = true; + } else { + throw e; + } + } + + expect(errorThrown).toBe(true); + + // Get counts before restore + const setCount = setTimeoutSpy.mock.calls.length; + const clearCount = clearTimeoutSpy.mock.calls.length; + + setTimeoutSpy.mockRestore(); + clearTimeoutSpy.mockRestore(); + + // Each setTimeout should have a corresponding clearTimeout + // (may have more clears than sets due to the error path) + expect(clearCount).toBeGreaterThanOrEqual(setCount - 1); + }); +});