diff --git a/.github/workflows/appsec.yml b/.github/workflows/appsec.yml
index f5b83a39549..a737f8e178d 100644
--- a/.github/workflows/appsec.yml
+++ b/.github/workflows/appsec.yml
@@ -302,3 +302,35 @@ jobs:
     - uses: ./.github/actions/node/active-lts
     - run: yarn test:appsec:plugins:ci
     - uses: codecov/codecov-action@ad3126e916f78f00edff4ed0317cf185271ccc2d # v5.4.2
+
+  kafka:
+    runs-on: ubuntu-latest
+    services:
+      kafka:
+        image: apache/kafka-native:3.8.0-rc2
+        env:
+          KAFKA_PROCESS_ROLES: broker,controller
+          KAFKA_NODE_ID: '1'
+          KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
+          KAFKA_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9093
+          KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
+          KAFKA_CLUSTER_ID: r4zt_wrqTRuT7W2NJsB_GA
+          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
+          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
+          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
+          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0'
+        ports:
+          - 9092:9092
+          - 9093:9093
+    env:
+      PLUGINS: kafkajs
+      SERVICES: kafka
+    steps:
+      - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+      - uses: ./.github/actions/node/oldest-maintenance-lts
+      - uses: ./.github/actions/install
+      - run: yarn test:appsec:plugins:ci
+      - uses: ./.github/actions/node/active-lts
+      - run: yarn test:appsec:plugins:ci
+      - uses: codecov/codecov-action@ad3126e916f78f00edff4ed0317cf185271ccc2d # v5.4.2
diff --git a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js
index 42c084ce592..7d269377f4d 100644
--- a/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js
+++ b/packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js
@@ -2,8 +2,7 @@
 
 const {
   addHook,
-  channel,
-  AsyncResource
+  channel
 } = require('./helpers/instrument')
 const shimmer = require('../../datadog-shimmer')
@@ -32,7 +31,6 @@ const disabledHeaderWeakSet = new WeakSet()
 // we need to store the offset per partition per topic for the consumer to track offsets for DSM
 const latestConsumerOffsets = new Map()
 
