Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,22 @@ export interface ClientOptions {
* @unit milliseconds
*/
timeout?: number | undefined;

/**
* The maximum amount of time (in milliseconds) to wait between SSE events during streaming.
* If no event is received within this period, the stream will be aborted with a
* `StreamIdleTimeoutError`.
*
* This is distinct from `timeout` which applies to the overall request. `idleTimeout`
* specifically detects stalled streams where the connection is alive but no data is flowing.
*
* Can be overridden per-request via `RequestOptions.idleTimeout`.
*
* @unit milliseconds
* @default undefined (no idle timeout)
*/
idleTimeout?: number | undefined;

/**
* Additional `RequestInit` options to be passed to `fetch` calls.
* Properties will be overridden by per-request `fetchOptions`.
Expand Down Expand Up @@ -270,6 +286,7 @@ export class BaseAnthropic {
baseURL: string;
maxRetries: number;
timeout: number;
idleTimeout: number | undefined;
logger: Logger;
logLevel: LogLevel | undefined;
fetchOptions: MergedRequestInit | undefined;
Expand Down Expand Up @@ -314,6 +331,7 @@ export class BaseAnthropic {

this.baseURL = options.baseURL!;
this.timeout = options.timeout ?? BaseAnthropic.DEFAULT_TIMEOUT /* 10 minutes */;
this.idleTimeout = options.idleTimeout;
this.logger = options.logger ?? console;
const defaultLogLevel = 'warn';
// Set default logLevel early so that we can log a warning in parseLogLevel.
Expand Down Expand Up @@ -342,6 +360,7 @@ export class BaseAnthropic {
baseURL: this.baseURL,
maxRetries: this.maxRetries,
timeout: this.timeout,
idleTimeout: this.idleTimeout,
logger: this.logger,
logLevel: this.logLevel,
fetch: this.fetch,
Expand Down Expand Up @@ -954,6 +973,7 @@ export class BaseAnthropic {
static APIConnectionError = Errors.APIConnectionError;
static APIConnectionTimeoutError = Errors.APIConnectionTimeoutError;
static APIUserAbortError = Errors.APIUserAbortError;
static StreamIdleTimeoutError = Errors.StreamIdleTimeoutError;
static NotFoundError = Errors.NotFoundError;
static ConflictError = Errors.ConflictError;
static RateLimitError = Errors.RateLimitError;
Expand Down
33 changes: 33 additions & 0 deletions src/core/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,39 @@ export class APIConnectionTimeoutError extends APIConnectionError {
}
}

/**
* Error thrown when a streaming response stalls - no SSE events received
* within the configured idle timeout period.
*/
export class StreamIdleTimeoutError extends APIConnectionError {
/** The configured idle timeout in milliseconds */
readonly idleTimeoutMs: number;
/** When the last SSE event was received */
readonly lastEventTime: Date;
/** Number of events received before the timeout */
readonly eventCount: number;

constructor({
idleTimeoutMs,
lastEventTime,
eventCount,
message,
}: {
idleTimeoutMs: number;
lastEventTime: Date;
eventCount: number;
message?: string;
}) {
super({
message:
message ?? `Stream stalled: no SSE event received for ${idleTimeoutMs}ms after ${eventCount} events`,
});
this.idleTimeoutMs = idleTimeoutMs;
this.lastEventTime = lastEventTime;
this.eventCount = eventCount;
}
}

export class BadRequestError extends APIError<400, Headers> {}

export class AuthenticationError extends APIError<401, Headers> {}
Expand Down
82 changes: 73 additions & 9 deletions src/core/streaming.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AnthropicError } from './error';
import { AnthropicError, StreamIdleTimeoutError } from './error';
import { type ReadableStream } from '../internal/shim-types';
import { makeReadableStream } from '../internal/shims';
import { findDoubleNewlineIndex, LineDecoder } from '../internal/decoders/line';
Expand All @@ -11,6 +11,14 @@ import type { BaseAnthropic } from '../client';

import { APIError } from './error';

export type StreamOptions = {
/**
* Maximum time in milliseconds to wait between SSE events before considering
* the stream stalled and aborting.
*/
idleTimeout?: number | undefined;
};

type Bytes = string | ArrayBuffer | Uint8Array | null | undefined;

export type ServerSentEvent = {
Expand All @@ -22,23 +30,28 @@ export type ServerSentEvent = {
export class Stream<Item> implements AsyncIterable<Item> {
controller: AbortController;
#client: BaseAnthropic | undefined;
#options: StreamOptions | undefined;

constructor(
private iterator: () => AsyncIterator<Item>,
controller: AbortController,
client?: BaseAnthropic,
options?: StreamOptions,
) {
this.controller = controller;
this.#client = client;
this.#options = options;
}

static fromSSEResponse<Item>(
response: Response,
controller: AbortController,
client?: BaseAnthropic,
options?: StreamOptions,
): Stream<Item> {
let consumed = false;
const logger = client ? loggerFor(client) : console;
const idleTimeout = options?.idleTimeout ?? client?.idleTimeout;

async function* iterator(): AsyncIterator<Item, any, undefined> {
if (consumed) {
Expand All @@ -47,7 +60,7 @@ export class Stream<Item> implements AsyncIterable<Item> {
consumed = true;
let done = false;
try {
for await (const sse of _iterSSEMessages(response, controller)) {
for await (const sse of _iterSSEMessages(response, controller, { idleTimeout })) {
if (sse.event === 'completion') {
try {
yield JSON.parse(sse.data);
Expand Down Expand Up @@ -94,7 +107,7 @@ export class Stream<Item> implements AsyncIterable<Item> {
}
}

return new Stream(iterator, controller, client);
return new Stream(iterator, controller, client, options);
}

/**
Expand Down Expand Up @@ -175,8 +188,8 @@ export class Stream<Item> implements AsyncIterable<Item> {
};

return [
new Stream(() => teeIterator(left), this.controller, this.#client),
new Stream(() => teeIterator(right), this.controller, this.#client),
new Stream(() => teeIterator(left), this.controller, this.#client, this.#options),
new Stream(() => teeIterator(right), this.controller, this.#client, this.#options),
];
}

Expand Down Expand Up @@ -215,6 +228,7 @@ export class Stream<Item> implements AsyncIterable<Item> {
export async function* _iterSSEMessages(
response: Response,
controller: AbortController,
options?: StreamOptions,
): AsyncGenerator<ServerSentEvent, void, unknown> {
if (!response.body) {
controller.abort();
Expand All @@ -229,17 +243,67 @@ export async function* _iterSSEMessages(
throw new AnthropicError(`Attempted to iterate over a response with no body`);
}

const idleTimeout = options?.idleTimeout;
let lastEventTime = Date.now();
let eventCount = 0;

const sseDecoder = new SSEDecoder();
const lineDecoder = new LineDecoder();

const iter = ReadableStreamToAsyncIterable<Bytes>(response.body);
for await (const sseChunk of iterSSEChunks(iter)) {
for (const line of lineDecoder.decode(sseChunk)) {
const sse = sseDecoder.decode(line);
if (sse) yield sse;
const chunksIter = iterSSEChunks(iter)[Symbol.asyncIterator]();

while (true) {
// Create a promise that rejects after the idle timeout
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise =
idleTimeout ?
new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => {
const error = new StreamIdleTimeoutError({
idleTimeoutMs: idleTimeout,
lastEventTime: new Date(lastEventTime),
eventCount,
});
controller.abort();
reject(error);
}, idleTimeout);
})
: null;

try {
// Race between the next chunk and the timeout
const nextChunk = chunksIter.next();
const result = timeoutPromise ? await Promise.race([nextChunk, timeoutPromise]) : await nextChunk;

// Clear the timeout since we got data
if (timeoutId) {
clearTimeout(timeoutId);
}

if (result.done) {
break;
}

// Update tracking for idle timeout
lastEventTime = Date.now();
eventCount++;

// Process the chunk
for (const line of lineDecoder.decode(result.value)) {
const sse = sseDecoder.decode(line);
if (sse) yield sse;
}
} catch (e) {
// Clear timeout on error
if (timeoutId) {
clearTimeout(timeoutId);
}
throw e;
}
}

// Flush any remaining data
for (const line of lineDecoder.flush()) {
const sse = sseDecoder.decode(line);
if (sse) yield sse;
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export {
APIConnectionError,
APIConnectionTimeoutError,
APIUserAbortError,
StreamIdleTimeoutError,
NotFoundError,
ConflictError,
RateLimitError,
Expand Down
14 changes: 12 additions & 2 deletions src/internal/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,21 @@ export async function defaultParseResponse<T>(
// Note: there is an invariant here that isn't represented in the type system
// that if you set `stream: true` the response type must also be `Stream<T>`

// Pass through idle timeout from request options or client defaults
const streamOptions = {
idleTimeout: props.options.idleTimeout ?? client.idleTimeout,
};

if (props.options.__streamClass) {
return props.options.__streamClass.fromSSEResponse(response, props.controller) as any;
return props.options.__streamClass.fromSSEResponse(
response,
props.controller,
client,
streamOptions,
) as any;
}

return Stream.fromSSEResponse(response, props.controller) as any;
return Stream.fromSSEResponse(response, props.controller, client, streamOptions) as any;
}

// fetch refuses to read the body when the status code is 204.
Expand Down
13 changes: 13 additions & 0 deletions src/internal/request-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ export type RequestOptions = {
*/
timeout?: number;

/**
* The maximum amount of time (in milliseconds) to wait between SSE events during streaming.
* If no event is received within this period, the stream will be aborted with a
* `StreamIdleTimeoutError`.
*
* This is distinct from `timeout` which applies to the overall request. `idleTimeout`
* specifically detects stalled streams where the connection is alive but no data is flowing.
*
* @unit milliseconds
* @default undefined (no idle timeout)
*/
idleTimeout?: number;

/**
* Additional `RequestInit` options to be passed to the underlying `fetch` call.
* These options will be merged with the client's default fetch options.
Expand Down
Loading