diff --git a/packages/datadog-instrumentations/src/openai.js b/packages/datadog-instrumentations/src/openai.js
index a5771c2178e..778090f5790 100644
--- a/packages/datadog-instrumentations/src/openai.js
+++ b/packages/datadog-instrumentations/src/openai.js
@@ -6,18 +6,42 @@ const shimmer = require('../../datadog-shimmer')
 const dc = require('dc-polyfill')
 const ch = dc.tracingChannel('apm:openai:request')
 
-const V4_PACKAGE_SHIMS = [
+const ORCHESTRION_V4_PACKAGE_SHIMS = [
   // cjs only
   {
-    file: 'resources/chat/completions',
+    file: 'resources/completions.js',
     targetClass: 'Completions',
-    baseResource: 'chat.completions',
+    baseResource: 'completions',
     methods: ['create'],
-    streamedResponse: true
-  },
+    channelName: 'Completions_create'
+  }
+]
+
+for (const shim of ORCHESTRION_V4_PACKAGE_SHIMS) {
+  addHook({ name: 'openai', file: shim.file, versions: ['>=4'] }, exports => {
+    const targetPrototype = exports[shim.targetClass].prototype
+
+    for (const methodName of shim.methods) {
+      shimmer.wrap(targetPrototype, methodName, methodFn => function () {
+        const fullChannelName = `orchestrion:openai:${shim.channelName}`
+        const channel = dc.tracingChannel(fullChannelName)
+
+        const ctx = {
+          self: this, arguments
+        }
+
+        return channel.traceSync(methodFn, ctx, this, ...arguments)
+      })
+    }
+
+    return exports
+  })
+}
+
+const V4_PACKAGE_SHIMS = [
   {
-    file: 'resources/completions',
+    file: 'resources/chat/completions',
     targetClass: 'Completions',
-    baseResource: 'completions',
+    baseResource: 'chat.completions',
     methods: ['create'],
     streamedResponse: true
   },
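For reference, the loop above only publishes to a `diagnostics_channel` tracing channel; nothing is traced until something subscribes. A minimal standalone subscriber might look like the sketch below (the handler names come from Node's `TracingChannel` API, which `dc-polyfill` backports; the logging is purely illustrative — in dd-trace the subscription happens via the plugin `prefix` getter shown further down):

```js
'use strict'

const dc = require('dc-polyfill')

// Same channel name the instrumentation composes from the shim config above
const channel = dc.tracingChannel('orchestrion:openai:Completions_create')

channel.subscribe({
  start (ctx) {
    // ctx.self is the Completions instance, ctx.arguments the call arguments
    console.log('create() called with model:', ctx.arguments?.[0]?.model)
  },
  end (ctx) {
    // ctx.result is the APIPromise the caller receives
  },
  error (ctx) {
    console.error('create() threw:', ctx.error)
  }
})
```

`traceSync` only publishes; if nothing subscribes, the wrapped method runs untouched with no overhead beyond the channel check.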
diff --git a/packages/datadog-instrumentations/src/orchestrion-config/index.js b/packages/datadog-instrumentations/src/orchestrion-config/index.js
index 15830d1bc34..a52f482e13e 100644
--- a/packages/datadog-instrumentations/src/orchestrion-config/index.js
+++ b/packages/datadog-instrumentations/src/orchestrion-config/index.js
@@ -51,4 +51,14 @@ instrumentations:
       class: Embeddings
       operator: traceSync
      channel_name: "Embeddings_constructor"
+  - module_name: "openai"
+    version_range: ">=4"
+    file_path: resources/completions.mjs
+    function_query:
+      name: create
+      type: method
+      kind: sync
+      class: Completions
+      operator: traceSync
+      channel_name: "Completions_create"
 `
diff --git a/packages/datadog-plugin-openai/src/endpoint-hooks/base.js b/packages/datadog-plugin-openai/src/endpoint-hooks/base.js
new file mode 100644
index 00000000000..9ec7b7ab3b9
--- /dev/null
+++ b/packages/datadog-plugin-openai/src/endpoint-hooks/base.js
@@ -0,0 +1,171 @@
+'use strict'
+
+const TracingPlugin = require('../../../dd-trace/src/plugins/tracing')
+const { storage } = require('../../../datadog-core')
+const Sampler = require('../../../dd-trace/src/sampler')
+const { MEASURED } = require('../../../../ext/tags')
+
+// let normalize
+
+// const { DD_MAJOR } = require('../../../../version')
+
+class OpenAiBaseEndpointHook extends TracingPlugin {
+  static get operation () { return 'request' }
+  static get system () { return 'openai' }
+
+  constructor (services, utilities, ...args) {
+    super(...args)
+
+    const { metrics, logger } = services
+    this.metrics = metrics
+    this.logger = logger
+
+    const { normalize } = utilities
+    this.normalize = normalize
+
+    this.sampler = new Sampler(0.1) // default 10% log sampling
+  }
+
+  bindStart (ctx) {
+    const payloadTags = this.getPayloadTags(ctx)
+    const resource = this.getResource(ctx)
+
+    const span = this.startSpan('openai.request', {
+      service: this.config.service,
+      resource,
+      type: 'openai',
+      kind: 'client',
+      meta: {
+        ...payloadTags,
+        [MEASURED]: 1
+      }
+    }, false)
+
+    const inputTags = this.getInputTags(ctx)
+    span.addTags(inputTags)
+
+    const store = storage('legacy').getStore()
+    const openaiStore = Object.create(null)
+    ctx.currentStore = { ...store, span, openai: openaiStore }
+
+    return ctx.currentStore
+  }
+
+  end (ctx) { // sync because the promise types are custom for openai
+    const span = ctx.currentStore?.span
+    if (!span) return
+
+    const { result } = ctx
+
+    // instead of wrapping the result, queue up a separate promise to handle when the response resolves
+    // since we want the response headers as well, call `withResponse()` to get that
+    // while this makes it easier to manage on our side as opposed to wrapping, it does queue up another promise
+    result.withResponse().then(({ data, response }) => {
+      // handle the response - assume it is not a stream at this point
+      const responseTags = this.getResponseTags(ctx)
+      span.addTags(responseTags)
+
+      span.finish()
+      // this.sendLog(resource, span, tags, openaiStore, error)
+      // this.sendMetrics(headers, body, endpoint, span._duration, error, tags)
+    })
+  }
+
+  getResource (ctx) {}
+
+  getPayloadTags (ctx) {}
+
+  getInputTags (ctx) {}
+
+  getResponseTags (ctx) {}
+
+  sendMetrics (headers, body, endpoint, duration, error, spanTags) {
+    const tags = [`error:${Number(!!error)}`]
+    if (error) {
+      this.metrics.increment('openai.request.error', 1, tags)
+    } else {
+      tags.push(`org:${headers['openai-organization']}`)
+      tags.push(`endpoint:${endpoint}`) // just "/v1/models", no method
+      tags.push(`model:${headers['openai-model'] || body.model}`)
+    }
+
+    this.metrics.distribution('openai.request.duration', duration * 1000, tags)
+
+    const promptTokens = spanTags['openai.response.usage.prompt_tokens']
+    const promptTokensEstimated = spanTags['openai.response.usage.prompt_tokens_estimated']
+
+    const completionTokens = spanTags['openai.response.usage.completion_tokens']
+    const completionTokensEstimated = spanTags['openai.response.usage.completion_tokens_estimated']
+
+    const totalTokens = spanTags['openai.response.usage.total_tokens']
+
+    if (!error) {
+      if (promptTokens != null) {
+        if (promptTokensEstimated) {
+          this.metrics.distribution(
+            'openai.tokens.prompt', promptTokens, [...tags, 'openai.estimated:true'])
+        } else {
+          this.metrics.distribution('openai.tokens.prompt', promptTokens, tags)
+        }
+      }
+
+      if (completionTokens != null) {
+        if (completionTokensEstimated) {
+          this.metrics.distribution(
+            'openai.tokens.completion', completionTokens, [...tags, 'openai.estimated:true'])
+        } else {
+          this.metrics.distribution('openai.tokens.completion', completionTokens, tags)
+        }
+      }
+
+      if (totalTokens != null) {
+        if (promptTokensEstimated || completionTokensEstimated) {
+          this.metrics.distribution(
+            'openai.tokens.total', totalTokens, [...tags, 'openai.estimated:true'])
+        } else {
+          this.metrics.distribution('openai.tokens.total', totalTokens, tags)
+        }
+      }
+    }
+
+    if (headers) {
+      if (headers['x-ratelimit-limit-requests']) {
+        this.metrics.gauge('openai.ratelimit.requests', Number(headers['x-ratelimit-limit-requests']), tags)
+      }
+
+      if (headers['x-ratelimit-remaining-requests']) {
+        this.metrics.gauge(
+          'openai.ratelimit.remaining.requests', Number(headers['x-ratelimit-remaining-requests']), tags
+        )
+      }
+
+      if (headers['x-ratelimit-limit-tokens']) {
+        this.metrics.gauge('openai.ratelimit.tokens', Number(headers['x-ratelimit-limit-tokens']), tags)
+      }
+
+      if (headers['x-ratelimit-remaining-tokens']) {
+        this.metrics.gauge('openai.ratelimit.remaining.tokens', Number(headers['x-ratelimit-remaining-tokens']), tags)
+      }
+    }
+  }
+
+  sendLog (methodName, span, tags, openaiStore, error) {
+    if (!openaiStore) return
+    if (!Object.keys(openaiStore).length) return
+    if (!this.sampler.isSampled()) return
+
+    const log = {
+      status: error ? 'error' : 'info',
+      message: `sampled ${methodName}`,
+      ...openaiStore
+    }
+
+    this.logger.log(log, span, tags)
+  }
+}
+
+// function truncateApiKey (apiKey) {
+//   return apiKey && `sk-...${apiKey.substr(apiKey.length - 4)}`
+// }
+
+module.exports = OpenAiBaseEndpointHook
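Since `end()` above leans on `withResponse()`, here is how that call behaves against a plain openai v4 client, outside any instrumentation — a sketch only; the model name and API key handling are placeholders:

```js
'use strict'

const OpenAI = require('openai')

const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })

async function main () {
  const { data, response } = await client.chat.completions
    .create({ model: 'gpt-4o-mini', messages: [{ role: 'user', content: 'hi' }] })
    .withResponse()

  // `response` is the underlying HTTP response, so the rate-limit headers
  // that sendMetrics() reads are available on it
  console.log(response.headers.get('x-ratelimit-remaining-requests'))
  console.log(data.usage.total_tokens)
}

main()
```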
diff --git a/packages/datadog-plugin-openai/src/endpoint-hooks/completions.js b/packages/datadog-plugin-openai/src/endpoint-hooks/completions.js
new file mode 100644
index 00000000000..c062d625cdf
--- /dev/null
+++ b/packages/datadog-plugin-openai/src/endpoint-hooks/completions.js
@@ -0,0 +1,125 @@
+'use strict'
+
+const OpenAiBaseEndpointHook = require('./base')
+const { DD_MAJOR } = require('../../../../version')
+const satisfies = require('semifies')
+const shimmer = require('../../../datadog-shimmer')
+
+const { addStreamedChunk, convertBuffersToObjects } = require('../streaming')
+
+function tryRequire (path) {
+  try {
+    return require(path)
+  } catch (e) {
+    return null
+  }
+}
+
+// `openai/version` exposes the version string as the named export `VERSION`
+const OPENAI_VERSION = tryRequire('openai/version')?.VERSION
+
+class OpenAiCompletionsEndpointHook extends OpenAiBaseEndpointHook {
+  static get id () { return 'openai:completions' }
+  static get resource () { return 'createCompletion' }
+  static get prefix () {
+    return 'tracing:orchestrion:openai:Completions_create'
+  }
+
+  getResource () {
+    if (DD_MAJOR <= 5 && satisfies(OPENAI_VERSION, '>=4.0.0')) {
+      return 'chat.completions.create'
+    } else {
+      return 'createCompletion'
+    }
+  }
+
+  end (ctx) {
+    const stream = ctx.arguments?.[0]?.stream
+
+    if (!stream) return super.end(ctx)
+
+    // handle the stream --> needs wrapping?
+    const span = ctx.currentStore?.span
+    if (!span) return
+
+    const { result } = ctx
+
+    const n = getOption(ctx.arguments, 'n', 1)
+
+    const plugin = this
+
+    // we need to wrap the stream that the user will consume
+    // the stream is just a generator function, and each chunk could either be a buffer or an object
+
+    // we cannot separately tee and consume the stream if we want to accurately represent the execution time
+    // in the user's application - we wrap the stream for this reason
+
+    // also this is messy - just copied from the existing instrumentation
+    // it needs to be cleaned up
+    shimmer.wrap(result, 'parse', parse => function () {
+      return parse.apply(this, arguments)
+        .then(body => Promise.all([this.responsePromise, body]))
+        .then(([{ response, options }, body]) => {
+          shimmer.wrap(body, Symbol.asyncIterator, asyncIterator => function () {
+            const iterator = asyncIterator.apply(this, arguments)
+
+            let chunks = []
+            let processChunksAsBuffers = false
+            shimmer.wrap(iterator, 'next', next => function () {
+              return next.apply(this, arguments)
+                .then(res => {
+                  const { done, value: chunk } = res
+
+                  if (chunk) {
+                    chunks.push(chunk)
+                    if (chunk instanceof Buffer) {
+                      // this operation should be safe
+                      // if one chunk is a buffer (versus a plain object), the rest should be as well
+                      processChunksAsBuffers = true
+                    }
+                  }
+
+                  if (done) {
+                    let body = {}
+                    chunks = chunks.filter(chunk => chunk != null) // filter null or undefined values
+
+                    if (processChunksAsBuffers) {
+                      chunks = convertBuffersToObjects(chunks)
+                    }
+
+                    if (chunks.length) {
+                      // define the initial body having all the content outside of choices from the first chunk
+                      // this will include important data like created, id, model, etc.
+                      body = { ...chunks[0], choices: Array.from({ length: n }) }
+                      // start from the first chunk, and add its choices into the body
+                      for (let i = 0; i < chunks.length; i++) {
+                        addStreamedChunk(body, chunks[i])
+                      }
+                    }
+
+                    // use headers, response, options, and computed body to set finish tags
+                    span.finish() // TODO do other processing here
+                  }
+
+                  return res
+                })
+                .catch(err => {
+                  plugin.addError(err, span)
+
+                  throw err
+                })
+            })
+
+            return iterator
+          })
+
+          return body
+        })
+    })
+  }
+}
+
+function getOption (args, option, defaultValue) {
+  return args?.[0]?.[option] || defaultValue
+}
+
+module.exports = OpenAiCompletionsEndpointHook
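The intent of the `shimmer.wrap` pyramid above is easier to see decoupled from shimmer. A minimal sketch of the same accumulation strategy — `recordStream` and `onFinished` are illustrative names, not part of this PR:

```js
'use strict'

// Wrap an async iterable so every chunk is recorded for later tagging while
// the consumer still receives the stream unmodified.
function recordStream (stream, onFinished) {
  const chunks = []

  return (async function * () {
    try {
      for await (const chunk of stream) {
        chunks.push(chunk) // record the chunk for assembling a body later
        yield chunk // the consumer still sees each chunk, unmodified
      }
    } finally {
      // runs when the stream is exhausted, or if the consumer bails early
      onFinished(chunks.filter(chunk => chunk != null))
    }
  })()
}
```

Wrapping `next` in place, as the diff does, keeps the extra properties openai's `Stream` object carries (such as its abort `controller`) visible to the caller, which a re-yielding generator like this sketch would hide.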
diff --git a/packages/datadog-plugin-openai/src/index.js b/packages/datadog-plugin-openai/src/index.js
index c76f7333910..57c0610b0d5 100644
--- a/packages/datadog-plugin-openai/src/index.js
+++ b/packages/datadog-plugin-openai/src/index.js
@@ -2,6 +2,7 @@
 
 const CompositePlugin = require('../../dd-trace/src/plugins/composite')
 const OpenAiTracingPlugin = require('./tracing')
+const OpenAiTracingPluginOrchestrion = require('./orchestrion-migration')
 const OpenAiLLMObsPlugin = require('../../dd-trace/src/llmobs/plugins/openai')
 
 class OpenAiPlugin extends CompositePlugin {
@@ -9,7 +10,9 @@ class OpenAiPlugin extends CompositePlugin {
   static get plugins () {
     return {
       llmobs: OpenAiLLMObsPlugin,
-      tracing: OpenAiTracingPlugin
+      tracing: OpenAiTracingPlugin, // this one will go away
+      // the below plugin should replace the above tracing plugin and be renamed to "tracing"
+      orchestrion: OpenAiTracingPluginOrchestrion
     }
   }
 }
diff --git a/packages/datadog-plugin-openai/src/orchestrion-migration.js b/packages/datadog-plugin-openai/src/orchestrion-migration.js
new file mode 100644
index 00000000000..09fd27569b9
--- /dev/null
+++ b/packages/datadog-plugin-openai/src/orchestrion-migration.js
@@ -0,0 +1,26 @@
+'use strict'
+
+const CompositePlugin = require('../../dd-trace/src/plugins/composite')
+const OpenAiCompletionEndpointHook = require('./endpoint-hooks/completions')
+
+const services = require('./services')
+const makeUtilities = require('../../dd-trace/src/plugins/util/llm')
+
+class OpenAiTracingPluginOrchestrion extends CompositePlugin {
+  static get id () { return 'openai-orchestrion' } // TODO: rename this to "openai"
+  static get system () { return 'openai' }
+  static get plugins () {
+    return {
+      completion: OpenAiCompletionEndpointHook
+    }
+  }
+
+  constructor (tracer, tracerConfig) {
+    const metricsAndLogServices = services.init(tracerConfig) // TODO can we deprecate this?
+    const utilities = makeUtilities('openai', tracerConfig) // TODO we'll need to deprecate this too
+
+    super(metricsAndLogServices, utilities, tracer, tracerConfig)
+  }
+}
+
+module.exports = OpenAiTracingPluginOrchestrion
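Because the new plugin is registered inside the existing `OpenAiPlugin` composite, enabling it should require no new public API. A hypothetical end-user view, assuming the integration keeps its `openai` name once the rename noted above happens:

```js
'use strict'

// Enabling the integration is unchanged from today; the composite plugin
// fans configuration out to its child plugins internally.
const tracer = require('dd-trace').init()

tracer.use('openai', { service: 'my-openai-service' })
```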
diff --git a/packages/datadog-plugin-openai/src/streaming.js b/packages/datadog-plugin-openai/src/streaming.js
new file mode 100644
index 00000000000..abfcced1183
--- /dev/null
+++ b/packages/datadog-plugin-openai/src/streaming.js
@@ -0,0 +1,71 @@
+'use strict'
+
+function addStreamedChunk (content, chunk) {
+  if (chunk.usage) content.usage = chunk.usage // add usage if it was specified to be returned
+
+  for (const choice of chunk.choices) {
+    const choiceIdx = choice.index
+    const oldChoice = content.choices.find(choice => choice?.index === choiceIdx)
+    if (!oldChoice) {
+      // we don't know which choices arrive in which order
+      content.choices[choiceIdx] = choice
+    } else {
+      if (!oldChoice.finish_reason) {
+        oldChoice.finish_reason = choice.finish_reason
+      }
+
+      // delta exists on chat completions
+      const delta = choice.delta
+
+      if (delta) {
+        const deltaContent = delta.content
+        if (deltaContent) {
+          if (oldChoice.delta.content) { // we don't want to append to undefined
+            oldChoice.delta.content += deltaContent
+          } else {
+            oldChoice.delta.content = deltaContent
+          }
+        }
+      } else {
+        const text = choice.text
+        if (text) {
+          if (oldChoice.text) {
+            oldChoice.text += text
+          } else {
+            oldChoice.text = text
+          }
+        }
+      }
+
+      // tools only exist on chat completions
+      const tools = delta && delta.tool_calls
+
+      if (tools) {
+        oldChoice.delta.tool_calls = tools.map((newTool, toolIdx) => {
+          const oldTool = oldChoice.delta.tool_calls?.[toolIdx]
+
+          if (oldTool) {
+            oldTool.function.arguments += newTool.function.arguments
+            return oldTool
+          }
+
+          return newTool
+        })
+      }
+    }
+  }
+}
+
+function convertBuffersToObjects (chunks = []) {
+  return Buffer
+    .concat(chunks) // combine the buffers
+    .toString() // stringify
+    .split(/(?=data:)/) // split on "data:"
+    .map(chunk => chunk.split('\n').join('')) // remove newlines
+    .map(chunk => chunk.substring(6)) // remove 'data: ' from the front
+    .slice(0, -1) // remove the last [DONE] message
+    .map(JSON.parse) // parse all of the returned objects
+}
+
+module.exports = {
+  addStreamedChunk,
+  convertBuffersToObjects
+}
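A quick way to sanity-check `convertBuffersToObjects()` is to feed it a hand-built SSE payload shaped like openai's streaming responses. The ids and text below are made up, and the require path assumes this runs from the repository root:

```js
'use strict'

const { convertBuffersToObjects } = require('./packages/datadog-plugin-openai/src/streaming')

const chunks = [
  Buffer.from('data: {"id":"cmpl-1","choices":[{"index":0,"text":"Hel"}]}\n\n'),
  Buffer.from('data: {"id":"cmpl-1","choices":[{"index":0,"text":"lo"}]}\n\n'),
  Buffer.from('data: [DONE]\n\n')
]

console.log(convertBuffersToObjects(chunks))
// [ { id: 'cmpl-1', choices: [ { index: 0, text: 'Hel' } ] },
//   { id: 'cmpl-1', choices: [ { index: 0, text: 'lo' } ] } ]
```

The trailing `[DONE]` sentinel is dropped by the `slice(0, -1)` step, so only parseable JSON chunks reach `JSON.parse`.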