From 7fcbd5d2e7b9f05d7b60913643ea308100834c41 Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 12 May 2025 14:08:01 -0400 Subject: [PATCH 01/21] fix kafka asyncstorage --- .../src/confluentinc-kafka-javascript.js | 116 +++++++++--------- .../datadog-instrumentations/src/kafkajs.js | 76 ++++++------ .../datadog-plugin-kafkajs/src/consumer.js | 12 +- .../datadog-plugin-kafkajs/src/producer.js | 7 +- packages/dd-trace/src/plugins/consumer.js | 4 +- packages/dd-trace/src/plugins/producer.js | 4 +- packages/dd-trace/src/plugins/tracing.js | 2 +- 7 files changed, 118 insertions(+), 103 deletions(-) diff --git a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js index c4d29e2abd2..5243180b269 100644 --- a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js +++ b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js @@ -2,8 +2,7 @@ const { addHook, - channel, - AsyncResource + channel } = require('./helpers/instrument') const shimmer = require('../../datadog-shimmer') @@ -58,23 +57,22 @@ function instrumentBaseModule (module) { const brokers = this.globalConfig?.['bootstrap.servers'] - const asyncResource = new AsyncResource('bound-anonymous-fn') - return asyncResource.runInAsyncScope(() => { - try { - channels.producerStart.publish({ - topic, - messages: [{ key, value: message }], - bootstrapServers: brokers - }) + const ctx = {} + ctx.topic = topic + ctx.messages = [{ key, value: message }] + ctx.bootstrapServers = brokers + return channels.producerStart.runStores(ctx, () => { + try { const result = produce.apply(this, arguments) channels.producerCommit.publish(undefined) - channels.producerFinish.publish(undefined) + channels.producerFinish.runStores(ctx, () => {}) return result } catch (error) { - channels.producerError.publish(error) - channels.producerFinish.publish(undefined) + ctx.err = error + channels.producerError.publish(ctx) + channels.producerFinish.runStores(ctx, () => {}) throw error } }) @@ -106,32 +104,35 @@ function instrumentBaseModule (module) { callback = numMessages } + const ctx = {} // Handle callback-based consumption if (typeof callback === 'function') { return consume.call(this, numMessages, function wrappedCallback (err, messages) { if (messages && messages.length > 0) { messages.forEach(message => { - channels.consumerStart.publish({ - topic: message?.topic, - partition: message?.partition, - message, - groupId - }) + ctx.topic = message?.topic + ctx.partition = message?.partition + ctx.message = message + ctx.groupId = groupId + + channels.consumerStart.runStores(ctx, () => {}) updateLatestOffset(message?.topic, message?.partition, message?.offset, groupId) }) } if (err) { - channels.consumerError.publish(err) + ctx.err = err + channels.consumerError.publish(ctx) } try { const result = callback.apply(this, arguments) - channels.consumerFinish.publish(undefined) + channels.consumerFinish.runStores(ctx, () => {}) return result } catch (error) { - channels.consumerError.publish(error) - channels.consumerFinish.publish(undefined) + ctx.err = error + channels.consumerError.publish(ctx) + channels.consumerFinish.runStores(ctx, () => {}) throw error } }) @@ -200,33 +201,31 @@ function instrumentKafkaJS (kafkaJS) { return send.apply(this, arguments) } - const asyncResource = new AsyncResource('bound-anonymous-fn') - return asyncResource.runInAsyncScope(() => { - try { - channels.producerStart.publish({ - topic: payload?.topic, - messages: payload?.messages || [], - bootstrapServers: kafka._ddBrokers - }) + const ctx = {} + ctx.topic = payload?.topic + ctx.messages = payload?.messages || [] + ctx.bootstrapServers = kafka._ddBrokers + return channels.producerStart.runStores(ctx, () => { + try { const result = send.apply(this, arguments) - result.then( - asyncResource.bind(res => { - channels.producerCommit.publish(res) - channels.producerFinish.publish(undefined) - }), - asyncResource.bind(err => { - if (err) { - channels.producerError.publish(err) - } - channels.producerFinish.publish(undefined) - }) - ) + result.then((res) => { + ctx.res = res + channels.producerCommit.publish(ctx) + channels.producerFinish.publish(undefined) + }, (err) => { + if (err) { + ctx.err = err + channels.producerError.publish(ctx) + } + channels.producerFinish.publish(undefined) + }) return result } catch (e) { - channels.producerError.publish(e) + ctx.err = e + channels.producerError.publish(ctx) channels.producerFinish.publish(undefined) throw e } @@ -335,10 +334,10 @@ function wrapKafkaCallback (callback, { startCh, commitCh, finishCh, errorCh }, return function wrappedKafkaCallback (payload) { const commitPayload = getPayload(payload) - const asyncResource = new AsyncResource('bound-anonymous-fn') - return asyncResource.runInAsyncScope(() => { - startCh.publish(commitPayload) + const ctx = {} + ctx.commitPayload = commitPayload + return startCh.runStores(ctx, () => { updateLatestOffset(commitPayload?.topic, commitPayload?.partition, commitPayload?.offset, commitPayload?.groupId) try { @@ -346,22 +345,25 @@ function wrapKafkaCallback (callback, { startCh, commitCh, finishCh, errorCh }, if (result && typeof result.then === 'function') { return result - .then(asyncResource.bind(res => { - finishCh.publish(undefined) + .then((res) => { + ctx.res = res + finishCh.runStores(ctx, () => {}) return res - })) - .catch(asyncResource.bind(err => { - errorCh.publish(err) - finishCh.publish(undefined) + }) + .catch((err) => { + ctx.err = err + errorCh.publish(ctx) + finishCh.runStores(ctx, () => {}) throw err - })) + }) } else { - finishCh.publish(undefined) + finishCh.runStores(ctx, () => {}) return result } } catch (error) { - errorCh.publish(error) - finishCh.publish(undefined) + ctx.err = error + errorCh.publish(ctx) + finishCh.runStores(ctx, () => {}) throw error } }) diff --git a/packages/datadog-instrumentations/src/kafkajs.js b/packages/datadog-instrumentations/src/kafkajs.js index e75c03e7e64..dab2f908c97 100644 --- a/packages/datadog-instrumentations/src/kafkajs.js +++ b/packages/datadog-instrumentations/src/kafkajs.js @@ -2,8 +2,7 @@ const { channel, - addHook, - AsyncResource + addHook } = require('./helpers/instrument') const shimmer = require('../../datadog-shimmer') @@ -56,41 +55,42 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf producer.send = function () { const wrappedSend = (clusterId) => { - const innerAsyncResource = new AsyncResource('bound-anonymous-fn') - - return innerAsyncResource.runInAsyncScope(() => { - if (!producerStartCh.hasSubscribers) { - return send.apply(this, arguments) + const ctx = {} + ctx.bootstrapServers = bootstrapServers + ctx.clusterId = clusterId + + const { topic, messages = [] } = arguments[0] + for (const message of messages) { + if (message !== null && typeof message === 'object') { + message.headers = message.headers || {} } + } + ctx.topic = topic + ctx.messages = messages + return producerStartCh.runStores(ctx, () => { try { - const { topic, messages = [] } = arguments[0] - for (const message of messages) { - if (message !== null && typeof message === 'object') { - message.headers = message.headers || {} - } - } - producerStartCh.publish({ topic, messages, bootstrapServers, clusterId }) - const result = send.apply(this, arguments) result.then( - innerAsyncResource.bind(res => { - producerFinishCh.publish(undefined) - producerCommitCh.publish(res) - }), - innerAsyncResource.bind(err => { + (res) => { + ctx.res = res + producerFinishCh.runStores(ctx, () => {}) + producerCommitCh.publish(ctx) + }, + (err) => { + ctx.err = err if (err) { - producerErrorCh.publish(err) + producerErrorCh.publish(ctx) } - producerFinishCh.publish(undefined) + producerFinishCh.runStores(ctx, () => {}) }) - ) return result } catch (e) { - producerErrorCh.publish(e) - producerFinishCh.publish(undefined) + ctx.err = e + producerErrorCh.publish(ctx) + producerFinishCh.runStores(ctx, () => {}) throw e } }) @@ -175,29 +175,33 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf const wrappedCallback = (fn, startCh, finishCh, errorCh, extractArgs, clusterId) => { return typeof fn === 'function' ? function (...args) { - const innerAsyncResource = new AsyncResource('bound-anonymous-fn') - return innerAsyncResource.runInAsyncScope(() => { - const extractedArgs = extractArgs(args, clusterId) + const ctx = {} + const extractedArgs = extractArgs(args, clusterId) + ctx.extractedArgs = extractedArgs - startCh.publish(extractedArgs) + return startCh.runStores(ctx, () => { try { const result = fn.apply(this, args) if (result && typeof result.then === 'function') { result.then( - innerAsyncResource.bind(() => finishCh.publish(undefined)), - innerAsyncResource.bind(err => { + (res) => { + ctx.res = res + finishCh.runStores(ctx, () => {}) + }, + (err) => { + ctx.err = err if (err) { - errorCh.publish(err) + errorCh.publish(ctx) } - finishCh.publish(undefined) + finishCh.runStores(ctx, () => {}) }) - ) } else { - finishCh.publish(undefined) + finishCh.runStores(ctx, () => {}) } return result } catch (e) { - errorCh.publish(e) + ctx.err = e + errorCh.publish(ctx) finishCh.publish(undefined) throw e } diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js index 836e0edcfd9..235dddf24c8 100644 --- a/packages/datadog-plugin-kafkajs/src/consumer.js +++ b/packages/datadog-plugin-kafkajs/src/consumer.js @@ -64,7 +64,9 @@ class KafkajsConsumerPlugin extends ConsumerPlugin { } } - start ({ topic, partition, message, groupId, clusterId }) { + bindStart (ctx) { + const { topic, partition, message, groupId, clusterId } = ctx.extractedArgs + let childOf const headers = convertToTextMap(message?.headers) if (headers) { @@ -83,7 +85,7 @@ class KafkajsConsumerPlugin extends ConsumerPlugin { metrics: { 'kafka.partition': partition } - }) + }, ctx) if (message?.offset) span.setTag('kafka.message.offset', message?.offset) if (this.config.dsmEnabled && headers) { @@ -99,14 +101,18 @@ class KafkajsConsumerPlugin extends ConsumerPlugin { if (afterStartCh.hasSubscribers) { afterStartCh.publish({ topic, partition, message, groupId }) } + + return ctx.currentStore } - finish () { + bindFinish (ctx) { if (beforeFinishCh.hasSubscribers) { beforeFinishCh.publish() } super.finish() + + return ctx.parentStore } } diff --git a/packages/datadog-plugin-kafkajs/src/producer.js b/packages/datadog-plugin-kafkajs/src/producer.js index 2ab6dcec8d9..709f670802c 100644 --- a/packages/datadog-plugin-kafkajs/src/producer.js +++ b/packages/datadog-plugin-kafkajs/src/producer.js @@ -67,7 +67,8 @@ class KafkajsProducerPlugin extends ProducerPlugin { } } - start ({ topic, messages, bootstrapServers, clusterId }) { + bindStart (ctx) { + const { topic, messages, bootstrapServers, clusterId } = ctx const span = this.startSpan({ resource: topic, meta: { @@ -79,7 +80,7 @@ class KafkajsProducerPlugin extends ProducerPlugin { metrics: { 'kafka.batch_size': messages.length } - }) + }, ctx) if (bootstrapServers) { span.setTag(BOOTSTRAP_SERVERS_KEY, bootstrapServers) } @@ -102,6 +103,8 @@ class KafkajsProducerPlugin extends ProducerPlugin { } } } + + return ctx.currentStore } } diff --git a/packages/dd-trace/src/plugins/consumer.js b/packages/dd-trace/src/plugins/consumer.js index 0a1f223b89a..775eea7c3bb 100644 --- a/packages/dd-trace/src/plugins/consumer.js +++ b/packages/dd-trace/src/plugins/consumer.js @@ -7,14 +7,14 @@ class ConsumerPlugin extends InboundPlugin { static get kind () { return 'consumer' } static get type () { return 'messaging' } - startSpan (options) { + startSpan (options, ctx) { if (!options.service) { options.service = this.config.service || this.serviceName() } if (!options.kind) { options.kind = this.constructor.kind } - return super.startSpan(this.operationName(), options) + return super.startSpan(this.operationName(), options, ctx) } } diff --git a/packages/dd-trace/src/plugins/producer.js b/packages/dd-trace/src/plugins/producer.js index 13b9bd84d20..3fc277dbc64 100644 --- a/packages/dd-trace/src/plugins/producer.js +++ b/packages/dd-trace/src/plugins/producer.js @@ -7,7 +7,7 @@ class ProducerPlugin extends OutboundPlugin { static get kind () { return 'producer' } static get type () { return 'messaging' } - startSpan (options) { + startSpan (options, ctx) { const spanDefaults = { kind: this.constructor.kind } @@ -19,7 +19,7 @@ class ProducerPlugin extends OutboundPlugin { if (!options[key]) options[key] = spanDefaults[key] } ) - return super.startSpan(this.operationName(), options) + return super.startSpan(this.operationName(), options, ctx) } } diff --git a/packages/dd-trace/src/plugins/tracing.js b/packages/dd-trace/src/plugins/tracing.js index e9823fd5d3e..03d29769bc2 100644 --- a/packages/dd-trace/src/plugins/tracing.js +++ b/packages/dd-trace/src/plugins/tracing.js @@ -60,7 +60,7 @@ class TracingPlugin extends Plugin { error (ctxOrError) { if (ctxOrError?.currentStore) { - ctxOrError.currentStore?.span.setTag('error', ctxOrError?.error) + ctxOrError.currentStore?.span.setTag('error', ctxOrError?.error || ctxOrError?.err) return } this.addError(ctxOrError) From 88346c1f24e96e4dc146a67cb855bf442f029bf8 Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 12 May 2025 15:01:31 -0400 Subject: [PATCH 02/21] fix confluent kafka --- .../src/confluentinc-kafka-javascript.js | 7 +++---- packages/datadog-plugin-kafkajs/src/consumer.js | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js index 5243180b269..2d04d770c8b 100644 --- a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js +++ b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js @@ -66,7 +66,7 @@ function instrumentBaseModule (module) { try { const result = produce.apply(this, arguments) - channels.producerCommit.publish(undefined) + channels.producerCommit.publish(result) channels.producerFinish.runStores(ctx, () => {}) return result } catch (error) { @@ -211,8 +211,7 @@ function instrumentKafkaJS (kafkaJS) { const result = send.apply(this, arguments) result.then((res) => { - ctx.res = res - channels.producerCommit.publish(ctx) + channels.producerCommit.publish(res) channels.producerFinish.publish(undefined) }, (err) => { if (err) { @@ -335,7 +334,7 @@ function wrapKafkaCallback (callback, { startCh, commitCh, finishCh, errorCh }, const commitPayload = getPayload(payload) const ctx = {} - ctx.commitPayload = commitPayload + ctx.extractedArgs = commitPayload return startCh.runStores(ctx, () => { updateLatestOffset(commitPayload?.topic, commitPayload?.partition, commitPayload?.offset, commitPayload?.groupId) diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js index 235dddf24c8..8253269c3c7 100644 --- a/packages/datadog-plugin-kafkajs/src/consumer.js +++ b/packages/datadog-plugin-kafkajs/src/consumer.js @@ -65,7 +65,7 @@ class KafkajsConsumerPlugin extends ConsumerPlugin { } bindStart (ctx) { - const { topic, partition, message, groupId, clusterId } = ctx.extractedArgs + const { topic, partition, message, groupId, clusterId } = ctx.extractedArgs || ctx let childOf const headers = convertToTextMap(message?.headers) From 00c5465f2dd39b112ebbdcaafeb26edcf1c9712a Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 12 May 2025 16:10:36 -0400 Subject: [PATCH 03/21] run kafka latest --- .github/workflows/plugins.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/plugins.yml b/.github/workflows/plugins.yml index be6db7eb5e7..dcd87681e4d 100644 --- a/.github/workflows/plugins.yml +++ b/.github/workflows/plugins.yml @@ -249,7 +249,7 @@ jobs: strategy: matrix: # using node versions matrix since this plugin testing fails due to install differences between node versions - node-version: ['18', '20', '22'] + node-version: ['18', '20', '22', 'latest'] runs-on: ubuntu-latest services: kafka: From 6f3379ae3594c831fe2fe255b7e516b03830c4e6 Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 12 May 2025 16:59:59 -0400 Subject: [PATCH 04/21] fix batch consumer --- packages/datadog-plugin-kafkajs/src/batch-consumer.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js index 4c701fac01e..74c3caa1451 100644 --- a/packages/datadog-plugin-kafkajs/src/batch-consumer.js +++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js @@ -6,7 +6,9 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin { static get id () { return 'kafkajs' } static get operation () { return 'consume-batch' } - start ({ topic, partition, messages, groupId, clusterId }) { + start (ctx) { + const { topic, messages, groupId, clusterId } = ctx.extractedArgs || ctx + if (!this.config.dsmEnabled) return for (const message of messages) { if (!message || !message.headers) continue From ece0a0b5824b3e36c702b55d573c9887ea5837bc Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 12 May 2025 20:20:44 -0400 Subject: [PATCH 05/21] re run on failure --- .../test/index.spec.js | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js index eecce58c053..2b1ae8af804 100644 --- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js +++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js @@ -370,10 +370,17 @@ describe('Plugin', () => { let consumePromise nativeConsumer.on('ready', () => { // Consume messages - consumePromise = new Promise(resolve => { - nativeConsumer.consume(1, (err, messages) => { - resolve() - }) + consumePromise = new Promise((resolve) => { + const attemptConsume = () => { + nativeConsumer.consume(1, (err, messages) => { + if (err || !messages || messages.length === 0) { + setTimeout(attemptConsume, 100) + return + } + resolve(messages) + }) + } + attemptConsume() nativeProducer.produce(testTopic, null, message, key) }) }) From e923e0307ac52976de8675dadb50f2e6bc211441 Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 12 May 2025 20:40:03 -0400 Subject: [PATCH 06/21] another fix --- .../test/index.spec.js | 33 +++++++++++++++---- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js index 2b1ae8af804..6e4df88b58c 100644 --- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js +++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js @@ -371,6 +371,9 @@ describe('Plugin', () => { nativeConsumer.on('ready', () => { // Consume messages consumePromise = new Promise((resolve) => { + const produce = () => { + nativeProducer.produce(testTopic, null, message, key) + } const attemptConsume = () => { nativeConsumer.consume(1, (err, messages) => { if (err || !messages || messages.length === 0) { @@ -381,7 +384,7 @@ describe('Plugin', () => { }) } attemptConsume() - nativeProducer.produce(testTopic, null, message, key) + produce() }) }) @@ -390,7 +393,7 @@ describe('Plugin', () => { return expectedSpanPromise }) - it('should propagate context', async () => { + it('rdKafka API should propagate context', async () => { const expectedSpanPromise = agent.use(traces => { const span = traces[0][0] @@ -410,11 +413,27 @@ describe('Plugin', () => { let consumePromise nativeConsumer.on('ready', () => { // Consume messages - consumePromise = new Promise(resolve => { - nativeConsumer.consume(1, (err, messages) => { - resolve() - }) - nativeProducer.produce(testTopic, null, message, key) + consumePromise = new Promise((resolve) => { + const produce = () => { + nativeProducer.produce(testTopic, null, message, key) + } + const attemptConsume = () => { + nativeConsumer.consume(1, (err, messages) => { + if (err || !messages || messages.length === 0) { + setTimeout(attemptConsume, 100) + return + } + // for some reason, messages occassionally don't arrive with headers + // despite header injection occurring during produce, so retry this case + if (messages && !messages[0].headers) { + setTimeout(produce, 100) + return + } + resolve(messages) + }) + } + attemptConsume() + produce() }) }) From 27e4a7429505314e5e90f2703b463fd1f805dfde Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 12 May 2025 20:40:28 -0400 Subject: [PATCH 07/21] change test name --- .../test/index.spec.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js index 6e4df88b58c..3e91f23f9e9 100644 --- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js +++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js @@ -393,7 +393,7 @@ describe('Plugin', () => { return expectedSpanPromise }) - it('rdKafka API should propagate context', async () => { + it('should propagate context', async () => { const expectedSpanPromise = agent.use(traces => { const span = traces[0][0] From 38ed0d699339cb97753f73179e94c42f38bfd823 Mon Sep 17 00:00:00 2001 From: William Conti Date: Tue, 13 May 2025 12:11:24 -0400 Subject: [PATCH 08/21] bump timeout --- .../test/index.spec.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js index 3e91f23f9e9..e0efd19b042 100644 --- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js +++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js @@ -404,7 +404,7 @@ describe('Plugin', () => { }) expect(parseInt(span.parent_id.toString())).to.be.gt(0) - }, { timeoutMs: 10000 }) + }, { timeoutMs: 30000 }) // Send a test message using the producer const message = Buffer.from('test message for native consumer') From 1f36645b253701f40972226e3e63dd157be4b5b6 Mon Sep 17 00:00:00 2001 From: William Conti Date: Tue, 13 May 2025 12:28:20 -0400 Subject: [PATCH 09/21] produce message again if not found --- .../test/index.spec.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js index e0efd19b042..483a0081f2c 100644 --- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js +++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js @@ -382,9 +382,9 @@ describe('Plugin', () => { } resolve(messages) }) + produce() } attemptConsume() - produce() }) }) From cff7f0c45d760c1d8cb3355aea0dfb1a040bf893 Mon Sep 17 00:00:00 2001 From: William Conti Date: Tue, 13 May 2025 12:41:27 -0400 Subject: [PATCH 10/21] revert tests --- .../test/index.spec.js | 48 +++++-------------- 1 file changed, 11 insertions(+), 37 deletions(-) diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js index 483a0081f2c..eecce58c053 100644 --- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js +++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js @@ -370,21 +370,11 @@ describe('Plugin', () => { let consumePromise nativeConsumer.on('ready', () => { // Consume messages - consumePromise = new Promise((resolve) => { - const produce = () => { - nativeProducer.produce(testTopic, null, message, key) - } - const attemptConsume = () => { - nativeConsumer.consume(1, (err, messages) => { - if (err || !messages || messages.length === 0) { - setTimeout(attemptConsume, 100) - return - } - resolve(messages) - }) - produce() - } - attemptConsume() + consumePromise = new Promise(resolve => { + nativeConsumer.consume(1, (err, messages) => { + resolve() + }) + nativeProducer.produce(testTopic, null, message, key) }) }) @@ -404,7 +394,7 @@ describe('Plugin', () => { }) expect(parseInt(span.parent_id.toString())).to.be.gt(0) - }, { timeoutMs: 30000 }) + }, { timeoutMs: 10000 }) // Send a test message using the producer const message = Buffer.from('test message for native consumer') @@ -413,27 +403,11 @@ describe('Plugin', () => { let consumePromise nativeConsumer.on('ready', () => { // Consume messages - consumePromise = new Promise((resolve) => { - const produce = () => { - nativeProducer.produce(testTopic, null, message, key) - } - const attemptConsume = () => { - nativeConsumer.consume(1, (err, messages) => { - if (err || !messages || messages.length === 0) { - setTimeout(attemptConsume, 100) - return - } - // for some reason, messages occassionally don't arrive with headers - // despite header injection occurring during produce, so retry this case - if (messages && !messages[0].headers) { - setTimeout(produce, 100) - return - } - resolve(messages) - }) - } - attemptConsume() - produce() + consumePromise = new Promise(resolve => { + nativeConsumer.consume(1, (err, messages) => { + resolve() + }) + nativeProducer.produce(testTopic, null, message, key) }) }) From f8d01afe0b9f161fd80eceecb9abec47be206a33 Mon Sep 17 00:00:00 2001 From: William Conti Date: Tue, 13 May 2025 12:41:52 -0400 Subject: [PATCH 11/21] bump kafka hook version to 1.3.0 which seems more stable --- .../src/confluentinc-kafka-javascript.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js index 2d04d770c8b..b0332ffcf6f 100644 --- a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js +++ b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js @@ -27,8 +27,9 @@ const channels = { // we need to store the offset per partition per topic for the consumer to track offsets for DSM const latestConsumerOffsets = new Map() -// Customize the instrumentation for Confluent Kafka JavaScript -addHook({ name: '@confluentinc/kafka-javascript', versions: ['>=1.0.0'] }, (module) => { +// We could probably support >1.0.0, but 1.0.0 seems to be quite buggy and our tests frequently fail +// due to undelivered messages +addHook({ name: '@confluentinc/kafka-javascript', versions: ['>=1.3.0'] }, (module) => { // Hook native module classes first instrumentBaseModule(module) From 1743986b8ab0e491f000e762a091f56456604b4c Mon Sep 17 00:00:00 2001 From: William Conti Date: Tue, 13 May 2025 16:38:19 -0400 Subject: [PATCH 12/21] possible fix --- .../src/confluentinc-kafka-javascript.js | 18 +++-- .../test/index.spec.js | 70 ++++++++++--------- 2 files changed, 49 insertions(+), 39 deletions(-) diff --git a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js index b0332ffcf6f..3d0b44c7c9e 100644 --- a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js +++ b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js @@ -27,9 +27,7 @@ const channels = { // we need to store the offset per partition per topic for the consumer to track offsets for DSM const latestConsumerOffsets = new Map() -// We could probably support >1.0.0, but 1.0.0 seems to be quite buggy and our tests frequently fail -// due to undelivered messages -addHook({ name: '@confluentinc/kafka-javascript', versions: ['>=1.3.0'] }, (module) => { +addHook({ name: '@confluentinc/kafka-javascript', versions: ['>=1.0.0'] }, (module) => { // Hook native module classes first instrumentBaseModule(module) @@ -51,7 +49,7 @@ function instrumentBaseModule (module) { // Hook the produce method if (typeof producer?.produce === 'function') { shimmer.wrap(producer, 'produce', function wrapProduce (produce) { - return function wrappedProduce (topic, partition, message, key, timestamp, opaque) { + return function wrappedProduce (topic, partition, message, key, timestamp, opaque, headers) { if (!channels.producerStart.hasSubscribers) { return produce.apply(this, arguments) } @@ -65,7 +63,8 @@ function instrumentBaseModule (module) { return channels.producerStart.runStores(ctx, () => { try { - const result = produce.apply(this, arguments) + headers = convertHeaders(ctx.messages[0].headers) + const result = produce.apply(this, [topic, partition, message, key, timestamp, opaque, headers]) channels.producerCommit.publish(result) channels.producerFinish.runStores(ctx, () => {}) @@ -128,7 +127,9 @@ function instrumentBaseModule (module) { try { const result = callback.apply(this, arguments) - channels.consumerFinish.runStores(ctx, () => {}) + if (messages && messages.length > 0) { + channels.consumerFinish.runStores(ctx, () => {}) + } return result } catch (error) { ctx.err = error @@ -391,3 +392,8 @@ function updateLatestOffset (topic, partition, offset, groupId) { function getLatestOffsets () { return Array.from(latestConsumerOffsets.values()) } + +function convertHeaders (headers) { + // convert headers from object to array of objects with 1 key and value per array entry + return Object.entries(headers).map(([key, value]) => ({ [key.toString()]: value.toString() })) +} diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js index eecce58c053..ba1f5e5ad88 100644 --- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js +++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js @@ -332,23 +332,46 @@ describe('Plugin', () => { }) describe('consumer', () => { - beforeEach(() => { + beforeEach((done) => { nativeConsumer = new Consumer({ 'bootstrap.servers': '127.0.0.1:9092', - 'group.id': 'test-group-native' + 'group.id': 'test-group-native', + 'auto.offset.reset': 'latest' }) - nativeConsumer.on('ready', () => { - nativeConsumer.subscribe([testTopic]) + nativeConsumer.connect({}, (err, d) => { + done() }) - - nativeConsumer.connect() }) afterEach(() => { + nativeConsumer.unsubscribe() nativeConsumer.disconnect() }) + function consume (consumer, producer, topic, message) { + consumer.consume(1, function (err, messages) { + if (err && err.code === -185) { + setTimeout(() => consume(consumer, producer, topic, message), 10) + return + } else if (!messages || messages.length === 0 || (err && err.code === -191)) { + producer.produce(topic, null, message, null) + setTimeout(() => consume(consumer, producer, topic, message), 10) + return + } + + const consumedMessage = messages[0] + + if (consumedMessage.value.toString() !== message.toString()) { + setTimeout(() => consume(consumer, producer, topic, message), 100) + return + } + + consumer.unsubscribe() + }) + producer.produce(topic, null, message, null) + } + it('should be instrumented', async () => { const expectedSpanPromise = expectSpanWithDefaults({ name: expectedSchema.receive.opName, @@ -363,22 +386,13 @@ describe('Plugin', () => { type: 'worker' }) + nativeConsumer.setDefaultConsumeTimeout(100) + nativeConsumer.subscribe([testTopic]) + // Send a test message using the producer const message = Buffer.from('test message for native consumer') - const key = 'native-consumer-key' - - let consumePromise - nativeConsumer.on('ready', () => { - // Consume messages - consumePromise = new Promise(resolve => { - nativeConsumer.consume(1, (err, messages) => { - resolve() - }) - nativeProducer.produce(testTopic, null, message, key) - }) - }) - await consumePromise + consume(nativeConsumer, nativeProducer, testTopic, message) return expectedSpanPromise }) @@ -395,23 +409,13 @@ describe('Plugin', () => { expect(parseInt(span.parent_id.toString())).to.be.gt(0) }, { timeoutMs: 10000 }) + nativeConsumer.setDefaultConsumeTimeout(100) + nativeConsumer.subscribe([testTopic]) // Send a test message using the producer - const message = Buffer.from('test message for native consumer') - const key = 'native-consumer-key' - - let consumePromise - nativeConsumer.on('ready', () => { - // Consume messages - consumePromise = new Promise(resolve => { - nativeConsumer.consume(1, (err, messages) => { - resolve() - }) - nativeProducer.produce(testTopic, null, message, key) - }) - }) + const message = Buffer.from('test message propagation for native consumer 1') - await consumePromise + consume(nativeConsumer, nativeProducer, testTopic, message) return expectedSpanPromise }) From d6faa5250e6fa358652dbd8c40ca21269617728d Mon Sep 17 00:00:00 2001 From: William Conti Date: Tue, 13 May 2025 17:25:17 -0400 Subject: [PATCH 13/21] more changes --- .../test/index.spec.js | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js index ba1f5e5ad88..db6a01835ab 100644 --- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js +++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js @@ -252,8 +252,11 @@ describe('Plugin', () => { let Consumer beforeEach(async () => { - tracer = require('../../dd-trace') await agent.load('@confluentinc/kafka-javascript') + }) + + beforeEach((done) => { + tracer = require('../../dd-trace') const lib = require(`../../../versions/${module}@${version}`).get() nativeApi = lib @@ -266,18 +269,16 @@ describe('Plugin', () => { dr_cb: true }) - nativeProducer.connect() - - await new Promise(resolve => { - nativeProducer.on('ready', resolve) + nativeProducer.connect({}, (err) => { + done() }) }) - afterEach(async () => { - await new Promise(resolve => { - nativeProducer.disconnect(resolve) - }) - }) + // afterEach((done) => { + // nativeProducer.disconnect(() => { + // done() + // }) + // }) describe('producer', () => { it('should be instrumented', async () => { @@ -335,8 +336,7 @@ describe('Plugin', () => { beforeEach((done) => { nativeConsumer = new Consumer({ 'bootstrap.servers': '127.0.0.1:9092', - 'group.id': 'test-group-native', - 'auto.offset.reset': 'latest' + 'group.id': 'test-group-native' }) nativeConsumer.connect({}, (err, d) => { @@ -344,9 +344,11 @@ describe('Plugin', () => { }) }) - afterEach(() => { + afterEach((done) => { nativeConsumer.unsubscribe() - nativeConsumer.disconnect() + nativeConsumer.disconnect(() => { + done() + }) }) function consume (consumer, producer, topic, message) { @@ -459,7 +461,7 @@ describe('Plugin', () => { tracer.use('@confluentinc/kafka-javascript', { dsmEnabled: true }) messages = [{ key: 'key1', value: 'test2' }] consumer = kafka.consumer({ - kafkaJS: { groupId: 'test-group', autoCommit: false } + kafkaJS: { groupId: 'test-group' } }) await consumer.connect() await consumer.subscribe({ topic: testTopic }) @@ -483,7 +485,6 @@ describe('Plugin', () => { afterEach(async () => { setDataStreamsContextSpy.restore() - await consumer.disconnect() }) it('Should set a checkpoint on produce', async () => { From 4d8ec5966aaa4c0501ab4afc912b3e849dc0d838 Mon Sep 17 00:00:00 2001 From: William Conti Date: Wed, 14 May 2025 08:21:52 -0400 Subject: [PATCH 14/21] other changes --- .../test/index.spec.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js index db6a01835ab..8f6240b2fe3 100644 --- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js +++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js @@ -336,7 +336,7 @@ describe('Plugin', () => { beforeEach((done) => { nativeConsumer = new Consumer({ 'bootstrap.servers': '127.0.0.1:9092', - 'group.id': 'test-group-native' + 'group.id': 'test-group' }) nativeConsumer.connect({}, (err, d) => { @@ -354,11 +354,11 @@ describe('Plugin', () => { function consume (consumer, producer, topic, message) { consumer.consume(1, function (err, messages) { if (err && err.code === -185) { - setTimeout(() => consume(consumer, producer, topic, message), 10) + setTimeout(() => consume(consumer, producer, topic, message), 100) return } else if (!messages || messages.length === 0 || (err && err.code === -191)) { producer.produce(topic, null, message, null) - setTimeout(() => consume(consumer, producer, topic, message), 10) + setTimeout(() => consume(consumer, producer, topic, message), 100) return } @@ -461,7 +461,7 @@ describe('Plugin', () => { tracer.use('@confluentinc/kafka-javascript', { dsmEnabled: true }) messages = [{ key: 'key1', value: 'test2' }] consumer = kafka.consumer({ - kafkaJS: { groupId: 'test-group' } + kafkaJS: { groupId: 'test-group', fromBeginning: false } }) await consumer.connect() await consumer.subscribe({ topic: testTopic }) From d08113a5700bdf96a4c0d38a1cc75f72ae1887a3 Mon Sep 17 00:00:00 2001 From: William Conti Date: Wed, 14 May 2025 08:28:14 -0400 Subject: [PATCH 15/21] fix flaky tests --- .../test/index.spec.js | 92 +++++++++---------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js index eecce58c053..1611a0cd0c8 100644 --- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js +++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js @@ -252,8 +252,11 @@ describe('Plugin', () => { let Consumer beforeEach(async () => { - tracer = require('../../dd-trace') await agent.load('@confluentinc/kafka-javascript') + }) + + beforeEach((done) => { + tracer = require('../../dd-trace') const lib = require(`../../../versions/${module}@${version}`).get() nativeApi = lib @@ -266,16 +269,8 @@ describe('Plugin', () => { dr_cb: true }) - nativeProducer.connect() - - await new Promise(resolve => { - nativeProducer.on('ready', resolve) - }) - }) - - afterEach(async () => { - await new Promise(resolve => { - nativeProducer.disconnect(resolve) + nativeProducer.connect({}, (err) => { + done() }) }) @@ -332,23 +327,47 @@ describe('Plugin', () => { }) describe('consumer', () => { - beforeEach(() => { + beforeEach((done) => { nativeConsumer = new Consumer({ 'bootstrap.servers': '127.0.0.1:9092', - 'group.id': 'test-group-native' + 'group.id': 'test-group' }) - nativeConsumer.on('ready', () => { - nativeConsumer.subscribe([testTopic]) + nativeConsumer.connect({}, (err, d) => { + done() }) - - nativeConsumer.connect() }) - afterEach(() => { - nativeConsumer.disconnect() + afterEach((done) => { + nativeConsumer.unsubscribe() + nativeConsumer.disconnect(() => { + done() + }) }) + function consume (consumer, producer, topic, message) { + consumer.consume(1, function (err, messages) { + if (err && err.code === -185) { + setTimeout(() => consume(consumer, producer, topic, message), 100) + return + } else if (!messages || messages.length === 0 || (err && err.code === -191)) { + producer.produce(topic, null, message, null) + setTimeout(() => consume(consumer, producer, topic, message), 100) + return + } + + const consumedMessage = messages[0] + + if (consumedMessage.value.toString() !== message.toString()) { + setTimeout(() => consume(consumer, producer, topic, message), 100) + return + } + + consumer.unsubscribe() + }) + producer.produce(topic, null, message, null) + } + it('should be instrumented', async () => { const expectedSpanPromise = expectSpanWithDefaults({ name: expectedSchema.receive.opName, @@ -363,22 +382,13 @@ describe('Plugin', () => { type: 'worker' }) + nativeConsumer.setDefaultConsumeTimeout(100) + nativeConsumer.subscribe([testTopic]) + // Send a test message using the producer const message = Buffer.from('test message for native consumer') - const key = 'native-consumer-key' - let consumePromise - nativeConsumer.on('ready', () => { - // Consume messages - consumePromise = new Promise(resolve => { - nativeConsumer.consume(1, (err, messages) => { - resolve() - }) - nativeProducer.produce(testTopic, null, message, key) - }) - }) - - await consumePromise + consume(nativeConsumer, nativeProducer, testTopic, message) return expectedSpanPromise }) @@ -395,23 +405,13 @@ describe('Plugin', () => { expect(parseInt(span.parent_id.toString())).to.be.gt(0) }, { timeoutMs: 10000 }) + nativeConsumer.setDefaultConsumeTimeout(100) + nativeConsumer.subscribe([testTopic]) // Send a test message using the producer - const message = Buffer.from('test message for native consumer') - const key = 'native-consumer-key' - - let consumePromise - nativeConsumer.on('ready', () => { - // Consume messages - consumePromise = new Promise(resolve => { - nativeConsumer.consume(1, (err, messages) => { - resolve() - }) - nativeProducer.produce(testTopic, null, message, key) - }) - }) + const message = Buffer.from('test message propagation for native consumer 1') - await consumePromise + consume(nativeConsumer, nativeProducer, testTopic, message) return expectedSpanPromise }) From fdd6300a810083244c41f4f3e27554c4b5364a5c Mon Sep 17 00:00:00 2001 From: William Conti Date: Wed, 14 May 2025 09:31:15 -0400 Subject: [PATCH 16/21] fix test flakiness --- .../test/index.spec.js | 59 ++++++++++++------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js index 1611a0cd0c8..49900091dda 100644 --- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js +++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js @@ -274,6 +274,16 @@ describe('Plugin', () => { }) }) + afterEach((done) => { + try { + nativeProducer.disconnect(() => { + done() + }) + } catch (err) { + done(err) + } + }) + describe('producer', () => { it('should be instrumented', async () => { const expectedSpanPromise = expectSpanWithDefaults({ @@ -346,26 +356,31 @@ describe('Plugin', () => { }) function consume (consumer, producer, topic, message) { - consumer.consume(1, function (err, messages) { - if (err && err.code === -185) { - setTimeout(() => consume(consumer, producer, topic, message), 100) - return - } else if (!messages || messages.length === 0 || (err && err.code === -191)) { - producer.produce(topic, null, message, null) - setTimeout(() => consume(consumer, producer, topic, message), 100) - return - } - - const consumedMessage = messages[0] - - if (consumedMessage.value.toString() !== message.toString()) { - setTimeout(() => consume(consumer, producer, topic, message), 100) - return + return new Promise((resolve, reject) => { + function doConsume () { + consumer.consume(1, function (err, messages) { + if (err && err.code === -185) { + setTimeout(() => doConsume(), 20) + return + } else if (!messages || messages.length === 0 || (err && err.code === -191)) { + setTimeout(() => doConsume(), 20) + return + } + + const consumedMessage = messages[0] + + if (consumedMessage.value.toString() !== message.toString()) { + setTimeout(() => doConsume(), 20) + return + } + + consumer.unsubscribe() + resolve() + }) } - - consumer.unsubscribe() + doConsume() + producer.produce(topic, null, message, 'native-consumer-key') }) - producer.produce(topic, null, message, null) } it('should be instrumented', async () => { @@ -382,13 +397,13 @@ describe('Plugin', () => { type: 'worker' }) - nativeConsumer.setDefaultConsumeTimeout(100) + nativeConsumer.setDefaultConsumeTimeout(10) nativeConsumer.subscribe([testTopic]) // Send a test message using the producer const message = Buffer.from('test message for native consumer') - consume(nativeConsumer, nativeProducer, testTopic, message) + await consume(nativeConsumer, nativeProducer, testTopic, message) return expectedSpanPromise }) @@ -405,13 +420,13 @@ describe('Plugin', () => { expect(parseInt(span.parent_id.toString())).to.be.gt(0) }, { timeoutMs: 10000 }) - nativeConsumer.setDefaultConsumeTimeout(100) + nativeConsumer.setDefaultConsumeTimeout(10) nativeConsumer.subscribe([testTopic]) // Send a test message using the producer const message = Buffer.from('test message propagation for native consumer 1') - consume(nativeConsumer, nativeProducer, testTopic, message) + await consume(nativeConsumer, nativeProducer, testTopic, message) return expectedSpanPromise }) From c12273588a37cfcb73c4c136c650564d0b6d112d Mon Sep 17 00:00:00 2001 From: William Conti Date: Wed, 14 May 2025 10:50:15 -0400 Subject: [PATCH 17/21] fix plugins to run --- .github/workflows/plugins.yml | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/.github/workflows/plugins.yml b/.github/workflows/plugins.yml index 23acd9296cd..89a475d9101 100644 --- a/.github/workflows/plugins.yml +++ b/.github/workflows/plugins.yml @@ -257,7 +257,15 @@ jobs: strategy: matrix: # using node versions matrix since this plugin testing fails due to install differences between node versions - node-version: ['18', '20', '22', 'latest'] + include: + - node-version: 'latest' + node-opts: --no-async-context-frame + - node-version: '22' + node-opts: '' + - node-version: '20' + node-opts: '' + - node-version: '18' + node-opts: '' runs-on: ubuntu-latest services: kafka: From bffbf38fc625e14d00cae8c41f6aed2e4bbdb763 Mon Sep 17 00:00:00 2001 From: William Conti Date: Wed, 14 May 2025 11:14:20 -0400 Subject: [PATCH 18/21] fix final failing test --- packages/datadog-plugin-kafkajs/test/index.spec.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js index da0f9d1b18f..34fdb0785a2 100644 --- a/packages/datadog-plugin-kafkajs/test/index.spec.js +++ b/packages/datadog-plugin-kafkajs/test/index.spec.js @@ -2,6 +2,7 @@ const { expect } = require('chai') const semver = require('semver') +const { storage } = require('../../datadog-core') const dc = require('dc-polyfill') const agent = require('../../dd-trace/test/plugins/agent') const { expectSomeSpan, withDefaults } = require('../../dd-trace/test/plugins/helpers') @@ -294,7 +295,9 @@ describe('Plugin', () => { const afterStart = dc.channel('dd-trace:kafkajs:consumer:afterStart') const spy = sinon.spy(() => { - expect(tracer.scope().active()).to.not.be.null + const store = storage('legacy').getStore() + expect(store).to.not.be.null + expect(store).to.not.be.undefined afterStart.unsubscribe(spy) }) afterStart.subscribe(spy) From 30b1825575cbe361021791a9f528b5ced6adeac8 Mon Sep 17 00:00:00 2001 From: William Conti Date: Wed, 14 May 2025 13:47:58 -0400 Subject: [PATCH 19/21] fix producer commit --- packages/datadog-plugin-kafkajs/src/producer.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/datadog-plugin-kafkajs/src/producer.js b/packages/datadog-plugin-kafkajs/src/producer.js index 709f670802c..f61cc3ae4c5 100644 --- a/packages/datadog-plugin-kafkajs/src/producer.js +++ b/packages/datadog-plugin-kafkajs/src/producer.js @@ -52,7 +52,9 @@ class KafkajsProducerPlugin extends ProducerPlugin { * @param {ProducerResponseItem[]} commitList * @returns {void} */ - commit (commitList) { + commit (ctx) { + const commitList = ctx.res + if (!this.config.dsmEnabled) return if (!commitList || !Array.isArray(commitList)) return const keys = [ From b5afcc3bc8bce8782312d10b232c60d9c0f00884 Mon Sep 17 00:00:00 2001 From: William Conti Date: Wed, 14 May 2025 14:09:41 -0400 Subject: [PATCH 20/21] another fix --- .../src/confluentinc-kafka-javascript.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js index 3d0b44c7c9e..5513c3ae583 100644 --- a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js +++ b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js @@ -66,7 +66,8 @@ function instrumentBaseModule (module) { headers = convertHeaders(ctx.messages[0].headers) const result = produce.apply(this, [topic, partition, message, key, timestamp, opaque, headers]) - channels.producerCommit.publish(result) + ctx.res = result + channels.producerCommit.publish(ctx) channels.producerFinish.runStores(ctx, () => {}) return result } catch (error) { @@ -213,7 +214,8 @@ function instrumentKafkaJS (kafkaJS) { const result = send.apply(this, arguments) result.then((res) => { - channels.producerCommit.publish(res) + ctx.res = res + channels.producerCommit.publish(ctx) channels.producerFinish.publish(undefined) }, (err) => { if (err) { From bc6abe5ea8f10188ebc8201ac8a5b4a7d4b6a8f0 Mon Sep 17 00:00:00 2001 From: William Conti Date: Tue, 27 May 2025 12:55:45 -0400 Subject: [PATCH 21/21] fix headers --- packages/datadog-instrumentations/src/kafkajs.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/datadog-instrumentations/src/kafkajs.js b/packages/datadog-instrumentations/src/kafkajs.js index 5e1c2e32931..2e3cab3dfdc 100644 --- a/packages/datadog-instrumentations/src/kafkajs.js +++ b/packages/datadog-instrumentations/src/kafkajs.js @@ -62,16 +62,16 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf const ctx = {} ctx.bootstrapServers = bootstrapServers ctx.clusterId = clusterId + ctx.disableHeaderInjection = disabledHeaderWeakSet.has(producer) const { topic, messages = [] } = arguments[0] for (const message of messages) { - if (message !== null && typeof message === 'object') { + if (message !== null && typeof message === 'object' && !ctx.disableHeaderInjection) { message.headers = message.headers || {} } } ctx.topic = topic ctx.messages = messages - ctx.disableHeaderInjection = disabledHeaderWeakSet.has(producer) return producerStartCh.runStores(ctx, () => { try {