Skip to content

Commit 0179726

Browse files
Memory fix for trace's streamWrapper. (#25089)
1 parent 0957f7d commit 0179726

File tree

2 files changed

+102
-25
lines changed

2 files changed

+102
-25
lines changed

packages/core/src/telemetry/trace.test.ts

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,32 +4,37 @@
44
* SPDX-License-Identifier: Apache-2.0
55
*/
66

7-
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
8-
import { trace, SpanStatusCode, diag, type Tracer } from '@opentelemetry/api';
9-
import { runInDevTraceSpan, truncateForTelemetry } from './trace.js';
7+
import { diag, SpanStatusCode, trace } from '@opentelemetry/api';
8+
import type { Tracer } from '@opentelemetry/api';
9+
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
10+
1011
import {
11-
GeminiCliOperation,
12-
GEN_AI_CONVERSATION_ID,
1312
GEN_AI_AGENT_DESCRIPTION,
1413
GEN_AI_AGENT_NAME,
14+
GEN_AI_CONVERSATION_ID,
1515
GEN_AI_INPUT_MESSAGES,
1616
GEN_AI_OPERATION_NAME,
1717
GEN_AI_OUTPUT_MESSAGES,
18+
GeminiCliOperation,
1819
SERVICE_DESCRIPTION,
1920
SERVICE_NAME,
2021
} from './constants.js';
22+
import {
23+
runInDevTraceSpan,
24+
spanRegistry,
25+
truncateForTelemetry,
26+
} from './trace.js';
2127

2228
vi.mock('@opentelemetry/api', async (importOriginal) => {
23-
const original = await importOriginal<typeof import('@opentelemetry/api')>();
24-
return {
25-
...original,
29+
const original = await importOriginal();
30+
return Object.assign({}, original, {
2631
trace: {
2732
getTracer: vi.fn(),
2833
},
2934
diag: {
3035
error: vi.fn(),
3136
},
32-
};
37+
});
3338
});
3439

