Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 110 additions & 6 deletions packages/opentelemetry-instrumentation-ws/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
/* eslint-disable @typescript-eslint/no-this-alias */
/* eslint-disable @typescript-eslint/ban-types */
import { context, Context, diag, propagation, ROOT_CONTEXT, Span, SpanKind, SpanStatusCode, trace } from "@opentelemetry/api";
import { RPCMetadata, RPCType, setRPCMetadata } from "@opentelemetry/core";
import type { Context, Link, Span } from "@opentelemetry/api";
import { context, diag, propagation, ROOT_CONTEXT, SpanKind, SpanStatusCode, trace } from "@opentelemetry/api";
import type { RPCMetadata } from "@opentelemetry/core";
import { RPCType, setRPCMetadata } from "@opentelemetry/core";
import {
InstrumentationBase,
InstrumentationNodeModuleDefinition,
Expand All @@ -12,11 +14,12 @@ import { getIncomingRequestAttributes } from "@opentelemetry/instrumentation-htt
import { SemanticAttributes } from "@opentelemetry/semantic-conventions";
import type * as http from "http";
import type * as https from "http";
import { IncomingMessage } from "http";
import type { IncomingMessage } from "http";
import isPromise from "is-promise";
import { Duplex } from "stream";
import WS, { ErrorEvent, Server, WebSocket } from "ws";
import { WSInstrumentationConfig } from "./types";
import type { Duplex } from "stream";
import type WS from "ws";
import type { ErrorEvent, Server, WebSocket } from "ws";
import type { WSInstrumentationConfig } from "./types";

const endSpan = (traced: () => any | Promise<any>, span: Span) => {
try {
Expand Down Expand Up @@ -160,8 +163,11 @@ export class WSInstrumentation extends InstrumentationBase<WS> {
}

if (address != null) {
const [root, links] = getRootLinks(WSInstrumentationContext.CONNECT_ROOT);
connectingSpan = self.tracer.startSpan(`WS connect`, {
kind: SpanKind.CLIENT,
root,
links,
attributes: {
[SemanticAttributes.MESSAGING_SYSTEM]: "ws",
[SemanticAttributes.MESSAGING_DESTINATION_KIND]: "websocket",
Expand Down Expand Up @@ -201,8 +207,11 @@ export class WSInstrumentation extends InstrumentationBase<WS> {
}

this.once("open", () => {
const [root, links] = getRootLinks(WSInstrumentationContext.OPEN_ROOT);
this._openSpan = self.tracer.startSpan(`WS open`, {
kind: connectingSpan ? SpanKind.CLIENT : SpanKind.SERVER,
root,
links,
attributes: {
[SemanticAttributes.MESSAGING_SYSTEM]: "ws",
[SemanticAttributes.MESSAGING_DESTINATION_KIND]: "websocket",
Expand Down Expand Up @@ -250,8 +259,11 @@ export class WSInstrumentation extends InstrumentationBase<WS> {
options = {};
}

const [root, links] = getRootLinks(WSInstrumentationContext.SEND_ROOT);
const span = self.tracer.startSpan(`WS send`, {
kind: SpanKind.CLIENT,
root,
links,
attributes: {
[SemanticAttributes.MESSAGING_DESTINATION]: this.url,
},
Expand Down Expand Up @@ -296,8 +308,11 @@ export class WSInstrumentation extends InstrumentationBase<WS> {
const self = this;

return function (this: ExtendedWebsocket, ...args: any[]) {
const [root, links] = getRootLinks(WSInstrumentationContext.CLOSE_ROOT);
const span = self.tracer.startSpan(`WS close`, {
kind: SpanKind.CLIENT,
root,
links,
attributes: {
[SemanticAttributes.MESSAGING_DESTINATION]: this.url,
},
Expand Down Expand Up @@ -387,8 +402,11 @@ export class WSInstrumentation extends InstrumentationBase<WS> {
callback: (client: WebSocket, request: IncomingMessage) => void
) {
const parentSpan = self._requestSpans.get(request);
const [root, links] = getRootLinks(WSInstrumentationContext.UPGRADE_ROOT, parentSpan);
const span = self.tracer.startSpan(`WS upgrade`, {
kind: SpanKind.SERVER,
root,
links,
attributes: getIncomingRequestAttributes(request, {
component: "WS",
hookAttributes: {
Expand Down Expand Up @@ -430,3 +448,89 @@ export class WSInstrumentation extends InstrumentationBase<WS> {
};
};
}

/**
* Context keys for the WSInstrumentation.
*/
export const WSInstrumentationContext = Object.freeze({
/**
* Whether the "WS connect" span is a root span.
*/
CONNECT_ROOT: Symbol.for("opentelemetry-instrumentation-ws/connect-root"),

/**
* Whether the "WS open" span is a root span.
*/
OPEN_ROOT: Symbol.for("opentelemetry-instrumentation-ws/open-root"),

/**
* Whether the "WS send" span is a root span.
*/
SEND_ROOT: Symbol.for("opentelemetry-instrumentation-ws/send-root"),

/**
* Whether the "WS close" span is a root span.
*/
CLOSE_ROOT: Symbol.for("opentelemetry-instrumentation-ws/close-root"),

/**
* Whether the "WS upgrade" span is a root span.
*/
UPGRADE_ROOT: Symbol.for("opentelemetry-instrumentation-ws/upgrade-root"),
});

export type WSInstrumentationContext = (typeof WSInstrumentationContext)[keyof typeof WSInstrumentationContext];

function getRootLinks(contextKey: WSInstrumentationContext, parentSpan = trace.getSpan(context.active())): [boolean | undefined, Link[]] {
const links: Link[] = [];
const root = context.active().getValue(contextKey) as boolean | undefined;
if (root && parentSpan) {
links.push({ context: parentSpan.spanContext() });
}
return [root, links];
}

/**
* Makes all "WS connect" spans created in the callback be root spans.
* @param callback - The callback to execute.
* @returns The result of the callback.
*/
export function withWSConnectRoot<T>(callback: () => T): T {
return context.with(context.active().setValue(WSInstrumentationContext.CONNECT_ROOT, true), callback);
}

/**
* Makes all "WS open" spans created in the callback be root spans.
* @param callback - The callback to execute.
* @returns The result of the callback.
*/
export function withWSOpenRoot<T>(callback: () => T): T {
return context.with(context.active().setValue(WSInstrumentationContext.OPEN_ROOT, true), callback);
}

/**
* Makes all "WS send" spans created in the callback be root spans.
* @param callback - The callback to execute.
* @returns The result of the callback.
*/
export function withWSSendRoot<T>(callback: () => T): T {
return context.with(context.active().setValue(WSInstrumentationContext.SEND_ROOT, true), callback);
}

/**
* Makes all "WS close" spans created in the callback be root spans.
* @param callback - The callback to execute.
* @returns The result of the callback.
*/
export function withWSCloseRoot<T>(callback: () => T): T {
return context.with(context.active().setValue(WSInstrumentationContext.CLOSE_ROOT, true), callback);
}

/**
* Makes all "WS upgrade" spans created in the callback be root spans.
* @param callback - The callback to execute.
* @returns The result of the callback.
*/
export function withWSUpgradeRoot<T>(callback: () => T): T {
return context.with(context.active().setValue(WSInstrumentationContext.UPGRADE_ROOT, true), callback);
}
Loading