-// Customize the instrumentation for Confluent Kafka JavaScript
 addHook({ name: '@confluentinc/kafka-javascript', versions: ['>=1.0.0'] }, (module) => {
   // Hook native module classes first
   instrumentBaseModule(module)
@@ -62,23 +60,25 @@ function instrumentBaseModule (module) {
 
         const brokers = this.globalConfig?.['bootstrap.servers']
 
-        const asyncResource = new AsyncResource('bound-anonymous-fn')
-        return asyncResource.runInAsyncScope(() => {
-          try {
-            channels.producerStart.publish({
-              topic,
-              messages: [{ key, value: message }],
-              bootstrapServers: brokers
-            })
+        const ctx = {
+          topic,
+          messages: [{ key, value: message }],
+          bootstrapServers: brokers
+        }
 
-            const result = produce.apply(this, arguments)
+        return channels.producerStart.runStores(ctx, () => {
+          try {
+            const headers = convertHeaders(ctx.messages[0].headers)
+            const result = produce.apply(this, [topic, partition, message, key, timestamp, opaque, headers])
 
-            channels.producerCommit.publish()
-            channels.producerFinish.publish()
+            ctx.result = result
+            channels.producerCommit.publish(ctx)
+            channels.producerFinish.publish(ctx)
             return result
           } catch (error) {
-            channels.producerError.publish(error)
-            channels.producerFinish.publish()
+            ctx.error = error
+            channels.producerError.publish(ctx)
+            channels.producerFinish.publish(ctx)
             throw error
           }
         })
@@ -110,32 +110,39 @@ function instrumentBaseModule (module) {
         callback = numMessages
       }
 
+      const ctx = {
+        groupId
+      }
       // Handle callback-based consumption
       if (typeof callback === 'function') {
        return consume.call(this, numMessages, function wrappedCallback (err, messages) {
          if (messages && messages.length > 0) {
            messages.forEach(message => {
-             channels.consumerStart.publish({
-               topic: message?.topic,
-               partition: message?.partition,
-               message,
-               groupId
-             })
+             ctx.topic = message?.topic
+             ctx.partition = message?.partition
+             ctx.message = message
+
+             // TODO: We should be using publish here instead of runStores but we need bindStart to be called
+             channels.consumerStart.runStores(ctx, () => {})
              updateLatestOffset(message?.topic, message?.partition, message?.offset, groupId)
            })
          }
 
          if (err) {
-           channels.consumerError.publish(err)
+           ctx.error = err
+           channels.consumerError.publish(ctx)
          }
 
          try {
            const result = callback.apply(this, arguments)
-           channels.consumerFinish.publish()
+           if (messages && messages.length > 0) {
+             channels.consumerFinish.publish(ctx)
+           }
            return result
          } catch (error) {
-           channels.consumerError.publish(error)
-           channels.consumerFinish.publish()
+           ctx.error = error
+           channels.consumerError.publish(ctx)
+           channels.consumerFinish.publish(ctx)
            throw error
          }
        })
@@ -204,45 +211,44 @@ function instrumentKafkaJS (kafkaJS) {
           return send.apply(this, arguments)
         }
 
-        const asyncResource = new AsyncResource('bound-anonymous-fn')
-        return asyncResource.runInAsyncScope(() => {
-          try {
-            channels.producerStart.publish({
-              topic: payload?.topic,
-              messages: payload?.messages || [],
-              bootstrapServers: kafka._ddBrokers,
-              disableHeaderInjection: disabledHeaderWeakSet.has(producer)
-            })
+        const ctx = {
+          topic: payload?.topic,
+          messages: payload?.messages || [],
+          bootstrapServers: kafka._ddBrokers,
+          disableHeaderInjection: disabledHeaderWeakSet.has(producer)
+        }
 
+        return channels.producerStart.runStores(ctx, () => {
+          try {
             const result = send.apply(this, arguments)
 
-            result.then(
-              asyncResource.bind(res => {
-                channels.producerCommit.publish(res)
-                channels.producerFinish.publish()
-              }),
-              asyncResource.bind(err => {
-                if (err) {
-                  // Fixes bug where we would inject message headers for kafka brokers
-                  // that don't support headers (version <0.11). On the error, we disable
-                  // header injection. Tnfortunately the error name / type is not more specific.
-                  // This approach is implemented by other tracers as well.
-                  if (err.name === 'KafkaJSError' && err.type === 'ERR_UNKNOWN') {
-                    disabledHeaderWeakSet.add(producer)
-                    log.error('Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). ' +
-                      'Please look at broker logs for more information. ' +
-                      'Tracer message header injection for Kafka is disabled.')
-                  }
-                  channels.producerError.publish(err)
-                }
-                channels.producerFinish.publish()
-              })
-            )
+            result.then((res) => {
+              ctx.result = res
+              channels.producerCommit.publish(ctx)
+              channels.producerFinish.publish(ctx)
+            }, (err) => {
+              if (err) {
+                // Fixes bug where we would inject message headers for kafka brokers
+                // that don't support headers (version <0.11). On the error, we disable
+                // header injection. Unfortunately the error name / type is not more specific.
+                // This approach is implemented by other tracers as well.
+                if (err.name === 'KafkaJSError' && err.type === 'ERR_UNKNOWN') {
+                  disabledHeaderWeakSet.add(producer)
+                  log.error('Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). ' +
+                    'Please look at broker logs for more information. ' +
+                    'Tracer message header injection for Kafka is disabled.')
+                }
+                ctx.error = err
+                channels.producerError.publish(ctx)
+              }
+              channels.producerFinish.publish(ctx)
+            })
 
             return result
           } catch (e) {
-            channels.producerError.publish(e)
-            channels.producerFinish.publish()
+            ctx.error = e
+            channels.producerError.publish(ctx)
+            channels.producerFinish.publish(ctx)
             throw e
           }
         })
@@ -350,10 +356,11 @@ function wrapKafkaCallback (callback, { startCh, commitCh, finishCh, errorCh },
   return function wrappedKafkaCallback (payload) {
     const commitPayload = getPayload(payload)
 
-    const asyncResource = new AsyncResource('bound-anonymous-fn')
-    return asyncResource.runInAsyncScope(() => {
-      startCh.publish(commitPayload)
+    const ctx = {
+      extractedArgs: commitPayload
+    }
 
+    return startCh.runStores(ctx, () => {
       updateLatestOffset(commitPayload?.topic, commitPayload?.partition, commitPayload?.offset, commitPayload?.groupId)
 
       try {
@@ -361,22 +368,25 @@ function wrapKafkaCallback (callback, { startCh, commitCh, finishCh, errorCh },
         if (result && typeof result.then === 'function') {
           return result
-            .then(asyncResource.bind(res => {
-              finishCh.publish()
+            .then((res) => {
+              ctx.result = res
+              finishCh.publish(ctx)
               return res
-            }))
-            .catch(asyncResource.bind(err => {
-              errorCh.publish(err)
-              finishCh.publish()
+            })
+            .catch((err) => {
+              ctx.error = err
+              errorCh.publish(ctx)
+              finishCh.publish(ctx)
               throw err
-            }))
+            })
         } else {
-          finishCh.publish()
+          finishCh.publish(ctx)
           return result
         }
       } catch (error) {
-        errorCh.publish(error)
-        finishCh.publish()
+        ctx.error = error
+        errorCh.publish(ctx)
+        finishCh.publish(ctx)
        throw error
       }
     })
@@ -404,3 +414,10 @@ function updateLatestOffset (topic, partition, offset, groupId) {
 function getLatestOffsets () {
   return [...latestConsumerOffsets.values()]
 }
+
+function convertHeaders (headers) {
+  // Convert a headers object into an array of single-pair objects ({ key: value }), one per header.
+  // Messages may carry no headers at all (e.g. when no subscriber injected any), so guard first.
+  if (!headers) return headers
+  return Object.entries(headers).map(([key, value]) => ({ [key.toString()]: value.toString() }))
+}
diff --git a/packages/datadog-instrumentations/src/kafkajs.js b/packages/datadog-instrumentations/src/kafkajs.js
index dbdf148bfec..430e1e9d0ca 100644
--- a/packages/datadog-instrumentations/src/kafkajs.js
+++ b/packages/datadog-instrumentations/src/kafkajs.js
@@ -2,8 +2,7 @@
 
 const {
   channel,
-  addHook,
-  AsyncResource
+  addHook
 } = require('./helpers/instrument')
 const shimmer = require('../../datadog-shimmer')
@@ -60,29 +59,33 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
     producer.send = function () {
       const wrappedSend = (clusterId) => {
-        const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
+        const { topic, messages = [] } = arguments[0]
+
+        const ctx = {
+          bootstrapServers,
+          clusterId,
+          disableHeaderInjection: disabledHeaderWeakSet.has(producer),
+          messages,
+          topic
+        }
 
-        return innerAsyncResource.runInAsyncScope(() => {
-          if (!producerStartCh.hasSubscribers) {
-            return send.apply(this, arguments)
+        for (const message of messages) {
+          if (message !== null && typeof message === 'object' && !ctx.disableHeaderInjection) {
+            message.headers = message.headers || {}
           }
+        }
 
+        return producerStartCh.runStores(ctx, () => {
           try {
-            const { topic, messages = [] } = arguments[0]
-            producerStartCh.publish({
-              topic,
-              messages,
-              bootstrapServers,
-              clusterId,
-              disableHeaderInjection: disabledHeaderWeakSet.has(producer)
-            })
             const result = send.apply(this, arguments)
 
             result.then(
-              innerAsyncResource.bind(res => {
-                producerFinishCh.publish()
-                producerCommitCh.publish(res)
-              }),
-              innerAsyncResource.bind(err => {
+              (res) => {
+                ctx.result = res
+                producerFinishCh.publish(ctx)
+                producerCommitCh.publish(ctx)
+              },
+              (err) => {
+                ctx.error = err
                 if (err) {
                   // Fixes bug where we would inject message headers for kafka brokers that don't support headers
                   // (version <0.11). On the error, we disable header injection.
@@ -96,14 +99,14 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
                   }
                   producerErrorCh.publish(err)
                 }
-                producerFinishCh.publish()
+                producerFinishCh.publish(ctx)
               })
-            )
 
            return result
          } catch (e) {
-           producerErrorCh.publish(e)
-           producerFinishCh.publish()
+           ctx.error = e
+           producerErrorCh.publish(ctx)
+           producerFinishCh.publish(ctx)
           throw e
         }
       })
@@ -188,30 +191,35 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
     const wrappedCallback = (fn, startCh, finishCh, errorCh, extractArgs, clusterId) => {
       return typeof fn === 'function'
         ? function (...args) {
-          const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
-          return innerAsyncResource.runInAsyncScope(() => {
-            const extractedArgs = extractArgs(args, clusterId)
+          const extractedArgs = extractArgs(args, clusterId)
+          const ctx = {
+            extractedArgs
+          }
 
-            startCh.publish(extractedArgs)
+          return startCh.runStores(ctx, () => {
             try {
               const result = fn.apply(this, args)
               if (result && typeof result.then === 'function') {
                 result.then(
-                  innerAsyncResource.bind(() => finishCh.publish()),
-                  innerAsyncResource.bind(err => {
+                  (res) => {
+                    ctx.result = res
+                    finishCh.publish(ctx)
+                  },
+                  (err) => {
+                    ctx.error = err
                     if (err) {
-                      errorCh.publish(err)
+                      errorCh.publish(ctx)
                     }
-                    finishCh.publish()
+                    finishCh.publish(ctx)
                   })
-                )
               } else {
-                finishCh.publish()
+                finishCh.publish(ctx)
               }
               return result
             } catch (e) {
-              errorCh.publish(e)
-              finishCh.publish()
+              ctx.error = e
+              errorCh.publish(ctx)
+              finishCh.publish(ctx)
               throw e
             }
           })
diff --git a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js
index 75168ebfb26..3ad6b675ffe 100644
--- a/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js
+++ b/packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js
@@ -491,7 +491,7 @@ describe('Plugin', () => {
           tracer.use('@confluentinc/kafka-javascript', { dsmEnabled: true })
           messages = [{ key: 'key1', value: 'test2' }]
           consumer = kafka.consumer({
-            kafkaJS: { groupId: 'test-group', autoCommit: false }
+            kafkaJS: { groupId: 'test-group', fromBeginning: false }
           })
           await consumer.connect()
           await consumer.subscribe({ topic: testTopic })
@@ -515,7 +515,6 @@ describe('Plugin', () => {
 
         afterEach(async () => {
           setDataStreamsContextSpy.restore()
-          await consumer.disconnect()
         })
 
         it('Should set a checkpoint on produce', async () => {
diff --git a/packages/datadog-plugin-kafkajs/src/batch-consumer.js b/packages/datadog-plugin-kafkajs/src/batch-consumer.js
index 4c701fac01e..74c3caa1451 100644
--- a/packages/datadog-plugin-kafkajs/src/batch-consumer.js
+++ b/packages/datadog-plugin-kafkajs/src/batch-consumer.js
@@ -6,7 +6,9 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
   static get id () { return 'kafkajs' }
   static get operation () { return 'consume-batch' }
 
-  start ({ topic, partition, messages, groupId, clusterId }) {
+  start (ctx) {
+    const { topic, messages, groupId, clusterId } = ctx.extractedArgs || ctx
+
     if (!this.config.dsmEnabled) return
     for (const message of messages) {
       if (!message || !message.headers) continue
diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js
index 836e0edcfd9..b8c758da018 100644
--- a/packages/datadog-plugin-kafkajs/src/consumer.js
+++ b/packages/datadog-plugin-kafkajs/src/consumer.js
@@ -64,7 +64,9 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
     }
   }
 
-  start ({ topic, partition, message, groupId, clusterId }) {
+  bindStart (ctx) {
+    const { topic, partition, message, groupId, clusterId } = ctx.extractedArgs || ctx
+
     let childOf
     const headers = convertToTextMap(message?.headers)
     if (headers) {
@@ -83,7 +85,7 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
       metrics: {
         'kafka.partition': partition
       }
-    })
+    }, ctx)
     if (message?.offset) span.setTag('kafka.message.offset', message?.offset)
 
     if (this.config.dsmEnabled && headers) {
@@ -97,16 +99,18 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
     }
 
     if (afterStartCh.hasSubscribers) {
-      afterStartCh.publish({ topic, partition, message, groupId })
+      afterStartCh.publish({ topic, partition, message, groupId, currentStore: ctx.currentStore })
     }
+
+    return ctx.currentStore
   }
 
-  finish () {
+  finish (ctx) {
     if (beforeFinishCh.hasSubscribers) {
       beforeFinishCh.publish()
     }
 
-    super.finish()
+    super.finish(ctx)
   }
 }
diff --git a/packages/datadog-plugin-kafkajs/src/producer.js b/packages/datadog-plugin-kafkajs/src/producer.js
index 912886b0f1b..1f990f812c6 100644
--- a/packages/datadog-plugin-kafkajs/src/producer.js
+++ b/packages/datadog-plugin-kafkajs/src/producer.js
@@ -52,7 +52,9 @@ class KafkajsProducerPlugin extends ProducerPlugin {
    * @param {ProducerResponseItem[]} commitList
    * @returns {void}
    */
-  commit (commitList) {
+  commit (ctx) {
+    const commitList = ctx.result
+
     if (!this.config.dsmEnabled) return
     if (!commitList || !Array.isArray(commitList)) return
     const keys = [
@@ -67,7 +69,8 @@ class KafkajsProducerPlugin extends ProducerPlugin {
     }
   }
 
-  start ({ topic, messages, bootstrapServers, clusterId, disableHeaderInjection }) {
+  bindStart (ctx) {
+    const { topic, messages, bootstrapServers, clusterId, disableHeaderInjection } = ctx
     const span = this.startSpan({
       resource: topic,
       meta: {
@@ -79,7 +82,7 @@ class KafkajsProducerPlugin extends ProducerPlugin {
       metrics: {
         'kafka.batch_size': messages.length
      }
-    })
+    }, ctx)
     if (bootstrapServers) {
       span.setTag(BOOTSTRAP_SERVERS_KEY, bootstrapServers)
     }
@@ -105,6 +108,8 @@ class KafkajsProducerPlugin extends ProducerPlugin {
       }
     }
   }
+
+    return ctx.currentStore
   }
 }
diff --git a/packages/datadog-plugin-kafkajs/test/index.spec.js b/packages/datadog-plugin-kafkajs/test/index.spec.js
index 826f6204252..5454d125991 100644
--- a/packages/datadog-plugin-kafkajs/test/index.spec.js
+++ b/packages/datadog-plugin-kafkajs/test/index.spec.js
@@ -352,8 +352,8 @@ describe('Plugin', () => {
         it('should publish on afterStart channel', (done) => {
           const afterStart = dc.channel('dd-trace:kafkajs:consumer:afterStart')
 
-          const spy = sinon.spy(() => {
-            expect(tracer.scope().active()).to.not.be.null
+          const spy = sinon.spy((ctx) => {
+            expect(ctx.currentStore.span).to.not.be.null
             afterStart.unsubscribe(spy)
           })
           afterStart.subscribe(spy)
diff --git a/packages/dd-trace/src/appsec/iast/context/context-plugin.js b/packages/dd-trace/src/appsec/iast/context/context-plugin.js
index d65b68258ae..de0c7672af0 100644
--- a/packages/dd-trace/src/appsec/iast/context/context-plugin.js
+++ b/packages/dd-trace/src/appsec/iast/context/context-plugin.js
@@ -11,7 +11,7 @@ const { TagKey } = require('../telemetry/iast-metric')
 
 class IastContextPlugin extends IastPlugin {
   startCtxOn (channelName, tag) {
-    super.addSub(channelName, (message) => this.startContext())
+    super.addSub(channelName, (message) => this.startContext(message?.currentStore))
 
     this._getAndRegisterSubscription({
       channelName,
@@ -44,11 +44,10 @@ class IastContextPlugin extends IastPlugin {
     }
   }
 
-  startContext () {
+  startContext (store = storage('legacy').getStore()) {
     let isRequestAcquired = false
     let iastContext
 
-    const store = storage('legacy').getStore()
     if (store) {
       const topContext = this.getTopContext()
       const rootSpan = this.getRootSpan(store)
diff --git a/packages/dd-trace/src/appsec/iast/taint-tracking/plugins/kafka.js b/packages/dd-trace/src/appsec/iast/taint-tracking/plugins/kafka.js
index ac95722a996..033eb42ebcb 100644
--- a/packages/dd-trace/src/appsec/iast/taint-tracking/plugins/kafka.js
+++ b/packages/dd-trace/src/appsec/iast/taint-tracking/plugins/kafka.js
@@ -1,7 +1,6 @@
 'use strict'
 
 const shimmer = require('../../../../../../datadog-shimmer')
-const { storage } = require('../../../../../../datadog-core')
 const { getIastContext } = require('../../iast-context')
 const { KAFKA_MESSAGE_KEY, KAFKA_MESSAGE_VALUE } = require('../source-types')
 const { newTaintedObject, newTaintedString } = require('../operations')
@@ -10,7 +9,7 @@ const { SourceIastPlugin } = require('../../iast-plugin')
 class KafkaConsumerIastPlugin extends SourceIastPlugin {
   onConfigure () {
     this.addSub({ channelName: 'dd-trace:kafkajs:consumer:afterStart', tag: [KAFKA_MESSAGE_KEY, KAFKA_MESSAGE_VALUE] },
-      ({ message }) => this.taintKafkaMessage(message)
+      ({ message, currentStore }) => this.taintKafkaMessage(message, currentStore)
     )
   }
 
@@ -21,8 +20,8 @@ class KafkaConsumerIastPlugin extends SourceIastPlugin {
     }
   }
 
-  taintKafkaMessage (message) {
-    const iastContext = getIastContext(storage('legacy').getStore())
+  taintKafkaMessage (message, currentStore) {
+    const iastContext = getIastContext(currentStore)
 
     if (iastContext && message) {
       const { key, value } = message
diff --git a/packages/dd-trace/src/plugins/outbound.js b/packages/dd-trace/src/plugins/outbound.js
index 1c3071f895d..d263a7021c2 100644
--- a/packages/dd-trace/src/plugins/outbound.js
+++ b/packages/dd-trace/src/plugins/outbound.js
@@ -26,6 +26,10 @@ class OutboundPlugin extends TracingPlugin {
     })
   }
 
+  bindFinish (ctx) {
+    return ctx.parentStore
+  }
+
   startSpan (...args) {
     const span = super.startSpan(...args)
     if (
@@ -86,10 +90,6 @@ class OutboundPlugin extends TracingPlugin {
     return peerData
   }
 
-  bindFinish (ctx) {
-    return ctx.parentStore
-  }
-
   finish (ctx) {
     const span = ctx?.currentStore?.span || this.activeSpan
     this.tagPeerService(span)
diff --git a/packages/dd-trace/test/appsec/iast/context/context-plugin.spec.js b/packages/dd-trace/test/appsec/iast/context/context-plugin.spec.js
index db5f76987e3..4e00128328f 100644
--- a/packages/dd-trace/test/appsec/iast/context/context-plugin.spec.js
+++ b/packages/dd-trace/test/appsec/iast/context/context-plugin.spec.js
@@ -125,26 +125,24 @@ describe('IastContextPlugin', () => {
     })
 
     it('should obtain needed info from data before starting iast context', () => {
-      const data = {}
-
       sinon.stub(plugin, 'getTopContext').returns(topContext)
       sinon.stub(plugin, 'getRootSpan').returns(rootSpan)
 
-      plugin.startContext(data)
+      plugin.startContext()
 
       expect(plugin.getTopContext).to.be.calledOnce
       expect(plugin.getRootSpan).to.be.calledWith(store)
     })
 
     it('should call overheadController before starting iast context', () => {
-      plugin.startContext({})
+      plugin.startContext()
 
      expect(acquireRequest).to.be.calledOnceWith(rootSpan)
    })
 
    it('should add _dd.iast.enabled:0 tag in the rootSpan', () => {
      const addTags = sinon.stub(rootSpan, 'addTags')
 
-      plugin.startContext({})
+      plugin.startContext()
 
       expect(addTags).to.be.calledOnceWith({ [IAST_ENABLED_TAG_KEY]: 0 })
     })
@@ -152,7 +150,7 @@ it('should not fail if store does not contain span', () => {
       getStore.returns({})
 
-      plugin.startContext({})
+      plugin.startContext()
 
       expect(acquireRequest).to.be.calledOnceWith(undefined)
     })
@@ -171,28 +169,26 @@ it('should add _dd.iast.enabled: 1 tag in the rootSpan', () => {
       const addTags = sinon.stub(rootSpan, 'addTags')
 
-      plugin.startContext({})
+      plugin.startContext()
 
       expect(addTags).to.be.calledOnceWith({ [IAST_ENABLED_TAG_KEY]: 1 })
     })
 
     it('should create and save new IAST context and store it', () => {
-      const data = {}
-
-      plugin.startContext(data)
+      plugin.startContext()
 
       expect(newIastContext).to.be.calledOnceWith(rootSpan)
       expect(saveIastContext).to.be.calledOnceWith(store, topContext, context)
     })
 
     it('should create new taint-tracking transaction', () => {
-      const data = {}
-
-      plugin.startContext(data)
+      plugin.startContext()
 
       expect(createTransaction).to.be.calledOnceWith('span-id', context)
     })
 
     it('should obtain needed info from data before starting iast context', () => {
-      plugin.startContext({})
+      plugin.startContext()
 
       expect(initializeRequestContext).to.be.calledOnceWith(context)
     })