Skip to content

tracing: remove asyncresource from kafka instrumentations #5703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .github/workflows/plugins.yml
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,15 @@ jobs:
strategy:
matrix:
# using node versions matrix since this plugin testing fails due to install differences between node versions
node-version: ['18', '20', '22']
include:
- node-version: 'latest'
node-opts: --no-async-context-frame
- node-version: '22'
node-opts: ''
- node-version: '20'
node-opts: ''
- node-version: '18'
node-opts: ''
runs-on: ubuntu-latest
services:
kafka:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

const {
addHook,
channel,
AsyncResource
channel
} = require('./helpers/instrument')
const shimmer = require('../../datadog-shimmer')

Expand Down Expand Up @@ -32,7 +31,6 @@ const disabledHeaderWeakSet = new WeakSet()
// we need to store the offset per partition per topic for the consumer to track offsets for DSM
const latestConsumerOffsets = new Map()

// Customize the instrumentation for Confluent Kafka JavaScript
addHook({ name: '@confluentinc/kafka-javascript', versions: ['>=1.0.0'] }, (module) => {
// Hook native module classes first
instrumentBaseModule(module)
Expand All @@ -55,30 +53,31 @@ function instrumentBaseModule (module) {
// Hook the produce method
if (typeof producer?.produce === 'function') {
shimmer.wrap(producer, 'produce', function wrapProduce (produce) {
return function wrappedProduce (topic, partition, message, key, timestamp, opaque) {
return function wrappedProduce (topic, partition, message, key, timestamp, opaque, headers) {
if (!channels.producerStart.hasSubscribers) {
return produce.apply(this, arguments)
}

const brokers = this.globalConfig?.['bootstrap.servers']

const asyncResource = new AsyncResource('bound-anonymous-fn')
return asyncResource.runInAsyncScope(() => {
try {
channels.producerStart.publish({
topic,
messages: [{ key, value: message }],
bootstrapServers: brokers
})
const ctx = {}
ctx.topic = topic
ctx.messages = [{ key, value: message }]
ctx.bootstrapServers = brokers

const result = produce.apply(this, arguments)
return channels.producerStart.runStores(ctx, () => {
try {
headers = convertHeaders(ctx.messages[0].headers)
const result = produce.apply(this, [topic, partition, message, key, timestamp, opaque, headers])

channels.producerCommit.publish(undefined)
channels.producerFinish.publish(undefined)
ctx.res = result
channels.producerCommit.publish(ctx)
channels.producerFinish.runStores(ctx, () => {})
return result
} catch (error) {
channels.producerError.publish(error)
channels.producerFinish.publish(undefined)
ctx.err = error
channels.producerError.publish(ctx)
channels.producerFinish.runStores(ctx, () => {})
throw error
}
})
Expand Down Expand Up @@ -110,32 +109,37 @@ function instrumentBaseModule (module) {
callback = numMessages
}

const ctx = {}
// Handle callback-based consumption
if (typeof callback === 'function') {
return consume.call(this, numMessages, function wrappedCallback (err, messages) {
if (messages && messages.length > 0) {
messages.forEach(message => {
channels.consumerStart.publish({
topic: message?.topic,
partition: message?.partition,
message,
groupId
})
ctx.topic = message?.topic
ctx.partition = message?.partition
ctx.message = message
ctx.groupId = groupId

channels.consumerStart.runStores(ctx, () => {})
updateLatestOffset(message?.topic, message?.partition, message?.offset, groupId)
})
}

if (err) {
channels.consumerError.publish(err)
ctx.err = err
channels.consumerError.publish(ctx)
}

try {
const result = callback.apply(this, arguments)
channels.consumerFinish.publish(undefined)
if (messages && messages.length > 0) {
channels.consumerFinish.runStores(ctx, () => {})
}
return result
} catch (error) {
channels.consumerError.publish(error)
channels.consumerFinish.publish(undefined)
ctx.err = error
channels.consumerError.publish(ctx)
channels.consumerFinish.runStores(ctx, () => {})
throw error
}
})
Expand Down Expand Up @@ -204,44 +208,42 @@ function instrumentKafkaJS (kafkaJS) {
return send.apply(this, arguments)
}

const asyncResource = new AsyncResource('bound-anonymous-fn')
return asyncResource.runInAsyncScope(() => {
try {
channels.producerStart.publish({
topic: payload?.topic,
messages: payload?.messages || [],
bootstrapServers: kafka._ddBrokers,
disableHeaderInjection: disabledHeaderWeakSet.has(producer)
})
const ctx = {}
ctx.topic = payload?.topic
ctx.messages = payload?.messages || []
ctx.bootstrapServers = kafka._ddBrokers
ctx.disableHeaderInjection = disabledHeaderWeakSet.has(producer)

return channels.producerStart.runStores(ctx, () => {
try {
const result = send.apply(this, arguments)

result.then(
asyncResource.bind(res => {
channels.producerCommit.publish(res)
channels.producerFinish.publish(undefined)
}),
asyncResource.bind(err => {
if (err) {
// Fixes bug where we would inject message headers for kafka brokers
// that don't support headers (version <0.11). On the error, we disable
// header injection. Unfortunately the error name / type is not more specific.
// This approach is implemented by other tracers as well.
if (err.name === 'KafkaJSError' && err.type === 'ERR_UNKNOWN') {
disabledHeaderWeakSet.add(producer)
log.error('Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). ' +
'Please look at broker logs for more information. ' +
'Tracer message header injection for Kafka is disabled.')
}
channels.producerError.publish(err)
result.then((res) => {
ctx.res = res
channels.producerCommit.publish(ctx)
channels.producerFinish.publish(undefined)
}, (err) => {
if (err) {
// Fixes bug where we would inject message headers for kafka brokers
// that don't support headers (version <0.11). On the error, we disable
// header injection. Unfortunately the error name / type is not more specific.
// This approach is implemented by other tracers as well.
if (err.name === 'KafkaJSError' && err.type === 'ERR_UNKNOWN') {
disabledHeaderWeakSet.add(producer)
log.error('Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). ' +
'Please look at broker logs for more information. ' +
'Tracer message header injection for Kafka is disabled.')
}
channels.producerFinish.publish(undefined)
})
)
ctx.err = err
channels.producerError.publish(ctx)
}
channels.producerFinish.publish(undefined)
})

return result
} catch (e) {
channels.producerError.publish(e)
ctx.err = e
channels.producerError.publish(ctx)
channels.producerFinish.publish(undefined)
throw e
}
Expand Down Expand Up @@ -350,33 +352,36 @@ function wrapKafkaCallback (callback, { startCh, commitCh, finishCh, errorCh },
return function wrappedKafkaCallback (payload) {
const commitPayload = getPayload(payload)

const asyncResource = new AsyncResource('bound-anonymous-fn')
return asyncResource.runInAsyncScope(() => {
startCh.publish(commitPayload)
const ctx = {}
ctx.extractedArgs = commitPayload

return startCh.runStores(ctx, () => {
updateLatestOffset(commitPayload?.topic, commitPayload?.partition, commitPayload?.offset, commitPayload?.groupId)

try {
const result = callback.apply(this, arguments)

if (result && typeof result.then === 'function') {
return result
.then(asyncResource.bind(res => {
finishCh.publish(undefined)
.then((res) => {
ctx.res = res
finishCh.runStores(ctx, () => {})
return res
}))
.catch(asyncResource.bind(err => {
errorCh.publish(err)
finishCh.publish(undefined)
})
.catch((err) => {
ctx.err = err
errorCh.publish(ctx)
finishCh.runStores(ctx, () => {})
throw err
}))
})
} else {
finishCh.publish(undefined)
finishCh.runStores(ctx, () => {})
return result
}
} catch (error) {
errorCh.publish(error)
finishCh.publish(undefined)
ctx.err = error
errorCh.publish(ctx)
finishCh.runStores(ctx, () => {})
throw error
}
})
Expand Down Expand Up @@ -404,3 +409,8 @@ function updateLatestOffset (topic, partition, offset, groupId) {
// Snapshot of the most recent consumer offsets tracked for DSM,
// one entry per topic/partition pair stored in the module-level map.
function getLatestOffsets () {
  return [...latestConsumerOffsets.values()]
}

/**
 * Convert a headers object into the array-of-single-entry-objects shape that
 * the native produce() API expects, e.g. { a: '1', b: '2' } ->
 * [{ a: '1' }, { b: '2' }].
 *
 * @param {Object|null|undefined} headers - header map; values are coerced to
 *   strings via toString() (Buffers and strings both supported).
 * @returns {Array<Object>|undefined} one object per header, each with a single
 *   key/value pair, or undefined when no headers were provided.
 */
function convertHeaders (headers) {
  // Guard: the producer-start subscriber may not have attached any headers,
  // in which case Object.entries() would throw on null/undefined.
  if (headers == null) return undefined
  // convert headers from object to array of objects with 1 key and value per array entry
  return Object.entries(headers).map(([key, value]) => ({ [key.toString()]: value.toString() }))
}
75 changes: 40 additions & 35 deletions packages/datadog-instrumentations/src/kafkajs.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

const {
channel,
addHook,
AsyncResource
addHook
} = require('./helpers/instrument')
const shimmer = require('../../datadog-shimmer')

Expand Down Expand Up @@ -60,29 +59,31 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf

producer.send = function () {
const wrappedSend = (clusterId) => {
const innerAsyncResource = new AsyncResource('bound-anonymous-fn')

return innerAsyncResource.runInAsyncScope(() => {
if (!producerStartCh.hasSubscribers) {
return send.apply(this, arguments)
const ctx = {}
ctx.bootstrapServers = bootstrapServers
ctx.clusterId = clusterId

const { topic, messages = [] } = arguments[0]
for (const message of messages) {
if (message !== null && typeof message === 'object') {
message.headers = message.headers || {}
}
}
ctx.topic = topic
ctx.messages = messages
ctx.disableHeaderInjection = disabledHeaderWeakSet.has(producer)

return producerStartCh.runStores(ctx, () => {
try {
const { topic, messages = [] } = arguments[0]
producerStartCh.publish({
topic,
messages,
bootstrapServers,
clusterId,
disableHeaderInjection: disabledHeaderWeakSet.has(producer)
})
const result = send.apply(this, arguments)
result.then(
innerAsyncResource.bind(res => {
producerFinishCh.publish(undefined)
producerCommitCh.publish(res)
}),
innerAsyncResource.bind(err => {
(res) => {
ctx.res = res
producerFinishCh.runStores(ctx, () => {})
producerCommitCh.publish(ctx)
},
(err) => {
ctx.err = err
if (err) {
// Fixes bug where we would inject message headers for kafka brokers that don't support headers
// (version <0.11). On the error, we disable header injection.
Expand All @@ -96,14 +97,14 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
}
producerErrorCh.publish(err)
}
producerFinishCh.publish(undefined)
producerFinishCh.runStores(ctx, () => {})
})
)

return result
} catch (e) {
producerErrorCh.publish(e)
producerFinishCh.publish(undefined)
ctx.err = e
producerErrorCh.publish(ctx)
producerFinishCh.runStores(ctx, () => {})
throw e
}
})
Expand Down Expand Up @@ -188,29 +189,33 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
const wrappedCallback = (fn, startCh, finishCh, errorCh, extractArgs, clusterId) => {
return typeof fn === 'function'
? function (...args) {
const innerAsyncResource = new AsyncResource('bound-anonymous-fn')
return innerAsyncResource.runInAsyncScope(() => {
const extractedArgs = extractArgs(args, clusterId)
const ctx = {}
const extractedArgs = extractArgs(args, clusterId)
ctx.extractedArgs = extractedArgs

startCh.publish(extractedArgs)
return startCh.runStores(ctx, () => {
try {
const result = fn.apply(this, args)
if (result && typeof result.then === 'function') {
result.then(
innerAsyncResource.bind(() => finishCh.publish(undefined)),
innerAsyncResource.bind(err => {
(res) => {
ctx.res = res
finishCh.runStores(ctx, () => {})
},
(err) => {
ctx.err = err
if (err) {
errorCh.publish(err)
errorCh.publish(ctx)
}
finishCh.publish(undefined)
finishCh.runStores(ctx, () => {})
})
)
} else {
finishCh.publish(undefined)
finishCh.runStores(ctx, () => {})
}
return result
} catch (e) {
errorCh.publish(e)
ctx.err = e
errorCh.publish(ctx)
finishCh.publish(undefined)
throw e
}
Expand Down
Loading
Loading