Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions valhalla/jawn/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"main": "index.js",
"license": "MIT",
"dependencies": {
"@anthropic-ai/tokenizer": "^0.0.4",
"@aws-sdk/client-s3": "^3.678.0",
"@aws-sdk/client-sqs": "^3.799.0",
"@aws-sdk/s3-request-presigner": "^3.537.0",
Expand Down Expand Up @@ -49,7 +48,6 @@
"generate-api-key": "^1.0.2",
"gpt-tokenizer": "^2.1.2",
"ioredis": "^5.3.2",
"js-tiktoken": "^1.0.8",
"kafkajs": "^2.2.4",
"lastmile": "^0.4.0",
"llama-api-client": "^0.1.2",
Expand All @@ -66,7 +64,6 @@
"rate-limit-redis": "^4.2.0",
"stripe": "17.7.0",
"swagger-ui-express": "^5.0.0",
"tiktoken": "^1.0.15",
"typescript": "^5.2.2",
"uuid": "^9.0.1",
"uuidv4": "^6.2.13",
Expand Down
3 changes: 0 additions & 3 deletions valhalla/jawn/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import {
} from "./lib/clients/kafkaConsumers/constant";
import { webSocketProxyForwarder } from "./lib/proxy/WebSocketProxyForwarder";
import { RequestWrapper } from "./lib/requestWrapper/requestWrapper";
import { tokenRouter } from "./lib/routers/tokenRouter";
import { DelayedOperationService } from "./lib/shared/delayedOperationService";
import { runLoopsOnce, runMainLoops } from "./mainLoops";
import { authFromRequest, authMiddleware } from "./middleware/auth";
Expand Down Expand Up @@ -180,8 +179,6 @@ unAuthenticatedRouter.use(
swaggerUi.setup(publicSwaggerDoc as any)
);

unAuthenticatedRouter.use(tokenRouter);

unAuthenticatedRouter.use("/download/swagger.json", (req, res) => {
res.json(publicSwaggerDoc as any);
});
Expand Down
31 changes: 0 additions & 31 deletions valhalla/jawn/src/lib/routers/tokenRouter.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,86 +1,57 @@
import {
calculateModel,
getModelFromResponse,
} from "../../../utils/modelMapper";
import { getTokenCountAnthropic } from "../../tokens/tokenCounter";
import { PromiseGenericResult, ok } from "../../../packages/common/result";
import { IBodyProcessor, ParseInput, ParseOutput } from "./IBodyProcessor";

export class AnthropicBodyProcessor implements IBodyProcessor {
public async parse(
parseInput: ParseInput,
): PromiseGenericResult<ParseOutput> {
const { responseBody, requestModel, modelOverride } = parseInput;
const { responseBody } = parseInput;
const parsedResponseBody = JSON.parse(responseBody);
const responseModel = getModelFromResponse(parsedResponseBody);
const model = calculateModel(requestModel, responseModel, modelOverride);

if (
model?.includes("claude-3") ||
model?.includes("claude-sonnet-4") ||
model?.includes("claude-opus-4") ||
// for AI SDK
model?.includes("claude-4")
!parsedResponseBody?.usage?.output_tokens ||
!parsedResponseBody?.usage?.input_tokens
) {
if (
!parsedResponseBody?.usage?.output_tokens ||
!parsedResponseBody?.usage?.input_tokens
) {
if (!parsedResponseBody.input_tokens) {
return ok({
processedBody: parsedResponseBody,
});
} else {
// handles the claude code integration response which has input_tokens at the root
// Example: { input_tokens: 12470, context_management: { original_input_tokens: 12800 } }

return ok({
processedBody: parsedResponseBody,
usage: {
totalTokens: parsedResponseBody.input_tokens ?? 0,
promptTokens: parsedResponseBody.input_tokens ?? 0,
promptCacheWriteTokens: 0,
promptCacheReadTokens: 0,
completionTokens: 0,
},
});
}
if (!parsedResponseBody.input_tokens) {
return ok({
processedBody: parsedResponseBody,
});
} else {
// handles the claude code integration response which has input_tokens at the root
// Example: { input_tokens: 12470, context_management: { original_input_tokens: 12800 } }

return ok({
processedBody: parsedResponseBody,
usage: {
totalTokens:
parsedResponseBody?.usage?.input_tokens +
parsedResponseBody?.usage?.output_tokens +
parsedResponseBody?.usage?.cache_creation_input_tokens +
parsedResponseBody?.usage?.cache_read_input_tokens,
promptTokens: parsedResponseBody?.usage?.input_tokens,
promptCacheWriteTokens:
parsedResponseBody?.usage?.cache_creation_input_tokens,
promptCacheReadTokens:
parsedResponseBody?.usage?.cache_read_input_tokens,
completionTokens: parsedResponseBody?.usage?.output_tokens,
promptCacheWrite5m:
parsedResponseBody?.usage?.cache_creation
?.ephemeral_5m_input_tokens,
promptCacheWrite1h:
parsedResponseBody?.usage?.cache_creation
?.ephemeral_1h_input_tokens,
heliconeCalculated: true,
totalTokens: parsedResponseBody.input_tokens ?? 0,
promptTokens: parsedResponseBody.input_tokens ?? 0,
promptCacheWriteTokens: 0,
promptCacheReadTokens: 0,
completionTokens: 0,
},
});
}
} else {
const prompt = parsedResponseBody?.prompt ?? "";
const completion = parsedResponseBody?.completion ?? "";
const completionTokens = await getTokenCountAnthropic(completion);
const promptTokens = await getTokenCountAnthropic(prompt);
return ok({
processedBody: parsedResponseBody,
usage: {
totalTokens: promptTokens + completionTokens,
promptTokens: promptTokens,
completionTokens: completionTokens,
heliconeCalculated: true,
totalTokens:
(parsedResponseBody?.usage?.input_tokens ?? 0) +
(parsedResponseBody?.usage?.output_tokens ?? 0) +
(parsedResponseBody?.usage?.cache_creation_input_tokens ?? 0) +
(parsedResponseBody?.usage?.cache_read_input_tokens ?? 0),
promptTokens: parsedResponseBody?.usage?.input_tokens,
promptCacheWriteTokens:
parsedResponseBody?.usage?.cache_creation_input_tokens,
promptCacheReadTokens:
parsedResponseBody?.usage?.cache_read_input_tokens,
completionTokens: parsedResponseBody?.usage?.output_tokens,
promptCacheWrite5m:
parsedResponseBody?.usage?.cache_creation
?.ephemeral_5m_input_tokens,
promptCacheWrite1h:
parsedResponseBody?.usage?.cache_creation
?.ephemeral_1h_input_tokens,
},
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import { calculateModel } from "../../../utils/modelMapper";
import { consolidateTextFields } from "../../../utils/streamParser";
import { getTokenCountAnthropic } from "../../tokens/tokenCounter";
import { PromiseGenericResult, ok } from "../../../packages/common/result";
import { IBodyProcessor, ParseInput, ParseOutput } from "./IBodyProcessor";
import { isParseInputJson } from "./helpers";
Expand All @@ -25,9 +22,7 @@ export class AnthropicStreamBodyProcessor implements IBodyProcessor {
});
}

const { responseBody, requestBody, requestModel, modelOverride } =
parseInput;
const model = calculateModel(requestModel, undefined, modelOverride);
const { responseBody } = parseInput;

// Store the original response body for later use
const originalResponseBody = responseBody;
Expand Down Expand Up @@ -110,70 +105,38 @@ export class AnthropicStreamBodyProcessor implements IBodyProcessor {
}

try {
const processedBody = {
...processConsolidatedJsonForClaude3(processedLines),
// Store the original response body
streamed_data: originalResponseBody,
};

if (
model?.includes("claude-3") ||
model?.includes("claude-sonnet-4") ||
model?.includes("claude-opus-4") ||
// for AI SDK
model?.includes("claude-4")
!processedBody?.usage?.output_tokens ||
!processedBody?.usage?.input_tokens
) {
const processedBody = {
...processConsolidatedJsonForClaude3(processedLines),
// Store the original response body
streamed_data: originalResponseBody,
};

if (
!processedBody?.usage?.output_tokens ||
!processedBody?.usage?.input_tokens
) {
return ok({
processedBody: processedBody,
});
} else {
return ok({
processedBody: processedBody,
usage: {
totalTokens:
processedBody?.usage?.input_tokens +
processedBody?.usage?.output_tokens +
processedBody?.usage?.cache_creation_input_tokens +
processedBody?.usage?.cache_read_input_tokens,
promptTokens: processedBody?.usage?.input_tokens,
promptCacheWriteTokens:
processedBody?.usage?.cache_creation_input_tokens,
promptCacheReadTokens:
processedBody?.usage?.cache_read_input_tokens,
completionTokens: processedBody?.usage?.output_tokens,
promptCacheWrite5m:
processedBody?.usage?.cache_creation?.ephemeral_5m_input_tokens,
promptCacheWrite1h:
processedBody?.usage?.cache_creation?.ephemeral_1h_input_tokens,
heliconeCalculated: true,
},
});
}
return ok({
processedBody: processedBody,
});
} else {
const claudeData = {
...processedLines[processedLines.length - 1],
completion: processedLines.map((d) => d.completion).join(""),
};
const completionTokens = await getTokenCountAnthropic(
claudeData.completion
);
const promptTokens = await getTokenCountAnthropic(
JSON.parse(requestBody ?? "{}")?.prompt ?? ""
);
return ok({
processedBody: {
...consolidateTextFields(processedLines),
streamed_data: originalResponseBody,
},
processedBody: processedBody,
usage: {
totalTokens: completionTokens + promptTokens,
promptTokens: promptTokens,
completionTokens: completionTokens,
heliconeCalculated: true,
totalTokens:
(processedBody?.usage?.input_tokens ?? 0) +
(processedBody?.usage?.output_tokens ?? 0) +
(processedBody?.usage?.cache_creation_input_tokens ?? 0) +
(processedBody?.usage?.cache_read_input_tokens ?? 0),
promptTokens: processedBody?.usage?.input_tokens,
promptCacheWriteTokens:
processedBody?.usage?.cache_creation_input_tokens,
promptCacheReadTokens:
processedBody?.usage?.cache_read_input_tokens,
completionTokens: processedBody?.usage?.output_tokens,
promptCacheWrite5m:
processedBody?.usage?.cache_creation?.ephemeral_5m_input_tokens,
promptCacheWrite1h:
processedBody?.usage?.cache_creation?.ephemeral_1h_input_tokens,
},
});
}
Expand Down Expand Up @@ -269,81 +232,3 @@ function processConsolidatedJsonForClaude3(events: any[]): any {

return acc;
}

// This function is no longer used but kept for reference
function recursivelyConsolidateAnthropicListForClaude3(delta: any[]): any {
return delta.reduce((acc, item) => {
if (Array.isArray(item)) {
return recursivelyConsolidateAnthropicListForClaude3(item);
}
if (typeof item !== "object") {
return item;
}

if (Object.keys(item).length === 0) {
return acc;
}
if (item.type === "message_delta") {
return recursivelyConsolidateAnthropic(acc, {
...item.delta,
...item,
type: undefined,
});
}

if (item.type === "ping") {
return acc;
}

if (item.type === "content_block_start") {
return acc;
}

if (item.type === "content_block_stop") {
return acc;
}

if (item.type === "content_block_delta") {
recursivelyConsolidateAnthropic(acc, {
content: [
{
type: "text",
text: item.delta.text,
},
],
});
}

if (item.type === "message_start") {
return recursivelyConsolidateAnthropic(acc, item.message);
}

// console.log("Item Without Ignore Keys", item);

return recursivelyConsolidateAnthropic(acc, item);
}, {});
}

// This function is no longer used but kept for reference
function recursivelyConsolidateAnthropic(body: any, delta: any): any {
Object.keys(delta).forEach((key) => {
if (key === "stop_reason") {
// console.log("Stop Reason", delta[key]);
}
if (key === "delta") {
} else if (key === "type") {
body[key] = delta[key];
} else if (body[key] === undefined || body[key] === null) {
body[key] = delta[key];
} else if (typeof body[key] === "object") {
body[key] = recursivelyConsolidateAnthropic(body[key], delta[key]);
} else if (typeof body[key] === "number") {
body[key] += delta[key];
} else if (typeof body[key] === "string") {
body[key] += delta[key];
} else {
throw new Error("Invalid");
}
});
return body;
}
Loading
Loading