From 5efa38ab114f3f4672204296e59945b41daa0e6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Scott=20C=C3=B4t=C3=A9?= Date: Wed, 30 Jul 2025 13:42:33 -0400 Subject: [PATCH] Add WS root span option --- .../src/index.ts | 116 +++++++++++++++++- 1 file changed, 110 insertions(+), 6 deletions(-) diff --git a/packages/opentelemetry-instrumentation-ws/src/index.ts b/packages/opentelemetry-instrumentation-ws/src/index.ts index 31ea9e0..d23d1f2 100644 --- a/packages/opentelemetry-instrumentation-ws/src/index.ts +++ b/packages/opentelemetry-instrumentation-ws/src/index.ts @@ -1,7 +1,9 @@ /* eslint-disable @typescript-eslint/no-this-alias */ /* eslint-disable @typescript-eslint/ban-types */ -import { context, Context, diag, propagation, ROOT_CONTEXT, Span, SpanKind, SpanStatusCode, trace } from "@opentelemetry/api"; -import { RPCMetadata, RPCType, setRPCMetadata } from "@opentelemetry/core"; +import type { Context, Link, Span } from "@opentelemetry/api"; +import { context, diag, propagation, ROOT_CONTEXT, SpanKind, SpanStatusCode, trace } from "@opentelemetry/api"; +import type { RPCMetadata } from "@opentelemetry/core"; +import { RPCType, setRPCMetadata } from "@opentelemetry/core"; import { InstrumentationBase, InstrumentationNodeModuleDefinition, @@ -12,11 +14,12 @@ import { getIncomingRequestAttributes } from "@opentelemetry/instrumentation-htt import { SemanticAttributes } from "@opentelemetry/semantic-conventions"; import type * as http from "http"; import type * as https from "http"; -import { IncomingMessage } from "http"; +import type { IncomingMessage } from "http"; import isPromise from "is-promise"; -import { Duplex } from "stream"; -import WS, { ErrorEvent, Server, WebSocket } from "ws"; -import { WSInstrumentationConfig } from "./types"; +import type { Duplex } from "stream"; +import type WS from "ws"; +import type { ErrorEvent, Server, WebSocket } from "ws"; +import type { WSInstrumentationConfig } from "./types"; const endSpan = (traced: () => any | Promise, span: Span) => { try { @@ -160,8 +163,11 @@ export class WSInstrumentation extends InstrumentationBase { } if (address != null) { + const [root, links] = getRootLinks(WSInstrumentationContext.CONNECT_ROOT); connectingSpan = self.tracer.startSpan(`WS connect`, { kind: SpanKind.CLIENT, + root, + links, attributes: { [SemanticAttributes.MESSAGING_SYSTEM]: "ws", [SemanticAttributes.MESSAGING_DESTINATION_KIND]: "websocket", @@ -201,8 +207,11 @@ export class WSInstrumentation extends InstrumentationBase { } this.once("open", () => { + const [root, links] = getRootLinks(WSInstrumentationContext.OPEN_ROOT); this._openSpan = self.tracer.startSpan(`WS open`, { kind: connectingSpan ? SpanKind.CLIENT : SpanKind.SERVER, + root, + links, attributes: { [SemanticAttributes.MESSAGING_SYSTEM]: "ws", [SemanticAttributes.MESSAGING_DESTINATION_KIND]: "websocket", @@ -250,8 +259,11 @@ export class WSInstrumentation extends InstrumentationBase { options = {}; } + const [root, links] = getRootLinks(WSInstrumentationContext.SEND_ROOT); const span = self.tracer.startSpan(`WS send`, { kind: SpanKind.CLIENT, + root, + links, attributes: { [SemanticAttributes.MESSAGING_DESTINATION]: this.url, }, @@ -296,8 +308,11 @@ export class WSInstrumentation extends InstrumentationBase { const self = this; return function (this: ExtendedWebsocket, ...args: any[]) { + const [root, links] = getRootLinks(WSInstrumentationContext.CLOSE_ROOT); const span = self.tracer.startSpan(`WS close`, { kind: SpanKind.CLIENT, + root, + links, attributes: { [SemanticAttributes.MESSAGING_DESTINATION]: this.url, }, @@ -387,8 +402,11 @@ export class WSInstrumentation extends InstrumentationBase { callback: (client: WebSocket, request: IncomingMessage) => void ) { const parentSpan = self._requestSpans.get(request); + const [root, links] = getRootLinks(WSInstrumentationContext.UPGRADE_ROOT, parentSpan); const span = self.tracer.startSpan(`WS upgrade`, { kind: SpanKind.SERVER, + root, + links, attributes: getIncomingRequestAttributes(request, { component: "WS", hookAttributes: { @@ -430,3 +448,89 @@ export class WSInstrumentation extends InstrumentationBase { }; }; } + +/** + * Context keys for the WSInstrumentation. + */ +export const WSInstrumentationContext = Object.freeze({ + /** + * Whether the "WS connect" span is a root span. + */ + CONNECT_ROOT: Symbol.for("opentelemetry-instrumentation-ws/connect-root"), + + /** + * Whether the "WS open" span is a root span. + */ + OPEN_ROOT: Symbol.for("opentelemetry-instrumentation-ws/open-root"), + + /** + * Whether the "WS send" span is a root span. + */ + SEND_ROOT: Symbol.for("opentelemetry-instrumentation-ws/send-root"), + + /** + * Whether the "WS close" span is a root span. + */ + CLOSE_ROOT: Symbol.for("opentelemetry-instrumentation-ws/close-root"), + + /** + * Whether the "WS upgrade" span is a root span. + */ + UPGRADE_ROOT: Symbol.for("opentelemetry-instrumentation-ws/upgrade-root"), +}); + +export type WSInstrumentationContext = (typeof WSInstrumentationContext)[keyof typeof WSInstrumentationContext]; + +function getRootLinks(contextKey: WSInstrumentationContext, parentSpan = trace.getSpan(context.active())): [boolean | undefined, Link[]] { + const links: Link[] = []; + const root = context.active().getValue(contextKey) as boolean | undefined; + if (root && parentSpan) { + links.push({ context: parentSpan.spanContext() }); + } + return [root, links]; +} + +/** + * Makes all "WS connect" spans created in the callback be root spans. + * @param callback - The callback to execute. + * @returns The result of the callback. + */ +export function withWSConnectRoot(callback: () => T): T { + return context.with(context.active().setValue(WSInstrumentationContext.CONNECT_ROOT, true), callback); +} + +/** + * Makes all "WS open" spans created in the callback be root spans. + * @param callback - The callback to execute. + * @returns The result of the callback. + */ +export function withWSOpenRoot(callback: () => T): T { + return context.with(context.active().setValue(WSInstrumentationContext.OPEN_ROOT, true), callback); +} + +/** + * Makes all "WS send" spans created in the callback be root spans. + * @param callback - The callback to execute. + * @returns The result of the callback. + */ +export function withWSSendRoot(callback: () => T): T { + return context.with(context.active().setValue(WSInstrumentationContext.SEND_ROOT, true), callback); +} + +/** + * Makes all "WS close" spans created in the callback be root spans. + * @param callback - The callback to execute. + * @returns The result of the callback. + */ +export function withWSCloseRoot(callback: () => T): T { + return context.with(context.active().setValue(WSInstrumentationContext.CLOSE_ROOT, true), callback); +} + +/** + * Makes all "WS upgrade" spans created in the callback be root spans. + * @param callback - The callback to execute. + * @returns The result of the callback. + */ +export function withWSUpgradeRoot(callback: () => T): T { + return context.with(context.active().setValue(WSInstrumentationContext.UPGRADE_ROOT, true), callback); +}