Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion lib/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,20 @@ function transport (fullOptions) {
}

if (targets) {
target = bundlerOverrides['pino-worker'] || join(__dirname, 'worker.js')
target = bundlerOverrides['pino-worker'] || join(__dirname, 'worker-mixed.js')

options.targets = targets.map((dest) => {
if (dest.pipeline) {
dest.targets = dest.targets.map((pTarget) => {
// parse pipeline target paths
return {
...pTarget,
target: fixTarget(pTarget.target)
}
})
return { ...dest }
}

return {
...dest,
target: fixTarget(dest.target)
Expand Down
63 changes: 63 additions & 0 deletions lib/worker-mixed.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
'use strict'

const pino = require('../pino.js')
const build = require('pino-abstract-transport')
const buildPipelineStream = require('./worker-pipeline.js')
const loadTransportStreamBuilder = require('./transport-stream')
// This file is not checked by the code coverage tool,
// as it is not reliable.

/* istanbul ignore file */

module.exports = async function ({ targets, levels, dedupe }) {
// build target streams
targets = await Promise.all(targets.map(async (t) => {
if (t.pipeline) {
return {
level: t.level,
stream: await buildPipelineStream(t)
}
} else {
const fn = await loadTransportStreamBuilder(t.target)
const stream = await fn(t.options)
return {
level: t.level,
stream
}
}
}))

return build(process, {
parse: 'lines',
metadata: true,
close (err, cb) {
let expected = 0
for (const transport of targets) {
expected++
transport.stream.on('close', closeCb)
transport.stream.end()
}

function closeCb () {
if (--expected === 0) {
cb(err)
}
}
}
})

function process (stream) {
const multi = pino.multistream(targets, { levels, dedupe })
// TODO manage backpressure
stream.on('data', function (chunk) {
const { lastTime, lastMsg, lastObj, lastLevel } = this
multi.lastLevel = lastLevel
multi.lastTime = lastTime
multi.lastMsg = lastMsg
multi.lastObj = lastObj

// TODO handle backpressure
multi.write(chunk + '\n')
})
}
}