3540
vi.mock('../utils/session.js', () => ({
@@ -207,6 +212,45 @@ describe('runInDevTraceSpan', () => {
207212
expect(mockSpan.end).toHaveBeenCalled();
208213
});
209214

215+
it('should register async generators with spanRegistry', async () => {
216+
const spy = vi.spyOn(spanRegistry, 'register');
217+
async function* testStream() {
218+
yield 1;
219+
}
220+
221+
const resultStream = await runInDevTraceSpan(
222+
{ operation: GeminiCliOperation.LLMCall, sessionId: 'test-session-id' },
223+
async () => testStream(),
224+
);
225+
226+
expect(spy).toHaveBeenCalledWith(resultStream, expect.any(Function));
227+
});
228+
229+
it('should be idempotent and call span.end only once', async () => {
230+
vi.spyOn(spanRegistry, 'register');
231+
async function* testStream() {
232+
yield 1;
233+
}
234+
235+
const resultStream = await runInDevTraceSpan(
236+
{ operation: GeminiCliOperation.LLMCall, sessionId: 'test-session-id' },
237+
async () => testStream(),
238+
);
239+
240+
// Simulate completion
241+
for await (const _ of resultStream) {
242+
// iterate
243+
}
244+
expect(mockSpan.end).toHaveBeenCalledTimes(1);
245+
246+
// Try to end again (simulating registry or double call)
247+
const endSpanFn = vi.mocked(spanRegistry.register).mock
248+
.calls[0][1] as () => void;
249+
endSpanFn();
250+
251+
expect(mockSpan.end).toHaveBeenCalledTimes(1);
252+
});
253+
210254
it('should end span automatically on error in async iterators', async () => {
211255
const error = new Error('streaming error');
212256
async function* errorStream() {

packages/core/src/telemetry/trace.ts

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ import {
1111
type AttributeValue,
1212
type SpanOptions,
1313
} from '@opentelemetry/api';
14+
15+
import { debugLogger } from '../utils/debugLogger.js';
1416
import { safeJsonStringify } from '../utils/safeJsonStringify.js';
17+
import { truncateString } from '../utils/textUtils.js';
1518
import {
16-
type GeminiCliOperation,
1719
GEN_AI_AGENT_DESCRIPTION,
1820
GEN_AI_AGENT_NAME,
1921
GEN_AI_CONVERSATION_ID,
@@ -22,34 +24,55 @@ import {
2224
GEN_AI_OUTPUT_MESSAGES,
2325
SERVICE_DESCRIPTION,
2426
SERVICE_NAME,
27+
type GeminiCliOperation,
2528
} from './constants.js';
2629

27-
import { truncateString } from '../utils/textUtils.js';
28-
2930
const TRACER_NAME = 'gemini-cli';
3031
const TRACER_VERSION = 'v1';
3132

33+
/**
34+
* Registry used to ensure that spans are properly ended when their associated
35+
* async objects are garbage collected.
36+
*/
37+
export const spanRegistry = new FinalizationRegistry((endSpan: () => void) => {
38+
try {
39+
endSpan();
40+
} catch (e) {
41+
debugLogger.warn(
42+
'Error in FinalizationRegistry callback for span cleanup',
43+
e,
44+
);
45+
}
46+
});
47+
48+
/**
49+
* Truncates a value for inclusion in telemetry attributes.
50+
*
51+
* @param value The value to truncate.
52+
* @param maxLength The maximum length of the stringified value.
53+
* @returns The truncated value, or undefined if the value type is not supported.
54+
*/
3255
export function truncateForTelemetry(
3356
value: unknown,
34-
maxLength: number = 10000,
57+
maxLength = 10000,
3558
): AttributeValue | undefined {
3659
if (typeof value === 'string') {
3760
return truncateString(
3861
value,
3962
maxLength,
4063
`...[TRUNCATED: original length ${value.length}]`,
41-
);
64+
) as AttributeValue;
4265
}
4366
if (typeof value === 'object' && value !== null) {
4467
const stringified = safeJsonStringify(value);
4568
return truncateString(
4669
stringified,
4770
maxLength,
4871
`...[TRUNCATED: original length ${stringified.length}]`,
49-
);
72+
) as AttributeValue;
5073
}
5174
if (typeof value === 'number' || typeof value === 'boolean') {
52-
return value;
75+
return value as AttributeValue;
5376
}
5477
return undefined;
5578
}
@@ -82,12 +105,15 @@ export interface SpanMetadata {
82105
*
83106
* @example
84107
* ```typescript
85-
* runInDevTraceSpan({ name: 'my-operation' }, ({ metadata }) => {
86-
* metadata.input = { foo: 'bar' };
87-
* // ... do work ...
88-
* metadata.output = { result: 'baz' };
89-
* metadata.attributes['my.custom.attribute'] = 'some-value';
90-
* });
108+
* await runInDevTraceSpan(
109+
* { operation: GeminiCliOperation.LLMCall, sessionId: 'my-session' },
110+
* async ({ metadata }) => {
111+
* metadata.input = { foo: 'bar' };
112+
* // ... do work ...
113+
* metadata.output = { result: 'baz' };
114+
* metadata.attributes['my.custom.attribute'] = 'some-value';
115+
* }
116+
* );
91117
* ```
92118
*
93119
* @param opts The options for the span.
@@ -115,7 +141,12 @@ export async function runInDevTraceSpan<R>(
115141
[GEN_AI_CONVERSATION_ID]: sessionId,
116142
},
117143
};
144+
let spanEnded = false;
118145
const endSpan = () => {
146+
if (spanEnded) {
147+
return;
148+
}
149+
spanEnded = true;
119150
try {
120151
if (logPrompts !== false) {
121152
if (meta.input !== undefined) {
@@ -169,18 +200,20 @@ export async function runInDevTraceSpan<R>(
169200
const streamWrapper = (async function* () {
170201
try {
171202
yield* result;
172-
} catch (e) {
203+
} catch (e: unknown) {
173204
meta.error = e;
174205
throw e;
175206
} finally {
176207
endSpan();
177208
}
178209
})();
179210

180-
return Object.assign(streamWrapper, result);
211+
const finalResult = Object.assign(streamWrapper, result);
212+
spanRegistry.register(finalResult, endSpan);
213+
return finalResult;
181214
}
182215
return result;
183-
} catch (e) {
216+
} catch (e: unknown) {
184217
meta.error = e;
185218
throw e;
186219
} finally {

0 commit comments

Comments
 (0)