diff --git a/src/converters/streaming.ts b/src/converters/streaming.ts index 78445ab..39729a3 100644 --- a/src/converters/streaming.ts +++ b/src/converters/streaming.ts @@ -3,6 +3,7 @@ import { FastifyReply } from 'fastify'; import { Stream } from 'openai/streaming'; import { AnthropicMessageResponse, + AnthropicStreamEvent, AnthropicUsage, } from '../types/anthropic'; import { OpenAIStreamChunk, OpenAIStreamToolCall } from '../types/openai'; @@ -216,7 +217,7 @@ function processToolCallDelta( } function sendMessageStart(state: StreamingState, raw: any): void { - const event = { + const event: AnthropicStreamEvent = { type: 'message_start', message: { id: state.messageId, @@ -229,10 +230,12 @@ function sendMessageStart(state: StreamingState, raw: any): void { usage: { input_tokens: state.inputTokens, output_tokens: state.outputTokens, - cache_read_input_tokens: state.cachedInputTokens, - }, + } as AnthropicUsage, }, }; + if (state.cachedInputTokens > 0) { + event.message.usage.cache_read_input_tokens = state.cachedInputTokens; + } sendSSE(event, raw); } @@ -256,7 +259,7 @@ function sendContentBlockStart( }; } - const event = { + const event: AnthropicStreamEvent = { type: 'content_block_start', index, content_block: contentBlock, @@ -265,7 +268,7 @@ function sendContentBlockStart( } function sendTextDelta(index: number, text: string, raw: any): void { - const event = { + const event: AnthropicStreamEvent = { type: 'content_block_delta', index, delta: { @@ -277,7 +280,7 @@ function sendTextDelta(index: number, text: string, raw: any): void { } function sendInputJsonDelta(index: number, partialJson: string, raw: any): void { - const event = { + const event: AnthropicStreamEvent = { type: 'content_block_delta', index, delta: { @@ -289,7 +292,7 @@ function sendInputJsonDelta(index: number, partialJson: string, raw: any): void } function sendContentBlockStop(index: number, raw: any): void { - const event = { + const event: AnthropicStreamEvent = { type: 'content_block_stop', index, }; @@ -313,21 +316,23 @@ function finishStream(state: StreamingState, raw: any): void { }); // Send message_delta - const deltaEvent = { + const usage: any = { + output_tokens: state.outputTokens, + }; + if (state.cachedInputTokens > 0) { + usage.cache_read_input_tokens = state.cachedInputTokens; + } + const deltaEvent: AnthropicStreamEvent = { type: 'message_delta', delta: { - stop_reason: stopReason, - stop_sequence: null, - }, - usage: { - output_tokens: state.outputTokens, - cache_read_input_tokens: state.cachedInputTokens, + stop_reason: stopReason as 'end_turn' | 'max_tokens' | 'stop_sequence' | 'tool_use', }, + usage, }; sendSSE(deltaEvent, raw); // Send message_stop - sendSSE({ type: 'message_stop' }, raw); + sendSSE({ type: 'message_stop' } as AnthropicStreamEvent, raw); raw.end(); } @@ -341,7 +346,7 @@ function sendErrorEvent(error: Error, state: StreamingState, raw: any): void { streaming: true }); - const event = { + const event: AnthropicStreamEvent = { type: 'error', error: { type: 'api_error', @@ -352,7 +357,7 @@ function sendErrorEvent(error: Error, state: StreamingState, raw: any): void { raw.end(); } -function sendSSE(data: any, raw: any): void { +function sendSSE(data: AnthropicStreamEvent, raw: any): void { raw.write(`event: ${data.type}\n`); raw.write(`data: ${JSON.stringify(data)}\n\n`); } diff --git a/src/converters/xmlStreaming.ts b/src/converters/xmlStreaming.ts index 8646606..68f860d 100644 --- a/src/converters/xmlStreaming.ts +++ b/src/converters/xmlStreaming.ts @@ -3,6 +3,7 @@ import { FastifyReply } from 'fastify'; import { Stream } from 'openai/streaming'; +import { AnthropicStreamEvent, AnthropicUsage } from '../types/anthropic'; import { OpenAIStreamChunk } from '../types/openai'; import { generateToolUseId } from './tools'; import { recordUsage } from '../utils/tokenUsage'; @@ -175,7 +176,7 @@ function cleanToolArgs(args: string): string { function emitTextBlock(text: string, state: BufferedState, raw: any): void { // Start text block - const startEvent = { + const startEvent: AnthropicStreamEvent = { type: 'content_block_start', index: state.contentBlockIndex, content_block: { type: 'text', text: '' }, @@ -183,7 +184,7 @@ function emitTextBlock(text: string, state: BufferedState, raw: any): void { sendSSE(startEvent, raw); // Send text delta - const deltaEvent = { + const deltaEvent: AnthropicStreamEvent = { type: 'content_block_delta', index: state.contentBlockIndex, delta: { type: 'text_delta', text }, @@ -191,7 +192,7 @@ function emitTextBlock(text: string, state: BufferedState, raw: any): void { sendSSE(deltaEvent, raw); // Stop text block - const stopEvent = { + const stopEvent: AnthropicStreamEvent = { type: 'content_block_stop', index: state.contentBlockIndex, }; @@ -204,7 +205,7 @@ function emitToolUseBlock(toolName: string, args: string, state: BufferedState, const toolId = generateToolUseId(); // Start tool_use block - const startEvent = { + const startEvent: AnthropicStreamEvent = { type: 'content_block_start', index: state.contentBlockIndex, content_block: { @@ -217,7 +218,7 @@ function emitToolUseBlock(toolName: string, args: string, state: BufferedState, sendSSE(startEvent, raw); // Send complete input as single delta - const deltaEvent = { + const deltaEvent: AnthropicStreamEvent = { type: 'content_block_delta', index: state.contentBlockIndex, delta: { @@ -228,7 +229,7 @@ function emitToolUseBlock(toolName: string, args: string, state: BufferedState, sendSSE(deltaEvent, raw); // Stop tool_use block - const stopEvent = { + const stopEvent: AnthropicStreamEvent = { type: 'content_block_stop', index: state.contentBlockIndex, }; @@ -239,7 +240,7 @@ function emitToolUseBlock(toolName: string, args: string, state: BufferedState, } function sendMessageStart(state: BufferedState, raw: any): void { - const event = { + const event: AnthropicStreamEvent = { type: 'message_start', message: { id: state.messageId, @@ -252,10 +253,12 @@ function sendMessageStart(state: BufferedState, raw: any): void { usage: { input_tokens: state.inputTokens, output_tokens: state.outputTokens, - cache_read_input_tokens: state.cachedInputTokens, - }, + } as AnthropicUsage, }, }; + if (state.cachedInputTokens > 0) { + event.message.usage.cache_read_input_tokens = state.cachedInputTokens; + } sendSSE(event, raw); } @@ -275,21 +278,23 @@ function finishStream(state: BufferedState, raw: any): void { }); // Send message_delta - const deltaEvent = { + const usage: any = { + output_tokens: state.outputTokens, + }; + if (state.cachedInputTokens > 0) { + usage.cache_read_input_tokens = state.cachedInputTokens; + } + const deltaEvent: AnthropicStreamEvent = { type: 'message_delta', delta: { - stop_reason: stopReason, - stop_sequence: null, - }, - usage: { - output_tokens: state.outputTokens, - cache_read_input_tokens: state.cachedInputTokens, + stop_reason: stopReason as 'end_turn' | 'max_tokens' | 'stop_sequence' | 'tool_use', }, + usage, }; sendSSE(deltaEvent, raw); // Send message_stop - sendSSE({ type: 'message_stop' }, raw); + sendSSE({ type: 'message_stop' } as AnthropicStreamEvent, raw); raw.end(); } @@ -303,7 +308,7 @@ function sendErrorEvent(error: Error, state: BufferedState, raw: any): void { streaming: true }); - const event = { + const event: AnthropicStreamEvent = { type: 'error', error: { type: 'api_error', @@ -314,7 +319,7 @@ function sendErrorEvent(error: Error, state: BufferedState, raw: any): void { raw.end(); } -function sendSSE(data: any, raw: any): void { +function sendSSE(data: AnthropicStreamEvent, raw: any): void { raw.write(`event: ${data.type}\n`); raw.write(`data: ${JSON.stringify(data)}\n\n`); }