This repository was archived by the owner on Aug 4, 2023. It is now read-only.

Commit 95d7622

feat: add support for Lambda data flushing (#178)
This adds 'Client#lambdaStart()' and the 'lambdaEnd: true' option to 'Client#flush([opts,] [cb])' to support flushing Lambda invocation tracing data and signaling the Elastic Lambda extension per spec:
https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing

This also changes the tests to run each test file in its own process, as required by "lambda-usage.test.js".

Refs: elastic/apm-agent-nodejs#2485
1 parent d6b26ec commit 95d7622

File tree

7 files changed

+384
-36
lines changed

CHANGELOG.md

Lines changed: 19 additions & 0 deletions
@@ -1,5 +1,24 @@
 # elastic-apm-http-client changelog
 
+## v11.0.0
+
+- Add support for coordinating data flushing in an AWS Lambda environment. The
+  following two API additions are used to ensure that (a) the Elastic Lambda
+  extension is signaled at invocation end [per spec](https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing)
+  and (b) a new intake request is not started when a Lambda function invocation
+  is not active.
+
+  - `Client#lambdaStart()` should be used to indicate when a Lambda function
+    invocation begins.
+  - `Client#flush([opts,] cb)` now supports an optional `opts.lambdaEnd`
+    boolean. Set it to true to indicate this is a flush at the end of a Lambda
+    function invocation.
+
+  This is a **BREAKING CHANGE**, because current versions of elastic-apm-node
+  depend on `^10.4.0`. If this were released as another 10.x, then usage of
+  current elastic-apm-node with this version of the client would break
+  behavior in a Lambda environment.
+
 ## v10.4.0
 
 - Add APM Server version checking to the client. On creation the client will
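
The breaking-change note in the v11.0.0 entry rests on how a caret range such as `^10.4.0` is resolved: it accepts later 10.x releases but not a new major. The sketch below illustrates that rule for versions with a non-zero major only. `satisfiesCaret` is a hypothetical helper written for this note, not part of this package; a real project would normally use the `semver` package instead.

```javascript
// Split "MAJOR.MINOR.PATCH" into three numbers.
function parse (version) {
  return version.split('.').map(Number)
}

// Hypothetical helper: does `version` satisfy `^base`?
// Only handles the common case where the major version is non-zero.
function satisfiesCaret (version, base) {
  const [major, minor, patch] = parse(version)
  const [baseMajor, baseMinor, basePatch] = parse(base)
  if (major !== baseMajor) return false // a new major is never accepted
  if (minor !== baseMinor) return minor > baseMinor
  return patch >= basePatch
}

console.log(satisfiesCaret('10.5.0', '10.4.0')) // true
console.log(satisfiesCaret('11.0.0', '10.4.0')) // false
```

Under this rule a hypothetical 10.5.0 would be picked up automatically by agents that depend on `^10.4.0`, while 11.0.0 is only used by agents that opt in.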

README.md

Lines changed: 33 additions & 1 deletion
@@ -341,6 +341,33 @@ is provided. For example, in an AWS Lambda function some metadata is not
 available until the first function invocation -- which is some async time after
 Client creation.
 
+### `client.lambdaStart()`
+
+Tells the client that a Lambda function invocation has started.
+
+#### Notes on Lambda usage
+
+To properly handle [data flushing for instrumented Lambda functions](https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing)
+this Client should be used as follows in a Lambda environment.
+
+- When a Lambda invocation starts, `client.lambdaStart()` must be called.
+
+  The Client prevents intake requests to APM Server when in a Lambda environment
+  when a function invocation is *not* active. This is to ensure that an intake
+  request does not accidentally span a period when a Lambda VM is frozen,
+  which can lead to timeouts and lost APM data.
+
+- When a Lambda invocation finishes, `client.flush({lambdaEnd: true}, cb)` must
+  be called.
+
+  The `lambdaEnd: true` tells the Client to (a) mark the lambda as inactive so
+  a subsequent intake request is not started until the next invocation, and
+  (b) signal the Elastic AWS Lambda Extension that this invocation is done.
+  The user's Lambda handler should not finish until `cb` is called. This
+  ensures that the extension receives tracing data and the end signal before
+  the Lambda Runtime freezes the VM.
+
+
 ### `client.sendSpan(span[, callback])`
 
 Send a span to the APM Server.
@@ -381,7 +408,7 @@ Arguments:
 - `callback` - Callback is called when the `metricset` have been flushed to
   the underlying system
 
-### `client.flush([callback])`
+### `client.flush([opts,] [callback])`
 
 Flush the internal buffer and end the current HTTP request to the APM
 Server. If no HTTP request is in process nothing happens. In an AWS Lambda
@@ -390,6 +417,11 @@ because the APM agent always flushes at the end of a Lambda handler.
 
 Arguments:
 
+- `opts`:
+  - `opts.lambdaEnd` - An optional boolean to indicate if this is the final
+    flush at the end of the Lambda function invocation. The client will do
+    some extra handling if this is the case. See notes in `client.lambdaStart()`
+    above.
 - `callback` - Callback is called when the internal buffer has been
   flushed and the HTTP request ended. If no HTTP request is in progress
   the callback is called in the next tick.
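
The README notes above describe a two-step protocol: call `client.lambdaStart()` when an invocation begins, and do not let the handler finish until the callback of `client.flush({lambdaEnd: true}, cb)` has run. A minimal sketch of a handler wrapper that follows those two steps is shown below. `makeStubClient` and `wrapHandler` are hypothetical names introduced for illustration; the stub only records calls and stands in for a real `Client` instance.

```javascript
// Hypothetical stand-in for the real Client. It records the calls it receives
// and invokes the flush callback immediately.
function makeStubClient () {
  const calls = []
  return {
    calls,
    lambdaStart () {
      calls.push('lambdaStart')
    },
    flush (opts, cb) {
      if (typeof opts === 'function') {
        cb = opts
        opts = {}
      }
      calls.push(opts && opts.lambdaEnd ? 'flush(lambdaEnd)' : 'flush')
      if (cb) cb()
    }
  }
}

// Hypothetical wrapper for a callback-style Lambda handler.
function wrapHandler (client, handler) {
  return function wrapped (event, context, callback) {
    // Step 1: mark the invocation as active before any work is done.
    client.lambdaStart()
    handler(event, context, (err, result) => {
      // Step 2: flush with lambdaEnd, and only report completion to the
      // runtime once the flush callback has been called.
      client.flush({ lambdaEnd: true }, () => callback(err, result))
    })
  }
}

const client = makeStubClient()
const handler = wrapHandler(client, (event, context, done) => done(null, 'ok'))
handler({}, {}, (err, result) => {
  console.log(client.calls, err, result)
})
```

The point of the design is ordering: the runtime callback is nested inside the flush callback, so the handler cannot complete before the end-of-invocation flush has.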

index.js

Lines changed: 154 additions & 32 deletions
@@ -7,6 +7,7 @@ const http = require('http')
 const https = require('https')
 const util = require('util')
 const os = require('os')
+const { performance } = require('perf_hooks')
 const { URL } = require('url')
 const zlib = require('zlib')
 
@@ -25,11 +26,12 @@ const truncate = require('./lib/truncate')
 
 module.exports = Client
 
-// This symbol is used as a marker in the client stream to indicate special
+// These symbols are used as markers in the client stream to indicate special
 // flush handling.
 const kFlush = Symbol('flush')
+const kLambdaEndFlush = Symbol('lambdaEndFlush')
 function isFlushMarker (obj) {
-  return obj === kFlush
+  return obj === kFlush || obj === kLambdaEndFlush
 }
 
 const hostname = os.hostname()
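
The two symbols act as in-band sentinels: they travel through the same object-mode queue as APM events, so a flush keeps its position relative to the writes around it. The sketch below shows that idea in isolation. `planWrites` is a hypothetical helper, not the client's `_writev`, and it ignores batch size limits.

```javascript
const kFlush = Symbol('flush')
const kLambdaEndFlush = Symbol('lambdaEndFlush')

function isFlushMarker (obj) {
  return obj === kFlush || obj === kLambdaEndFlush
}

// Hypothetical helper: turn a queue of events and flush markers into an
// ordered list of steps, preserving the position of every flush.
function planWrites (queue) {
  const steps = []
  let batch = []
  for (const item of queue) {
    if (isFlushMarker(item)) {
      if (batch.length > 0) {
        steps.push({ type: 'batch', events: batch })
        batch = []
      }
      steps.push({ type: 'flush', lambdaEnd: item === kLambdaEndFlush })
    } else {
      batch.push(item)
    }
  }
  if (batch.length > 0) {
    steps.push({ type: 'batch', events: batch })
  }
  return steps
}

console.log(planWrites(['span-1', 'span-2', kFlush, 'span-3', kLambdaEndFlush]))
```

Because symbols are unique, an event payload can never be mistaken for a marker, which is why a strict `===` comparison is enough.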
@@ -103,6 +105,9 @@ function Client (opts) {
   this._cloudMetadata = null
   this._extraMetadata = null
   this._metadataFilters = new Filters()
+  // _lambdaActive indicates if a Lambda function invocation is active. It is
+  // only meaningful if `isLambdaExecutionEnvironment`.
+  this._lambdaActive = false
 
   // Internal runtime stats for developer debugging/tuning.
   this._numEvents = 0 // number of events given to the client
@@ -187,6 +192,14 @@ function Client (opts) {
   this._index = clientsToAutoEnd.length
   clientsToAutoEnd.push(this)
 
+  // The 'beforeExit' event is significant in Lambda invocation completion
+  // handling, so we log it for debugging.
+  if (isLambdaExecutionEnvironment && this._log.isLevelEnabled('trace')) {
+    process.prependListener('beforeExit', () => {
+      this._log.trace('process "beforeExit"')
+    })
+  }
+
   if (this._conf.centralConfig) {
     this._pollConfig()
   }
@@ -280,6 +293,7 @@ Client.prototype.config = function (opts) {
   // http request options
   this._conf.requestIntake = getIntakeRequestOptions(this._conf, this._agent)
   this._conf.requestConfig = getConfigRequestOptions(this._conf, this._agent)
+  this._conf.requestSignalLambdaEnd = getSignalLambdaEndRequestOptions(this._conf, this._agent)
 
   this._conf.metadata = getMetadata(this._conf)
 
@@ -307,8 +321,8 @@ Client.prototype.setExtraMetadata = function (extraMetadata) {
   this._resetEncodedMetadata()
 
   if (this._conf.expectExtraMetadata) {
+    this._log.trace('maybe uncork (expectExtraMetadata)')
     this._maybeUncork()
-    this._log.trace('uncorked (expectExtraMetadata)')
   }
 }
 
@@ -429,7 +443,7 @@ Client.prototype._ref = function () {
 
 Client.prototype._write = function (obj, enc, cb) {
   if (isFlushMarker(obj)) {
-    this._writeFlush(cb)
+    this._writeFlush(obj, cb)
   } else {
     const t = process.hrtime()
     const chunk = this._encode(obj, enc)
@@ -481,10 +495,10 @@ Client.prototype._writev = function (objs, cb) {
       offset = flushIdx
     } else if (flushIdx === objs.length - 1) {
       // The next item is a flush marker, and it is the *last* item in the queue.
-      this._writeFlush(cb)
+      this._writeFlush(objs[flushIdx].chunk, cb)
     } else {
       // The next item in the queue is a flush.
-      this._writeFlush(processBatch)
+      this._writeFlush(objs[flushIdx].chunk, processBatch)
       offset++
     }
   }
@@ -525,42 +539,55 @@ Client.prototype._writeBatch = function (objs, cb) {
   }, '_writeBatch')
 }
 
-Client.prototype._writeFlush = function (cb) {
-  this._log.trace({ activeIntakeReq: this._activeIntakeReq }, '_writeFlush')
-  if (this._activeIntakeReq) {
-    // In a Lambda environment a flush is almost certainly a signal that the
-    // runtime environment is about to be frozen: tell the intake request
-    // to finish up quickly.
-    if (this._intakeRequestGracefulExitFn && isLambdaExecutionEnvironment) {
-      this._intakeRequestGracefulExitFn()
+Client.prototype._writeFlush = function (flushMarker, cb) {
+  this._log.trace({ activeIntakeReq: this._activeIntakeReq, lambdaEnd: flushMarker === kLambdaEndFlush }, '_writeFlush')
+
+  let onFlushed = cb
+  if (isLambdaExecutionEnvironment && flushMarker === kLambdaEndFlush) {
+    onFlushed = () => {
+      // Signal the Elastic AWS Lambda extension that it is done passing data
+      // for this invocation, then call `cb()` so the wrapped Lambda handler
+      // can finish.
+      this._signalLambdaEnd(cb)
     }
-    this._onIntakeReqConcluded = cb
+  }
+
+  if (this._activeIntakeReq) {
+    this._onIntakeReqConcluded = onFlushed
     this._chopper.chop()
   } else {
-    this._chopper.chop(cb)
+    this._chopper.chop(onFlushed)
   }
 }
 
 Client.prototype._maybeCork = function () {
-  if (!this._writableState.corked && this._conf.bufferWindowTime !== -1) {
-    this.cork()
-    if (this._corkTimer && this._corkTimer.refresh) {
-      // the refresh function was added in Node 10.2.0
-      this._corkTimer.refresh()
-    } else {
-      this._corkTimer = setTimeout(() => {
-        this.uncork()
-      }, this._conf.bufferWindowTime)
+  if (!this._writableState.corked) {
+    if (isLambdaExecutionEnvironment && !this._lambdaActive) {
+      this.cork()
+    } else if (this._conf.bufferWindowTime !== -1) {
+      this.cork()
+      if (this._corkTimer && this._corkTimer.refresh) {
+        // the refresh function was added in Node 10.2.0
+        this._corkTimer.refresh()
+      } else {
+        this._corkTimer = setTimeout(() => {
+          this.uncork()
+        }, this._conf.bufferWindowTime)
+      }
     }
   } else if (this._writableState.length >= this._conf.bufferWindowSize) {
     this._maybeUncork()
   }
 }
 
 Client.prototype._maybeUncork = function () {
-  // client must remain corked until cloud metadata has been
-  // fetched-or-skipped.
   if (!this._encodedMetadata) {
+    // The client must remain corked until cloud metadata has been
+    // fetched-or-skipped.
+    return
+  } else if (isLambdaExecutionEnvironment && !this._lambdaActive) {
+    // In a Lambda env, we must only uncork when an invocation is active,
+    // otherwise we could start an intake request just before the VM is frozen.
     return
   }
 
@@ -569,7 +596,7 @@ Client.prototype._maybeUncork = function () {
   // to `_maybeUncork` have time to be added to the queue. If we didn't do
   // this, that last write would trigger a single call to `_write`.
   process.nextTick(() => {
-    if (this.destroyed === false) {
+    if (this.destroyed === false && !(isLambdaExecutionEnvironment && !this._lambdaActive)) {
       this.uncork()
     }
   })
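
The cork and uncork guards changed in the two hunks above can be read as a small decision table. The sketch below restates them as pure functions so that the conditions are easy to check in isolation. `corkDecision` and `mayUncork` are hypothetical names, and the plain `state` object stands in for the client's internal fields; this is an illustration of the conditions, not the client's implementation.

```javascript
// Hypothetical restatement of the `_maybeCork` branches.
function corkDecision (state) {
  if (!state.corked) {
    // In Lambda with no active invocation: cork with no timer, so nothing is
    // sent until the next invocation starts.
    if (state.isLambda && !state.lambdaActive) return 'cork'
    // Otherwise cork for at most `bufferWindowTime` ms (unless disabled).
    if (state.bufferWindowTime !== -1) return 'cork-with-timer'
    return 'none'
  }
  // Already corked: try to uncork once enough events are queued.
  return state.queued >= state.bufferWindowSize ? 'maybe-uncork' : 'none'
}

// Hypothetical restatement of the `_maybeUncork` guards.
function mayUncork (state) {
  if (!state.hasEncodedMetadata) return false // metadata not ready yet
  if (state.isLambda && !state.lambdaActive) return false // VM may be frozen soon
  return true
}

const idleLambda = {
  corked: false,
  isLambda: true,
  lambdaActive: false,
  bufferWindowTime: 20,
  queued: 0,
  bufferWindowSize: 50,
  hasEncodedMetadata: true
}
console.log(corkDecision(idleLambda), mayUncork(idleLambda))
```

The values `20` and `50` are arbitrary example settings chosen for the sketch.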
@@ -603,6 +630,10 @@ Client.prototype._encode = function (obj, enc) {
   return ndjson.serialize(out)
 }
 
+Client.prototype.lambdaStart = function () {
+  this._lambdaActive = true
+}
+
 // With the cork/uncork handling on this stream, `this.write`ing on this
 // stream when already destroyed will lead to:
 // Error: Cannot call write after a stream was destroyed
@@ -652,14 +683,44 @@ Client.prototype.sendMetricSet = function (metricset, cb) {
   return this.write({ metricset }, Client.encoding.METRICSET, cb)
 }
 
-Client.prototype.flush = function (cb) {
-  this._maybeUncork()
+/**
+ * If possible, start a flush of currently queued APM events to APM server.
+ *
+ * "If possible," because there are some guards on uncorking. See `_maybeUncork`.
+ *
+ * @param {Object} opts - Optional.
+ *    - {Boolean} opts.lambdaEnd - Optional. Default false. Setting this true
+ *      tells the client to also handle the end of a Lambda function invocation.
+ * @param {Function} cb - Optional. `cb()` will be called when the data has
+ *    been sent to APM Server (or failed in the attempt).
+ */
+Client.prototype.flush = function (opts, cb) {
+  if (typeof opts === 'function') {
+    cb = opts
+    opts = {}
+  } else if (!opts) {
+    opts = {}
+  }
+  const lambdaEnd = !!opts.lambdaEnd
 
   // Write the special "flush" signal. We do this so that the order of writes
   // and flushes are kept. If we were to just flush the client right here, the
   // internal Writable buffer might still contain data that hasn't yet been
   // given to the _write function.
-  return this.write(kFlush, cb)
+
+  if (lambdaEnd && isLambdaExecutionEnvironment && this._lambdaActive) {
+    // To flush the current data and ensure that subsequently sent events *in
+    // the same tick* do not start a new intake request, we must uncork
+    // synchronously -- rather than the nextTick uncork done in `_maybeUncork()`.
+    assert(this._encodedMetadata, 'client.flush({lambdaEnd:true}) must not be called before metadata has been set')
+    const rv = this.write(kLambdaEndFlush, cb)
+    this.uncork()
+    this._lambdaActive = false
+    return rv
+  } else {
+    this._maybeUncork()
+    return this.write(kFlush, cb)
+  }
 }
 
 // A handler that can be called on process "beforeExit" to attempt quick and
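
The new `flush([opts,] [cb])` signature stays compatible with the old `flush([cb])` form by checking whether the first argument is a function. The sketch below isolates that normalization step. `normalizeFlushArgs` is a hypothetical helper extracted for illustration; in the commit the same logic is inline at the top of `Client.prototype.flush`.

```javascript
// Hypothetical helper mirroring the argument handling at the top of `flush`.
function normalizeFlushArgs (opts, cb) {
  if (typeof opts === 'function') {
    // Old-style call: flush(cb)
    cb = opts
    opts = {}
  } else if (!opts) {
    // flush(), flush(null, cb) or flush(undefined, cb)
    opts = {}
  }
  // Coerce to a strict boolean so that a missing option means false.
  return { lambdaEnd: !!opts.lambdaEnd, cb }
}

const done = () => {}
console.log(normalizeFlushArgs(done))
console.log(normalizeFlushArgs({ lambdaEnd: true }, done))
console.log(normalizeFlushArgs())
```

All three call shapes produce the same `{ lambdaEnd, cb }` structure, so the rest of the function does not need to care which form the caller used.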
@@ -909,7 +970,12 @@ function getChoppedStreamHandler (client, onerror) {
       // during a request. Given that the normal makeIntakeRequest behaviour
       // is to keep a request open for up to 10s (`apiRequestTimeout`), we must
       // manually unref the socket.
-      if (!intakeRequestGracefulExitCalled) {
+      //
+      // The exception is when in a Lambda environment, where we *do* want to
+      // keep the node process running to complete this intake request.
+      // Otherwise a 'beforeExit' event can be sent, which the Lambda runtime
+      // interprets as "the Lambda handler callback was never called".
+      if (!isLambdaExecutionEnvironment && !intakeRequestGracefulExitCalled) {
         log.trace('intakeReq "socket": unref it')
         intakeReqSocket.unref()
       }
@@ -1048,6 +1114,55 @@ Client.prototype.supportsKeepingUnsampledTransaction = function () {
   }
 }
 
+/**
+ * Signal to the Elastic AWS Lambda extension that a lambda function execution
+ * is done.
+ * https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing
+ *
+ * @param {Function} cb() is called when finished. There are no arguments.
+ */
+Client.prototype._signalLambdaEnd = function (cb) {
+  this._log.trace('_signalLambdaEnd start')
+  const startTime = performance.now()
+  const finish = errOrErrMsg => {
+    const durationMs = performance.now() - startTime
+    if (errOrErrMsg) {
+      this._log.error({ err: errOrErrMsg, durationMs }, 'error signaling lambda invocation done')
+    } else {
+      this._log.trace({ durationMs }, 'signaled lambda invocation done')
+    }
+    cb()
+  }
+
+  // We expect to be talking to the localhost Elastic Lambda extension, so we
+  // want a shorter timeout than `_conf.serverTimeout`.
+  const TIMEOUT_MS = 5000
+
+  const req = this._transportRequest(this._conf.requestSignalLambdaEnd, res => {
+    res.on('error', err => {
+      // Not sure this event can ever be emitted, but just in case.
+      res.destroy(err)
+    })
+    res.resume()
+    if (res.statusCode !== 202) {
+      finish(`unexpected response status code: ${res.statusCode}`)
+      return
+    }
+    res.on('end', function () {
+      finish()
+    })
+  })
+  req.setTimeout(TIMEOUT_MS)
+  req.on('timeout', () => {
+    this._log.trace('_signalLambdaEnd timeout')
+    req.destroy(new Error(`timeout (${TIMEOUT_MS}ms) signaling Lambda invocation done`))
+  })
+  req.on('error', err => {
+    finish(err)
+  })
+  req.end()
+}
+
 /**
  * Fetch the APM Server version and set `this._apmServerVersion`.
  * https://www.elastic.co/guide/en/apm/server/current/server-info.html
@@ -1164,6 +1279,13 @@ function getIntakeRequestOptions (opts, agent) {
   return getBasicRequestOptions('POST', '/intake/v2/events', headers, opts, agent)
 }
 
+function getSignalLambdaEndRequestOptions (opts, agent) {
+  const headers = getHeaders(opts)
+  headers['Content-Length'] = 0
+
+  return getBasicRequestOptions('POST', '/intake/v2/events?flushed=true', headers, opts, agent)
+}
+
 function getConfigRequestOptions (opts, agent) {
   const path = '/config/v1/agents?' + querystring.stringify({
     'service.name': opts.serviceName,
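
The end-of-invocation signal added in this file is an empty-bodied `POST` to `/intake/v2/events?flushed=true`. Since `getHeaders` and `getBasicRequestOptions` are not shown in this diff, the sketch below builds roughly equivalent options by hand to show the shape of the request. `signalLambdaEndOptions` is a hypothetical function, the server URL is an illustrative value, and real requests would also carry the client's usual headers.

```javascript
// Hypothetical sketch: build `http.request`-style options for the
// "invocation done" signal. Uses the global `URL` class available in Node.js.
function signalLambdaEndOptions (serverUrl) {
  const url = new URL(serverUrl)
  return {
    method: 'POST',
    protocol: url.protocol,
    hostname: url.hostname,
    port: url.port,
    // The `flushed=true` query parameter is what marks this request as the
    // end-of-invocation signal rather than a normal intake request.
    path: '/intake/v2/events?flushed=true',
    // No body is sent, so the content length is zero.
    headers: { 'Content-Length': 0 }
  }
}

// Assumed local address for the extension; adjust to the real configuration.
console.log(signalLambdaEndOptions('http://localhost:8200'))
```

Reusing the intake path with a query flag means the extension can recognise the signal on the endpoint it already serves.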
