Skip to content

tracing: remove asyncresource from kafka instrumentations #5703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 39 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
7fcbd5d
fix kafka asyncstorage
wconti27 May 12, 2025
88346c1
fix confluent kafka
wconti27 May 12, 2025
00c5465
run kafka latest
wconti27 May 12, 2025
6f3379a
fix batch consumer
wconti27 May 12, 2025
ece0a0b
re run on failure
wconti27 May 13, 2025
e923e03
another fix
wconti27 May 13, 2025
27e4a74
change test name
wconti27 May 13, 2025
38ed0d6
bump timeout
wconti27 May 13, 2025
1f36645
produce message again if not found
wconti27 May 13, 2025
cff7f0c
revert tests
wconti27 May 13, 2025
f8d01af
bump kafka hook version to 1.3.0 which seems more stable
wconti27 May 13, 2025
1743986
possible fix
wconti27 May 13, 2025
d6faa52
more changes
wconti27 May 13, 2025
4d8ec59
other changes
wconti27 May 14, 2025
d08113a
fix flaky tests
wconti27 May 14, 2025
fdd6300
fix test flakiness
wconti27 May 14, 2025
3f4be38
merge with master
wconti27 May 14, 2025
c122735
fix plugins to run
wconti27 May 14, 2025
bffbf38
fix final failing test
wconti27 May 14, 2025
30b1825
fix producer commit
wconti27 May 14, 2025
b5afcc3
another fix
wconti27 May 14, 2025
ef13fd8
merge with master
wconti27 May 21, 2025
c3740e7
Merge branch 'master' into conti/fix-kafka-async-storage
wconti27 May 22, 2025
bc6abe5
fix headers
wconti27 May 27, 2025
3cbb073
Merge branch 'master' into conti/fix-kafka-async-storage
uurien May 28, 2025
5d83791
Run appsec kafka integration tests
uurien May 28, 2025
c34d95d
Fix kafkajs tests
uurien May 28, 2025
efbf99e
Fix tests
uurien May 28, 2025
ded8927
Fix lint
uurien May 28, 2025
56196e9
revert confluent testing change
wconti27 May 28, 2025
b758c3b
fix reviewer comments
wconti27 May 30, 2025
66d80c6
add comment
wconti27 May 30, 2025
ea73b54
Merge branch 'master' into conti/fix-kafka-async-storage
wconti27 May 30, 2025
fb84244
Merge branch 'master' into conti/fix-kafka-async-storage
wconti27 May 30, 2025
f62ab73
fix backlogs test
wconti27 May 30, 2025
59a478e
fix
wconti27 May 30, 2025
9220a29
Merge branch 'master' into conti/fix-kafka-async-storage
wconti27 May 30, 2025
7ab6b8c
Merge branch 'master' into conti/fix-kafka-async-storage
BridgeAR May 30, 2025
e009c85
Merge branch 'master' into conti/fix-kafka-async-storage
wconti27 Jun 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions .github/workflows/appsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,35 @@ jobs:
- uses: ./.github/actions/node/active-lts
- run: yarn test:appsec:plugins:ci
- uses: codecov/codecov-action@ad3126e916f78f00edff4ed0317cf185271ccc2d # v5.4.2

# Appsec kafka integration tests against a real single-node Kafka broker.
# The broker runs in KRaft mode (combined broker+controller, no ZooKeeper)
# and is reachable from the test process on 127.0.0.1:9092.
kafka:
runs-on: ubuntu-latest
services:
kafka:
image: apache/kafka-native:3.8.0-rc2
env:
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_NODE_ID: '1'
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CONTROLLER_QUORUM_VOTERS: [email protected]:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CLUSTER_ID: r4zt_wrqTRuT7W2NJsB_GA
# Advertise localhost so clients running on the runner host can connect.
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# Single-node cluster: replication factor must be 1.
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0'
ports:
- 9092:9092
- 9093:9093
env:
PLUGINS: kafkajs
SERVICES: kafka
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
# Run the suite on both the oldest maintenance LTS and the active LTS.
- uses: ./.github/actions/node/oldest-maintenance-lts
- uses: ./.github/actions/install
- run: yarn test:appsec:plugins:ci
- uses: ./.github/actions/node/active-lts
- run: yarn test:appsec:plugins:ci
- uses: codecov/codecov-action@ad3126e916f78f00edff4ed0317cf185271ccc2d # v5.4.2
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

