wip(openai): trying to convert openai to use orchestrion #5762

Draft · wants to merge 1 commit into base: master
38 changes: 31 additions & 7 deletions packages/datadog-instrumentations/src/openai.js
@@ -6,18 +6,42 @@ const shimmer = require('../../datadog-shimmer')
 const dc = require('dc-polyfill')
 const ch = dc.tracingChannel('apm:openai:request')
 
-const V4_PACKAGE_SHIMS = [
+const ORCHESTRION_V4_PACKAGE_SHIMS = [ // cjs only
   {
-    file: 'resources/chat/completions',
+    file: 'resources/completions.js',
     targetClass: 'Completions',
-    baseResource: 'chat.completions',
+    baseResource: 'completions',
     methods: ['create'],
-    streamedResponse: true
-  },
+    channelName: 'Completions_create'
+  }
+]
+
+for (const shim of ORCHESTRION_V4_PACKAGE_SHIMS) {
+  addHook({ name: 'openai', file: shim.file, versions: ['>=4'] }, exports => {
+    const targetPrototype = exports[shim.targetClass].prototype
+
+    for (const methodName of shim.methods) {
+      shimmer.wrap(targetPrototype, methodName, methodFn => function () {
+        const fullChannelName = `orchestrion:openai:${shim.channelName}`
+        const channel = dc.tracingChannel(fullChannelName)
+
+        const ctx = {
+          self: this, arguments
+        }
+
+        return channel.traceSync(methodFn, ctx, this, ...arguments)
+      })
+    }
+
+    return exports
+  })
+}
+
+const V4_PACKAGE_SHIMS = [
   {
-    file: 'resources/completions',
+    file: 'resources/chat/completions',
     targetClass: 'Completions',
-    baseResource: 'completions',
+    baseResource: 'chat.completions',
    methods: ['create'],
     streamedResponse: true
   },
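For orientation (a sketch, not part of the diff): `traceSync` publishes start, end, and error events on the tracing channel, so any subscriber sees every wrapped `create` call. A minimal consumer using Node's TracingChannel subscriber API, with purely illustrative logging:

'use strict'

const dc = require('dc-polyfill')

// the same channel name the shim above composes from shim.channelName
const channel = dc.tracingChannel('orchestrion:openai:Completions_create')

channel.subscribe({
  start (ctx) {
    // ctx is the object the shim passed to traceSync: { self, arguments }
    console.log('completions.create called')
  },
  end (ctx) {
    // ctx.result is whatever the wrapped method returned (the SDK's APIPromise)
    console.log('completions.create returned')
  },
  error (ctx) {
    console.error('completions.create threw', ctx.error)
  }
})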
@@ -51,4 +51,14 @@ instrumentations:
       class: Embeddings
     operator: traceSync
     channel_name: "Embeddings_constructor"
+  - module_name: "openai"
+    version_range: ">=4"
+    file_path: resources/completions.mjs
+    function_query:
+      name: create
+      type: method
+      kind: sync
+      class: Completions
+    operator: traceSync
+    channel_name: "Completions_create"
 `
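This hunk extends the orchestrion instrumentation config (YAML embedded in a template literal, hence the closing backtick) to cover the ESM build at resources/completions.mjs, while the `// cjs only` shim above covers CommonJS loads. The `channel_name` here lines up with the plugin `prefix` in completions.js below through Node's TracingChannel naming convention:

const dc = require('dc-polyfill')

// orchestrion-rewritten code publishes through this TracingChannel...
const ch = dc.tracingChannel('orchestrion:openai:Completions_create')
// ...whose events land on 'tracing:'-prefixed channels:
//   tracing:orchestrion:openai:Completions_create:start
//   tracing:orchestrion:openai:Completions_create:end
//   tracing:orchestrion:openai:Completions_create:error
// matching the plugin's prefix of 'tracing:orchestrion:openai:Completions_create'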
171 changes: 171 additions & 0 deletions packages/datadog-plugin-openai/src/endpoint-hooks/base.js
@@ -0,0 +1,171 @@
'use strict'

const TracingPlugin = require('../../../dd-trace/src/plugins/tracing')
const { storage } = require('../../../datadog-core')
const Sampler = require('../../../dd-trace/src/sampler')
const { MEASURED } = require('../../../../ext/tags')

// let normalize

// const { DD_MAJOR } = require('../../../../version')

class OpenAiBaseEndpointHook extends TracingPlugin {
  static get operation () { return 'request' }
  static get system () { return 'openai' }

  constructor (services, utilities, ...args) {
    super(...args)

    const { metrics, logger } = services
    this.metrics = metrics
    this.logger = logger

    const { normalize } = utilities
    this.normalize = normalize

    this.sampler = new Sampler(0.1) // default 10% log sampling
  }

  bindStart (ctx) {
    const payloadTags = this.getPayloadTags(ctx)
    const resource = this.getResource(ctx)

    const span = this.startSpan('openai.request', {
      service: this.config.service,
      resource,
      type: 'openai',
      kind: 'client',
      meta: {
        ...payloadTags,
        [MEASURED]: 1
      }
    }, false)

    const inputTags = this.getInputTags(ctx)
    span.addTags(inputTags)

    const store = storage('legacy').getStore()
    const openaiStore = Object.create(null)
    ctx.currentStore = { ...store, span, openai: openaiStore }

    return ctx.currentStore
  }

  end (ctx) { // sync because the promise types are custom for openai
    const span = ctx.currentStore?.span
    if (!span) return

    const { result } = ctx
    // instead of wrapping the result, queue up a separate promise to handle when the response resolves
    // since we want the response headers as well, call `withResponse()` to get that
    // while this makes it easier to manage on our side as opposed to wrapping, it does queue up another promise
    result.withResponse().then(({ data, response }) => {
Review comment (Collaborator Author):

As the comment explains: not ideal, since we'll be queueing up a separate promise, but this way we don't have to do instance patching on the custom promise type.

The custom promise type is also the whole reason we can't do tracePromise, since it'll return an object with a promise prototype that isn't compatible with the OpenAI Node.js SDK's APIPromise (ref).
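A minimal sketch (not from the diff) of the incompatibility the comment describes; `client` is a hypothetical OpenAI v4 client instance:

const apiPromise = client.completions.create({ model: 'gpt-3.5-turbo-instruct', prompt: 'hi' })

// the SDK returns its own APIPromise subtype, which carries extra methods
typeof apiPromise.withResponse // 'function'

// rewrapping it in a native promise, which is effectively what tracePromise's
// return value amounts to, drops that surface
const rewrapped = Promise.resolve(apiPromise)
typeof rewrapped.withResponse // 'undefined'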

      // handle the response - assume it is not a stream at this point

      const responseTags = this.getResponseTags(ctx)
      span.addTags(responseTags)

      span.finish()
      // this.sendLog(resource, span, tags, openaiStore, error)
      // this.sendMetrics(headers, body, endpoint, span._duration, error, tags)
    })
  }

  getResource (ctx) {}

  getPayloadTags (ctx) {}

  getInputTags (ctx) {}

  getResponseTags (ctx) {}

  sendMetrics (headers, body, endpoint, duration, error, spanTags) {
    const tags = [`error:${Number(!!error)}`]
    if (error) {
      this.metrics.increment('openai.request.error', 1, tags)
    } else {
      tags.push(`org:${headers['openai-organization']}`)
      tags.push(`endpoint:${endpoint}`) // just "/v1/models", no method
      tags.push(`model:${headers['openai-model'] || body.model}`)
    }

    this.metrics.distribution('openai.request.duration', duration * 1000, tags)

    const promptTokens = spanTags['openai.response.usage.prompt_tokens']
    const promptTokensEstimated = spanTags['openai.response.usage.prompt_tokens_estimated']

    const completionTokens = spanTags['openai.response.usage.completion_tokens']
    const completionTokensEstimated = spanTags['openai.response.usage.completion_tokens_estimated']

    const totalTokens = spanTags['openai.response.usage.total_tokens']

    if (!error) {
      if (promptTokens != null) {
        if (promptTokensEstimated) {
          this.metrics.distribution(
            'openai.tokens.prompt', promptTokens, [...tags, 'openai.estimated:true'])
        } else {
          this.metrics.distribution('openai.tokens.prompt', promptTokens, tags)
        }
      }

      if (completionTokens != null) {
        if (completionTokensEstimated) {
          this.metrics.distribution(
            'openai.tokens.completion', completionTokens, [...tags, 'openai.estimated:true'])
        } else {
          this.metrics.distribution('openai.tokens.completion', completionTokens, tags)
        }
      }

      if (totalTokens != null) {
        if (promptTokensEstimated || completionTokensEstimated) {
          this.metrics.distribution(
            'openai.tokens.total', totalTokens, [...tags, 'openai.estimated:true'])
        } else {
          this.metrics.distribution('openai.tokens.total', totalTokens, tags)
        }
      }
    }

    if (headers) {
      if (headers['x-ratelimit-limit-requests']) {
        this.metrics.gauge('openai.ratelimit.requests', Number(headers['x-ratelimit-limit-requests']), tags)
      }

      if (headers['x-ratelimit-remaining-requests']) {
        this.metrics.gauge(
          'openai.ratelimit.remaining.requests', Number(headers['x-ratelimit-remaining-requests']), tags
        )
      }

      if (headers['x-ratelimit-limit-tokens']) {
        this.metrics.gauge('openai.ratelimit.tokens', Number(headers['x-ratelimit-limit-tokens']), tags)
      }

      if (headers['x-ratelimit-remaining-tokens']) {
        this.metrics.gauge('openai.ratelimit.remaining.tokens', Number(headers['x-ratelimit-remaining-tokens']), tags)
      }
    }
  }

  sendLog (methodName, span, tags, openaiStore, error) {
    if (!openaiStore) return
    if (!Object.keys(openaiStore).length) return
    if (!this.sampler.isSampled()) return

    const log = {
      status: error ? 'error' : 'info',
      message: `sampled ${methodName}`,
      ...openaiStore
    }

    this.logger.log(log, span, tags)
  }
}

// function truncateApiKey (apiKey) {
//   return apiKey && `sk-...${apiKey.substr(apiKey.length - 4)}`
// }

module.exports = OpenAiBaseEndpointHook
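The four tag getters above are intentionally empty: each endpoint hook subclass fills them in (completions.js below overrides `getResource`, for example). A minimal sketch of what such a subclass might look like; the class name, endpoint, and tag keys are assumptions for illustration only:

const OpenAiBaseEndpointHook = require('./base')

class OpenAiEmbeddingsEndpointHook extends OpenAiBaseEndpointHook {
  getResource () { return 'createEmbedding' }

  getPayloadTags (ctx) {
    return { 'openai.request.endpoint': '/v1/embeddings', 'openai.request.method': 'POST' }
  }

  getInputTags (ctx) {
    const params = ctx.arguments?.[0] ?? {}
    return { 'openai.request.model': params.model }
  }

  getResponseTags (ctx) {
    return { 'openai.response.model': ctx.result?.model }
  }
}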
125 changes: 125 additions & 0 deletions packages/datadog-plugin-openai/src/endpoint-hooks/completions.js
@@ -0,0 +1,125 @@
'use strict'

const OpenAiBaseEndpointHook = require('./base')
const { DD_MAJOR } = require('../../../../version')
const satisfies = require('semifies')
const shimmer = require('../../../datadog-shimmer')

const { addStreamedChunk, convertBuffersToObjects } = require('../streaming')

function tryRequire (path) {
  try {
    return require(path)
  } catch (e) {
    return null
  }
}

const OPENAI_VERSION = tryRequire('openai/version')
Review comment (Collaborator Author):

Soooo janky, and not where we want to do this, but I'm not sure where else to do it, besides having a separate plugin for openai >=4.0 that sets this differently 😕. But then we'll have something like 46 plugins (9 different functions traced + 5 that are already versioned + 9 by splitting it into v3 as well, all doubled for the LLMObs plugins as well).

class OpenAiCompletionsEndpointHook extends OpenAiBaseEndpointHook {
  static get id () { return 'openai:completions' }
  static get resource () { return 'createCompletion' }
  static get prefix () {
    return 'tracing:orchestrion:openai:Completions_create'
  }

  getResource () {
    if (DD_MAJOR <= 5 && satisfies(OPENAI_VERSION, '>=4.0.0')) {
      return 'chat.completions.create'
    } else {
      return 'createCompletion'
    }
  }

  end (ctx) {
    const stream = ctx.arguments?.[0]?.stream

    if (!stream) return super.end(ctx)

    // handle the stream --> needs wrapping?
    const span = ctx.currentStore?.span
    if (!span) return

    const { result } = ctx

    const n = getOption(ctx.arguments, 'n', 1)

    const plugin = this

    // we need to wrap the stream that the user will consume
    // the stream is just a generator function, and each chunk could either be a buffer or an object

    // we cannot separately tee and consume the stream, so as to accurately represent the execution time
    // in the user's application - we wrap the stream for this reason

    // also this is messy - just copied from the existing instrumentation
    // it needs to be cleaned up
    shimmer.wrap(result, 'parse', parse => function () {
Review comment (Collaborator Author):

The comment explains it, but this is one pain point so far: how do we properly wrap this stream? We don't want to consume it ourselves, because we want to finish the span once the user has finished consuming it (assuming they do some work between chunks). In the current approach in datadog-instrumentations/src/openai.js we do what we're doing here and wrap it directly... not sure how that translates to here if we don't want to be doing patching at this layer (although I'd argue it's not so much patching as instance wrapping, if that really represents a difference). A generic version of the pattern is sketched after this file.
      return parse.apply(this, arguments)
        .then(body => Promise.all([this.responsePromise, body]))
        .then(([{ response, options }, body]) => {
          shimmer.wrap(body, Symbol.asyncIterator, asyncIterator => function () {
            const iterator = asyncIterator.apply(this, arguments)

            let chunks = []
            let processChunksAsBuffers = false
            shimmer.wrap(iterator, 'next', next => function () {
              return next.apply(this, arguments)
                .then(res => {
                  const { done, value: chunk } = res

                  if (chunk) {
                    chunks.push(chunk)
                    if (chunk instanceof Buffer) {
                      // this operation should be safe
                      // if one chunk is a buffer (versus a plain object), the rest should be as well
                      processChunksAsBuffers = true
                    }
                  }

                  if (done) {
                    let body = {}
                    chunks = chunks.filter(chunk => chunk != null) // filter null or undefined values

                    if (chunks) {
                      if (processChunksAsBuffers) {
                        chunks = convertBuffersToObjects(chunks)
                      }

                      if (chunks.length) {
                        // define the initial body having all the content outside of choices from the first chunk
                        // this will include important data like created, id, model, etc.
                        body = { ...chunks[0], choices: Array.from({ length: n }) }
                        // start from the first chunk, and add its choices into the body
                        for (let i = 0; i < chunks.length; i++) {
                          addStreamedChunk(body, chunks[i])
                        }
                      }
                    }

                    // use headers, response, options, and computed body to set finish tags
                    span.finish() // TODO do other processing here
                  }

                  return res
                })
                .catch(err => {
                  plugin.addError(err, span)

                  throw err
                })
            })
            return iterator
          })
          return body
        })
    })
  }
}

function getOption (args, option, defaultValue) {
  return args?.[0]?.[option] || defaultValue
}

module.exports = OpenAiCompletionsEndpointHook
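The generic version of the stream-wrapping pattern referenced in the review comment above, as a sketch with illustrative names: observe chunks as the user consumes the stream, and react to completion or error without consuming the iterator ourselves.

function observeAsyncIterable (iterable, { onChunk, onDone, onError }) {
  const original = iterable[Symbol.asyncIterator].bind(iterable)

  iterable[Symbol.asyncIterator] = function () {
    const iterator = original()

    return {
      async next (...args) {
        try {
          const result = await iterator.next(...args)
          if (result.done) onDone() // e.g. compute response tags and span.finish()
          else onChunk(result.value) // e.g. buffer the chunk for later tagging
          return result
        } catch (err) {
          onError(err) // e.g. plugin.addError(err, span)
          throw err
        }
      },
      return: iterator.return && iterator.return.bind(iterator),
      throw: iterator.throw && iterator.throw.bind(iterator),
      [Symbol.asyncIterator] () { return this }
    }
  }

  return iterable
}

The diff implements the same idea with shimmer.wrap on Symbol.asyncIterator and next, so both completion and thrown errors close the span.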
5 changes: 4 additions & 1 deletion packages/datadog-plugin-openai/src/index.js
@@ -2,14 +2,17 @@

 const CompositePlugin = require('../../dd-trace/src/plugins/composite')
 const OpenAiTracingPlugin = require('./tracing')
+const OpenAiTracingPluginOrchestrion = require('./orchestrion-migration')
 const OpenAiLLMObsPlugin = require('../../dd-trace/src/llmobs/plugins/openai')
 
 class OpenAiPlugin extends CompositePlugin {
   static get id () { return 'openai' }
   static get plugins () {
     return {
       llmobs: OpenAiLLMObsPlugin,
-      tracing: OpenAiTracingPlugin
+      tracing: OpenAiTracingPlugin, // this one will go away
+      // the below plugin should replace the above tracing plugin and be renamed to "tracing"
+      orchestrion: OpenAiTracingPluginOrchestrion
     }
   }
 }
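For reference, both sub-plugins ride under the composite plugin's single id, so the integration is still enabled the usual way; a minimal usage sketch:

'use strict'

const tracer = require('dd-trace').init()

// enables the composite 'openai' plugin, and with it both the legacy tracing
// plugin and the orchestrion-based one during the migration
tracer.use('openai', { enabled: true })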