Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 22 additions & 17 deletions src/converters/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { FastifyReply } from 'fastify';
import { Stream } from 'openai/streaming';
import {
AnthropicMessageResponse,
AnthropicStreamEvent,
AnthropicUsage,
} from '../types/anthropic';
import { OpenAIStreamChunk, OpenAIStreamToolCall } from '../types/openai';
Expand Down Expand Up @@ -216,7 +217,7 @@ function processToolCallDelta(
}

function sendMessageStart(state: StreamingState, raw: any): void {
const event = {
const event: AnthropicStreamEvent = {
type: 'message_start',
message: {
id: state.messageId,
Expand All @@ -229,10 +230,12 @@ function sendMessageStart(state: StreamingState, raw: any): void {
usage: {
input_tokens: state.inputTokens,
output_tokens: state.outputTokens,
cache_read_input_tokens: state.cachedInputTokens,
},
} as AnthropicUsage,
},
};
if (state.cachedInputTokens > 0) {
event.message.usage.cache_read_input_tokens = state.cachedInputTokens;
}
sendSSE(event, raw);
}

Expand All @@ -256,7 +259,7 @@ function sendContentBlockStart(
};
}

const event = {
const event: AnthropicStreamEvent = {
type: 'content_block_start',
index,
content_block: contentBlock,
Expand All @@ -265,7 +268,7 @@ function sendContentBlockStart(
}

function sendTextDelta(index: number, text: string, raw: any): void {
const event = {
const event: AnthropicStreamEvent = {
type: 'content_block_delta',
index,
delta: {
Expand All @@ -277,7 +280,7 @@ function sendTextDelta(index: number, text: string, raw: any): void {
}

function sendInputJsonDelta(index: number, partialJson: string, raw: any): void {
const event = {
const event: AnthropicStreamEvent = {
type: 'content_block_delta',
index,
delta: {
Expand All @@ -289,7 +292,7 @@ function sendInputJsonDelta(index: number, partialJson: string, raw: any): void
}

function sendContentBlockStop(index: number, raw: any): void {
const event = {
const event: AnthropicStreamEvent = {
type: 'content_block_stop',
index,
};
Expand All @@ -313,21 +316,23 @@ function finishStream(state: StreamingState, raw: any): void {
});

// Send message_delta
const deltaEvent = {
const usage: any = {
output_tokens: state.outputTokens,
};
if (state.cachedInputTokens > 0) {
usage.cache_read_input_tokens = state.cachedInputTokens;
}
const deltaEvent: AnthropicStreamEvent = {
type: 'message_delta',
delta: {
stop_reason: stopReason,
stop_sequence: null,
},
usage: {
output_tokens: state.outputTokens,
cache_read_input_tokens: state.cachedInputTokens,
stop_reason: stopReason as 'end_turn' | 'max_tokens' | 'stop_sequence' | 'tool_use',
},
usage,
};
sendSSE(deltaEvent, raw);

// Send message_stop
sendSSE({ type: 'message_stop' }, raw);
sendSSE({ type: 'message_stop' } as AnthropicStreamEvent, raw);

raw.end();
}
Expand All @@ -341,7 +346,7 @@ function sendErrorEvent(error: Error, state: StreamingState, raw: any): void {
streaming: true
});

const event = {
const event: AnthropicStreamEvent = {
type: 'error',
error: {
type: 'api_error',
Expand All @@ -352,7 +357,7 @@ function sendErrorEvent(error: Error, state: StreamingState, raw: any): void {
raw.end();
}

function sendSSE(data: any, raw: any): void {
function sendSSE(data: AnthropicStreamEvent, raw: any): void {
raw.write(`event: ${data.type}\n`);
raw.write(`data: ${JSON.stringify(data)}\n\n`);
}
43 changes: 24 additions & 19 deletions src/converters/xmlStreaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import { FastifyReply } from 'fastify';
import { Stream } from 'openai/streaming';
import { AnthropicStreamEvent, AnthropicUsage } from '../types/anthropic';
import { OpenAIStreamChunk } from '../types/openai';
import { generateToolUseId } from './tools';
import { recordUsage } from '../utils/tokenUsage';
Expand Down Expand Up @@ -175,23 +176,23 @@ function cleanToolArgs(args: string): string {

function emitTextBlock(text: string, state: BufferedState, raw: any): void {
// Start text block
const startEvent = {
const startEvent: AnthropicStreamEvent = {
type: 'content_block_start',
index: state.contentBlockIndex,
content_block: { type: 'text', text: '' },
};
sendSSE(startEvent, raw);

// Send text delta
const deltaEvent = {
const deltaEvent: AnthropicStreamEvent = {
type: 'content_block_delta',
index: state.contentBlockIndex,
delta: { type: 'text_delta', text },
};
sendSSE(deltaEvent, raw);

// Stop text block
const stopEvent = {
const stopEvent: AnthropicStreamEvent = {
type: 'content_block_stop',
index: state.contentBlockIndex,
};
Expand All @@ -204,7 +205,7 @@ function emitToolUseBlock(toolName: string, args: string, state: BufferedState,
const toolId = generateToolUseId();

// Start tool_use block
const startEvent = {
const startEvent: AnthropicStreamEvent = {
type: 'content_block_start',
index: state.contentBlockIndex,
content_block: {
Expand All @@ -217,7 +218,7 @@ function emitToolUseBlock(toolName: string, args: string, state: BufferedState,
sendSSE(startEvent, raw);

// Send complete input as single delta
const deltaEvent = {
const deltaEvent: AnthropicStreamEvent = {
type: 'content_block_delta',
index: state.contentBlockIndex,
delta: {
Expand All @@ -228,7 +229,7 @@ function emitToolUseBlock(toolName: string, args: string, state: BufferedState,
sendSSE(deltaEvent, raw);

// Stop tool_use block
const stopEvent = {
const stopEvent: AnthropicStreamEvent = {
type: 'content_block_stop',
index: state.contentBlockIndex,
};
Expand All @@ -239,7 +240,7 @@ function emitToolUseBlock(toolName: string, args: string, state: BufferedState,
}

function sendMessageStart(state: BufferedState, raw: any): void {
const event = {
const event: AnthropicStreamEvent = {
type: 'message_start',
message: {
id: state.messageId,
Expand All @@ -252,10 +253,12 @@ function sendMessageStart(state: BufferedState, raw: any): void {
usage: {
input_tokens: state.inputTokens,
output_tokens: state.outputTokens,
cache_read_input_tokens: state.cachedInputTokens,
},
} as AnthropicUsage,
},
};
if (state.cachedInputTokens > 0) {
event.message.usage.cache_read_input_tokens = state.cachedInputTokens;
}
sendSSE(event, raw);
}

Expand All @@ -275,21 +278,23 @@ function finishStream(state: BufferedState, raw: any): void {
});

// Send message_delta
const deltaEvent = {
const usage: any = {
output_tokens: state.outputTokens,
};
if (state.cachedInputTokens > 0) {
usage.cache_read_input_tokens = state.cachedInputTokens;
}
const deltaEvent: AnthropicStreamEvent = {
type: 'message_delta',
delta: {
stop_reason: stopReason,
stop_sequence: null,
},
usage: {
output_tokens: state.outputTokens,
cache_read_input_tokens: state.cachedInputTokens,
stop_reason: stopReason as 'end_turn' | 'max_tokens' | 'stop_sequence' | 'tool_use',
},
usage,
};
sendSSE(deltaEvent, raw);

// Send message_stop
sendSSE({ type: 'message_stop' }, raw);
sendSSE({ type: 'message_stop' } as AnthropicStreamEvent, raw);

raw.end();
}
Expand All @@ -303,7 +308,7 @@ function sendErrorEvent(error: Error, state: BufferedState, raw: any): void {
streaming: true
});

const event = {
const event: AnthropicStreamEvent = {
type: 'error',
error: {
type: 'api_error',
Expand All @@ -314,7 +319,7 @@ function sendErrorEvent(error: Error, state: BufferedState, raw: any): void {
raw.end();
}

function sendSSE(data: any, raw: any): void {
function sendSSE(data: AnthropicStreamEvent, raw: any): void {
raw.write(`event: ${data.type}\n`);
raw.write(`data: ${JSON.stringify(data)}\n\n`);
}