const {
addHook,
channel,
AsyncResource
channel
} = require('./helpers/instrument')
const shimmer = require('../../datadog-shimmer')

Expand Down Expand Up @@ -32,7 +31,6 @@ const disabledHeaderWeakSet = new WeakSet()
// we need to store the offset per partition per topic for the consumer to track offsets for DSM
const latestConsumerOffsets = new Map()

// Customize the instrumentation for Confluent Kafka JavaScript
addHook({ name: '@confluentinc/kafka-javascript', versions: ['>=1.0.0'] }, (module) => {
// Hook native module classes first
instrumentBaseModule(module)
Expand All @@ -55,30 +53,31 @@ function instrumentBaseModule (module) {
// Hook the produce method
if (typeof producer?.produce === 'function') {
shimmer.wrap(producer, 'produce', function wrapProduce (produce) {
return function wrappedProduce (topic, partition, message, key, timestamp, opaque) {
return function wrappedProduce (topic, partition, message, key, timestamp, opaque, headers) {
if (!channels.producerStart.hasSubscribers) {
return produce.apply(this, arguments)
}

const brokers = this.globalConfig?.['bootstrap.servers']

const asyncResource = new AsyncResource('bound-anonymous-fn')
return asyncResource.runInAsyncScope(() => {
try {
channels.producerStart.publish({
topic,
messages: [{ key, value: message }],
bootstrapServers: brokers
})
const ctx = {}
ctx.topic = topic
ctx.messages = [{ key, value: message }]
ctx.bootstrapServers = brokers

const result = produce.apply(this, arguments)
return channels.producerStart.runStores(ctx, () => {
try {
headers = convertHeaders(ctx.messages[0].headers)
const result = produce.apply(this, [topic, partition, message, key, timestamp, opaque, headers])

channels.producerCommit.publish(undefined)
channels.producerFinish.publish(undefined)
ctx.res = result
channels.producerCommit.publish(ctx)
channels.producerFinish.runStores(ctx, () => {})
return result
} catch (error) {
channels.producerError.publish(error)
channels.producerFinish.publish(undefined)
ctx.err = error
channels.producerError.publish(ctx)
channels.producerFinish.runStores(ctx, () => {})
throw error
}
})
Expand Down Expand Up @@ -110,32 +109,37 @@ function instrumentBaseModule (module) {
callback = numMessages
}

const ctx = {}
// Handle callback-based consumption
if (typeof callback === 'function') {
return consume.call(this, numMessages, function wrappedCallback (err, messages) {
if (messages && messages.length > 0) {
messages.forEach(message => {
channels.consumerStart.publish({
topic: message?.topic,
partition: message?.partition,
message,
groupId
})
ctx.topic = message?.topic
ctx.partition = message?.partition
ctx.message = message
ctx.groupId = groupId

channels.consumerStart.runStores(ctx, () => {})
updateLatestOffset(message?.topic, message?.partition, message?.offset, groupId)
})
}

if (err) {
channels.consumerError.publish(err)
ctx.err = err
channels.consumerError.publish(ctx)
}

try {
const result = callback.apply(this, arguments)
channels.consumerFinish.publish(undefined)
if (messages && messages.length > 0) {
channels.consumerFinish.runStores(ctx, () => {})
}
return result
} catch (error) {
channels.consumerError.publish(error)
channels.consumerFinish.publish(undefined)
ctx.err = error
channels.consumerError.publish(ctx)
channels.consumerFinish.runStores(ctx, () => {})
throw error
}
})
Expand Down Expand Up @@ -204,44 +208,42 @@ function instrumentKafkaJS (kafkaJS) {
return send.apply(this, arguments)
}

const asyncResource = new AsyncResource('bound-anonymous-fn')
return asyncResource.runInAsyncScope(() => {
try {
channels.producerStart.publish({
topic: payload?.topic,
messages: payload?.messages || [],
bootstrapServers: kafka._ddBrokers,
disableHeaderInjection: disabledHeaderWeakSet.has(producer)
})
const ctx = {}
ctx.topic = payload?.topic
ctx.messages = payload?.messages || []
ctx.bootstrapServers = kafka._ddBrokers
ctx.disableHeaderInjection = disabledHeaderWeakSet.has(producer)

return channels.producerStart.runStores(ctx, () => {
try {
const result = send.apply(this, arguments)

result.then(
asyncResource.bind(res => {
channels.producerCommit.publish(res)
channels.producerFinish.publish(undefined)
}),
asyncResource.bind(err => {
if (err) {
// Fixes bug where we would inject message headers for kafka brokers
// that don't support headers (version <0.11). On the error, we disable
// header injection. Unfortunately the error name / type is not more specific.
// This approach is implemented by other tracers as well.
if (err.name === 'KafkaJSError' && err.type === 'ERR_UNKNOWN') {
disabledHeaderWeakSet.add(producer)
log.error('Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). ' +
'Please look at broker logs for more information. ' +
'Tracer message header injection for Kafka is disabled.')
}
channels.producerError.publish(err)
result.then((res) => {
ctx.res = res
channels.producerCommit.publish(ctx)
channels.producerFinish.publish(undefined)
}, (err) => {
if (err) {
// Fixes bug where we would inject message headers for kafka brokers
// that don't support headers (version <0.11). On the error, we disable
// header injection. Unfortunately the error name / type is not more specific.
// This approach is implemented by other tracers as well.
if (err.name === 'KafkaJSError' && err.type === 'ERR_UNKNOWN') {
disabledHeaderWeakSet.add(producer)
log.error('Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). ' +
'Please look at broker logs for more information. ' +
'Tracer message header injection for Kafka is disabled.')
}
channels.producerFinish.publish(undefined)
})
)
ctx.err = err
channels.producerError.publish(ctx)
}
channels.producerFinish.publish(undefined)
})

return result
} catch (e) {
channels.producerError.publish(e)
ctx.err = e
channels.producerError.publish(ctx)
channels.producerFinish.publish(undefined)
throw e
}
Expand Down Expand Up @@ -350,33 +352,36 @@ function wrapKafkaCallback (callback, { startCh, commitCh, finishCh, errorCh },
return function wrappedKafkaCallback (payload) {
const commitPayload = getPayload(payload)

const asyncResource = new AsyncResource('bound-anonymous-fn')
return asyncResource.runInAsyncScope(() => {
startCh.publish(commitPayload)
const ctx = {}
ctx.extractedArgs = commitPayload

return startCh.runStores(ctx, () => {
updateLatestOffset(commitPayload?.topic, commitPayload?.partition, commitPayload?.offset, commitPayload?.groupId)

try {
const result = callback.apply(this, arguments)

if (result && typeof result.then === 'function') {
return result
.then(asyncResource.bind(res => {
finishCh.publish(undefined)
.then((res) => {
ctx.res = res
finishCh.runStores(ctx, () => {})
return res
}))
.catch(asyncResource.bind(err => {
errorCh.publish(err)
finishCh.publish(undefined)
})
.catch((err) => {
ctx.err = err
errorCh.publish(ctx)
finishCh.runStores(ctx, () => {})
throw err
}))
})
} else {
finishCh.publish(undefined)
finishCh.runStores(ctx, () => {})
return result
}
} catch (error) {
errorCh.publish(error)
finishCh.publish(undefined)
ctx.err = error
errorCh.publish(ctx)
finishCh.runStores(ctx, () => {})
throw error
}
})
Expand Down Expand Up @@ -404,3 +409,8 @@ function updateLatestOffset (topic, partition, offset, groupId) {
// Returns a snapshot array of the most recent consumer offsets tracked for
// DSM, one entry per topic/partition (values of the module-level
// `latestConsumerOffsets` map).
function getLatestOffsets () {
  return [...latestConsumerOffsets.values()]
}

/**
 * Convert a headers object into the array-of-single-entry-objects shape the
 * native produce() API expects: one `{ key: value }` object per header.
 *
 * The caller passes `ctx.messages[0].headers`, which is only populated when a
 * producerStart subscriber injected trace headers — so `headers` may be
 * null/undefined. In that case we pass the nullish value through unchanged
 * (produce() accepts a missing headers argument) instead of throwing a
 * TypeError from Object.entries().
 *
 * @param {Object|null|undefined} headers - flat header-name → value map
 * @returns {Array<Object>|null|undefined} array of one-key objects, or the
 *   original nullish value when no headers were provided
 */
function convertHeaders (headers) {
  if (headers == null) return headers
  // Stringify both sides: keys may be symbols-as-strings, values may be Buffers.
  return Object.entries(headers).map(([key, value]) => ({ [key.toString()]: value.toString() }))
}
Loading