From 98ceacfd844cd587107928f71d4a2f0f06c023f3 Mon Sep 17 00:00:00 2001 From: Sam Brenner <106700075+sabrenner@users.noreply.github.com> Date: Fri, 20 Dec 2024 11:42:14 -0500 Subject: [PATCH] [MLOB-1942] fix(llmobs): auto-annotations for wrapped functions happen after manual annotations (#4960) * auto-annotation done before span finish * error cases * callback scoped consistently to apm * make clearer --- packages/dd-trace/src/llmobs/sdk.js | 116 +++++++++--- .../dd-trace/test/llmobs/sdk/index.spec.js | 177 ++++++++++++++++++ 2 files changed, 267 insertions(+), 26 deletions(-) diff --git a/packages/dd-trace/src/llmobs/sdk.js b/packages/dd-trace/src/llmobs/sdk.js index 91fe1e8f70a..2a6d548d656 100644 --- a/packages/dd-trace/src/llmobs/sdk.js +++ b/packages/dd-trace/src/llmobs/sdk.js @@ -1,12 +1,12 @@ 'use strict' -const { SPAN_KIND, OUTPUT_VALUE } = require('./constants/tags') +const { SPAN_KIND, OUTPUT_VALUE, INPUT_VALUE } = require('./constants/tags') const { getFunctionArguments, validateKind } = require('./util') -const { isTrue } = require('../util') +const { isTrue, isError } = require('../util') const { storage } = require('./storage') @@ -134,29 +134,63 @@ class LLMObs extends NoopLLMObs { function wrapped () { const span = llmobs._tracer.scope().active() - - const result = llmobs._activate(span, { kind, options: llmobsOptions }, () => { - if (!['llm', 'embedding'].includes(kind)) { - llmobs.annotate(span, { inputData: getFunctionArguments(fn, arguments) }) + const fnArgs = arguments + + const lastArgId = fnArgs.length - 1 + const cb = fnArgs[lastArgId] + const hasCallback = typeof cb === 'function' + + if (hasCallback) { + const scopeBoundCb = llmobs._bind(cb) + fnArgs[lastArgId] = function () { + // it is standard practice to follow the callback signature (err, result) + // however, we try to parse the arguments to determine if the first argument is an error + // if it is not, and is not undefined, we will use that for the output value + const maybeError = arguments[0] + const maybeResult = arguments[1] + + llmobs._autoAnnotate( + span, + kind, + getFunctionArguments(fn, fnArgs), + isError(maybeError) || maybeError == null ? maybeResult : maybeError + ) + + return scopeBoundCb.apply(this, arguments) } + } - return fn.apply(this, arguments) - }) + try { + const result = llmobs._activate(span, { kind, options: llmobsOptions }, () => fn.apply(this, fnArgs)) + + if (result && typeof result.then === 'function') { + return result.then( + value => { + if (!hasCallback) { + llmobs._autoAnnotate(span, kind, getFunctionArguments(fn, fnArgs), value) + } + return value + }, + err => { + llmobs._autoAnnotate(span, kind, getFunctionArguments(fn, fnArgs)) + throw err + } + ) + } - if (result && typeof result.then === 'function') { - return result.then(value => { - if (value && !['llm', 'retrieval'].includes(kind) && !LLMObsTagger.tagMap.get(span)?.[OUTPUT_VALUE]) { - llmobs.annotate(span, { outputData: value }) - } - return value - }) - } + // it is possible to return a value and have a callback + // however, since the span finishes when the callback is called, it is possible that + // the callback is called before the function returns (although unlikely) + // we do not want to throw for "annotating a finished span" in this case + if (!hasCallback) { + llmobs._autoAnnotate(span, kind, getFunctionArguments(fn, fnArgs), result) + } - if (result && !['llm', 'retrieval'].includes(kind) && !LLMObsTagger.tagMap.get(span)?.[OUTPUT_VALUE]) { - llmobs.annotate(span, { outputData: result }) + return result + } catch (e) { + llmobs._autoAnnotate(span, kind, getFunctionArguments(fn, fnArgs)) + throw e } - - return result } return this._tracer.wrap(name, spanOptions, wrapped) @@ -333,20 +367,34 @@ class LLMObs extends NoopLLMObs { flushCh.publish() } + _autoAnnotate (span, kind, input, output) { + const annotations = {} + if (input && !['llm', 'embedding'].includes(kind) && !LLMObsTagger.tagMap.get(span)?.[INPUT_VALUE]) { + annotations.inputData = input + } + + if (output && !['llm', 'retrieval'].includes(kind) && !LLMObsTagger.tagMap.get(span)?.[OUTPUT_VALUE]) { + annotations.outputData = output + } + + this.annotate(span, annotations) + } + _active () { const store = storage.getStore() return store?.span } - _activate (span, { kind, options } = {}, fn) { + _activate (span, options, fn) { const parent = this._active() if (this.enabled) storage.enterWith({ span }) - this._tagger.registerLLMObsSpan(span, { - ...options, - parent, - kind - }) + if (options) { + this._tagger.registerLLMObsSpan(span, { + ...options, + parent + }) + } try { return fn() @@ -355,6 +403,22 @@ class LLMObs extends NoopLLMObs { } } + // bind function to active LLMObs span + _bind (fn) { + if (typeof fn !== 'function') return fn + + const llmobs = this + const activeSpan = llmobs._active() + + const bound = function () { + return llmobs._activate(activeSpan, null, () => { + return fn.apply(this, arguments) + }) + } + + return bound + } + _extractOptions (options) { const { modelName, diff --git a/packages/dd-trace/test/llmobs/sdk/index.spec.js b/packages/dd-trace/test/llmobs/sdk/index.spec.js index 69dad1d60c4..e7cfb81a47d 100644 --- a/packages/dd-trace/test/llmobs/sdk/index.spec.js +++ b/packages/dd-trace/test/llmobs/sdk/index.spec.js @@ -17,6 +17,7 @@ describe('sdk', () => { let LLMObsSDK let llmobs let tracer + let clock before(() => { tracer = require('../../../../dd-trace') @@ -43,6 +44,8 @@ describe('sdk', () => { // remove max listener warnings, we don't care about the writer anyways process.removeAllListeners('beforeExit') + + clock = sinon.useFakeTimers() }) afterEach(() => { @@ -435,6 +438,180 @@ describe('sdk', () => { }) }) + it('does not crash for auto-annotation values that are overriden', () => { + const circular = {} + circular.circular = circular + + let span + function myWorkflow (input) { + span = llmobs._active() + llmobs.annotate({ + inputData: 'circular', + outputData: 'foo' + }) + return '' + } + + const wrappedMyWorkflow = llmobs.wrap({ kind: 'workflow' }, myWorkflow) + wrappedMyWorkflow(circular) + + expect(LLMObsTagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.meta.span.kind': 'workflow', + '_ml_obs.meta.ml_app': 'mlApp', + '_ml_obs.llmobs_parent_id': 'undefined', + '_ml_obs.meta.input.value': 'circular', + '_ml_obs.meta.output.value': 'foo' + }) + }) + + it('only auto-annotates input on error', () => { + let span + function myTask (foo, bar) { + span = llmobs._active() + throw new Error('error') + } + + const wrappedMyTask = llmobs.wrap({ kind: 'task' }, myTask) + + expect(() => wrappedMyTask('foo', 'bar')).to.throw() + + expect(LLMObsTagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.meta.span.kind': 'task', + '_ml_obs.meta.ml_app': 'mlApp', + '_ml_obs.llmobs_parent_id': 'undefined', + '_ml_obs.meta.input.value': JSON.stringify({ foo: 'foo', bar: 'bar' }) + }) + }) + + it('only auto-annotates input on error for promises', () => { + let span + function myTask (foo, bar) { + span = llmobs._active() + return Promise.reject(new Error('error')) + } + + const wrappedMyTask = llmobs.wrap({ kind: 'task' }, myTask) + + return wrappedMyTask('foo', 'bar') + .catch(() => { + expect(LLMObsTagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.meta.span.kind': 'task', + '_ml_obs.meta.ml_app': 'mlApp', + '_ml_obs.llmobs_parent_id': 'undefined', + '_ml_obs.meta.input.value': JSON.stringify({ foo: 'foo', bar: 'bar' }) + }) + }) + }) + + it('auto-annotates the inputs of the callback function as the outputs for the span', () => { + let span + function myWorkflow (input, cb) { + span = llmobs._active() + setTimeout(() => { + cb(null, 'output') + }, 1000) + } + + const wrappedMyWorkflow = llmobs.wrap({ kind: 'workflow' }, myWorkflow) + wrappedMyWorkflow('input', (err, res) => { + expect(err).to.not.exist + expect(res).to.equal('output') + }) + + clock.tick(1000) + + expect(LLMObsTagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.meta.span.kind': 'workflow', + '_ml_obs.meta.ml_app': 'mlApp', + '_ml_obs.llmobs_parent_id': 'undefined', + '_ml_obs.meta.input.value': JSON.stringify({ input: 'input' }), + '_ml_obs.meta.output.value': 'output' + }) + }) + + it('ignores the error portion of the callback for auto-annotation', () => { + let span + function myWorkflow (input, cb) { + span = llmobs._active() + setTimeout(() => { + cb(new Error('error'), 'output') + }, 1000) + } + + const wrappedMyWorkflow = llmobs.wrap({ kind: 'workflow' }, myWorkflow) + wrappedMyWorkflow('input', (err, res) => { + expect(err).to.exist + expect(res).to.equal('output') + }) + + clock.tick(1000) + + expect(LLMObsTagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.meta.span.kind': 'workflow', + '_ml_obs.meta.ml_app': 'mlApp', + '_ml_obs.llmobs_parent_id': 'undefined', + '_ml_obs.meta.input.value': JSON.stringify({ input: 'input' }), + '_ml_obs.meta.output.value': 'output' + }) + }) + + it('auto-annotates the first argument of the callback as the output if it is not an error', () => { + let span + function myWorkflow (input, cb) { + span = llmobs._active() + setTimeout(() => { + cb('output', 'ignore') + }, 1000) + } + + const wrappedMyWorkflow = llmobs.wrap({ kind: 'workflow' }, myWorkflow) + wrappedMyWorkflow('input', (res, irrelevant) => { + expect(res).to.equal('output') + expect(irrelevant).to.equal('ignore') + }) + + clock.tick(1000) + + expect(LLMObsTagger.tagMap.get(span)).to.deep.equal({ + '_ml_obs.meta.span.kind': 'workflow', + '_ml_obs.meta.ml_app': 'mlApp', + '_ml_obs.llmobs_parent_id': 'undefined', + '_ml_obs.meta.input.value': JSON.stringify({ input: 'input' }), + '_ml_obs.meta.output.value': 'output' + }) + }) + + it('maintains context consistent with the tracer', () => { + let llmSpan, workflowSpan, taskSpan + + function myLlm (input, cb) { + llmSpan = llmobs._active() + setTimeout(() => { + cb(null, 'output') + }, 1000) + } + const myWrappedLlm = llmobs.wrap({ kind: 'llm' }, myLlm) + + llmobs.trace({ kind: 'workflow', name: 'myWorkflow' }, _workflow => { + workflowSpan = _workflow + tracer.trace('apmOperation', () => { + myWrappedLlm('input', (err, res) => { + expect(err).to.not.exist + expect(res).to.equal('output') + llmobs.trace({ kind: 'task', name: 'afterLlmTask' }, _task => { + taskSpan = _task + + const llmParentId = LLMObsTagger.tagMap.get(llmSpan)['_ml_obs.llmobs_parent_id'] + expect(llmParentId).to.equal(workflowSpan.context().toSpanId()) + + const taskParentId = LLMObsTagger.tagMap.get(taskSpan)['_ml_obs.llmobs_parent_id'] + expect(taskParentId).to.equal(workflowSpan.context().toSpanId()) + }) + }) + }) + }) + }) + // TODO: need span kind optional for this test it.skip('sets the span name to "unnamed-anonymous-function" if no name is provided', () => { let span