Skip to content

Commit

Permalink
fix: retry on body support (#3294)
Browse files Browse the repository at this point in the history
* test: add testing

* refactor: enhance body wrapping

* fix: do not mutate original opts

* docs: extend documentation
  • Loading branch information
metcoder95 authored May 29, 2024
1 parent 18af4b0 commit 5f11247
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 9 deletions.
3 changes: 3 additions & 0 deletions docs/docs/api/RetryHandler.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ It represents the retry state for a given request.
- **dispatch** `(options: Dispatch.DispatchOptions, handlers: Dispatch.DispatchHandlers) => Promise<Dispatch.DispatchResponse>` (required) - Dispatch function to be called after every retry.
- **handler** Extends [`Dispatch.DispatchHandlers`](Dispatcher.md#dispatcherdispatchoptions-handler) (required) - Handler function to be called after the request is successful or the retries are exhausted.

>__Note__: The `RetryHandler` does not retry over stateful bodies (e.g. streams, AsyncIterable) as those, once consumed, are left in an state that cannot be reutilized. For these situations the `RetryHandler` will identify
>the body as stateful and will not retry the request rejecting with the error `UND_ERR_REQ_RETRY`.
Examples:

```js
Expand Down
1 change: 1 addition & 0 deletions lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module.exports = {
kHost: Symbol('host'),
kNoRef: Symbol('no ref'),
kBodyUsed: Symbol('used'),
kBody: Symbol('abstracted request body'),
kRunning: Symbol('running'),
kBlocking: Symbol('blocking'),
kPending: Symbol('pending'),
Expand Down
60 changes: 57 additions & 3 deletions lib/core/util.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,72 @@
'use strict'

const assert = require('node:assert')
const { kDestroyed, kBodyUsed, kListeners } = require('./symbols')
const { kDestroyed, kBodyUsed, kListeners, kBody } = require('./symbols')
const { IncomingMessage } = require('node:http')
const stream = require('node:stream')
const net = require('node:net')
const { InvalidArgumentError } = require('./errors')
const { Blob } = require('node:buffer')
const nodeUtil = require('node:util')
const { stringify } = require('node:querystring')
const { EventEmitter: EE } = require('node:events')
const { InvalidArgumentError } = require('./errors')
const { headerNameLowerCasedRecord } = require('./constants')
const { tree } = require('./tree')

const [nodeMajor, nodeMinor] = process.versions.node.split('.').map(v => Number(v))

class BodyAsyncIterable {
constructor (body) {
this[kBody] = body
this[kBodyUsed] = false
}

async * [Symbol.asyncIterator] () {
assert(!this[kBodyUsed], 'disturbed')
this[kBodyUsed] = true
yield * this[kBody]
}
}

function wrapRequestBody (body) {
if (isStream(body)) {
// TODO (fix): Provide some way for the user to cache the file to e.g. /tmp
// so that it can be dispatched again?
// TODO (fix): Do we need 100-expect support to provide a way to do this properly?
if (bodyLength(body) === 0) {
body
.on('data', function () {
assert(false)
})
}

if (typeof body.readableDidRead !== 'boolean') {
body[kBodyUsed] = false
EE.prototype.on.call(body, 'data', function () {
this[kBodyUsed] = true
})
}

return body
} else if (body && typeof body.pipeTo === 'function') {
// TODO (fix): We can't access ReadableStream internal state
// to determine whether or not it has been disturbed. This is just
// a workaround.
return new BodyAsyncIterable(body)
} else if (
body &&
typeof body !== 'string' &&
!ArrayBuffer.isView(body) &&
isIterable(body)
) {
// TODO: Should we allow re-using iterable if !this.opts.idempotent
// or through some other flag?
return new BodyAsyncIterable(body)
} else {
return body
}
}

function nop () {}

function isStream (obj) {
Expand Down Expand Up @@ -634,5 +687,6 @@ module.exports = {
isHttpOrHttpsPrefixed,
nodeMajor,
nodeMinor,
safeHTTPMethods: ['GET', 'HEAD', 'OPTIONS', 'TRACE']
safeHTTPMethods: ['GET', 'HEAD', 'OPTIONS', 'TRACE'],
wrapRequestBody
}
15 changes: 11 additions & 4 deletions lib/handler/retry-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ const assert = require('node:assert')

const { kRetryHandlerDefaultRetry } = require('../core/symbols')
const { RequestRetryError } = require('../core/errors')
const { isDisturbed, parseHeaders, parseRangeHeader } = require('../core/util')
const {
isDisturbed,
parseHeaders,
parseRangeHeader,
wrapRequestBody
} = require('../core/util')

function calculateRetryAfterHeader (retryAfter) {
const current = Date.now()
Expand All @@ -29,7 +34,7 @@ class RetryHandler {

this.dispatch = handlers.dispatch
this.handler = handlers.handler
this.opts = dispatchOpts
this.opts = { ...dispatchOpts, body: wrapRequestBody(opts.body) }
this.abort = null
this.aborted = false
this.retryOpts = {
Expand Down Expand Up @@ -174,7 +179,9 @@ class RetryHandler {
this.abort(
new RequestRetryError('Request failed', statusCode, {
headers,
count: this.retryCount
data: {
count: this.retryCount
}
})
)
return false
Expand Down Expand Up @@ -278,7 +285,7 @@ class RetryHandler {

const err = new RequestRetryError('Request failed', statusCode, {
headers,
count: this.retryCount
data: { count: this.retryCount }
})

this.abort(err)
Expand Down
211 changes: 209 additions & 2 deletions test/retry-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const { tspl } = require('@matteo.collina/tspl')
const { test, after } = require('node:test')
const { createServer } = require('node:http')
const { once } = require('node:events')
const { Readable } = require('node:stream')

const { RetryHandler, Client } = require('..')
const { RequestHandler } = require('../lib/api/api-request')
Expand Down Expand Up @@ -204,6 +205,74 @@ test('Should account for network and response errors', async t => {
await t.completed
})

test('Issue #3288 - request with body (asynciterable)', async t => {
t = tspl(t, { plan: 6 })
const server = createServer()
const dispatchOptions = {
method: 'POST',
path: '/',
headers: {
'content-type': 'application/json'
},
body: (function * () {
yield 'hello'
yield 'world'
})()
}

server.on('request', (req, res) => {
res.writeHead(500, {
'content-type': 'application/json'
})

res.end('{"message": "failed"}')
})

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
const handler = new RetryHandler(dispatchOptions, {
dispatch: client.dispatch.bind(client),
handler: {
onConnect () {
t.ok(true, 'pass')
},
onBodySent () {
t.ok(true, 'pass')
},
onHeaders (status, _rawHeaders, resume, _statusMessage) {
t.strictEqual(status, 500)
return true
},
onData (chunk) {
return true
},
onComplete () {
t.fail()
},
onError (err) {
t.equal(err.message, 'Request failed')
t.equal(err.statusCode, 500)
t.equal(err.data.count, 1)
}
}
})

after(async () => {
await client.close()
server.close()

await once(server, 'close')
})

client.dispatch(
dispatchOptions,
handler
)
})

await t.completed
})

test('Should use retry-after header for retries', async t => {
t = tspl(t, { plan: 4 })

Expand Down Expand Up @@ -734,6 +803,145 @@ test('retrying a request with a body', async t => {
await t.completed
})

test('retrying a request with a body (stream)', async t => {
let counter = 0
const server = createServer()
const dispatchOptions = {
retryOptions: {
retry: (err, { state, opts }, done) => {
counter++

if (
err.statusCode === 500 ||
err.message.includes('other side closed')
) {
setTimeout(done, 500)
return
}

return done(err)
}
},
method: 'POST',
path: '/',
headers: {
'content-type': 'application/json'
},
body: Readable.from(Buffer.from(JSON.stringify({ hello: 'world' })))
}

t = tspl(t, { plan: 3 })

server.on('request', (req, res) => {
switch (counter) {
case 0:
res.writeHead(500)
res.end('failed')
return
default:
t.fail()
}
})

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
const handler = new RetryHandler(dispatchOptions, {
dispatch: client.dispatch.bind(client),
handler: new RequestHandler(dispatchOptions, (err, data) => {
t.equal(err.statusCode, 500)
t.equal(err.data.count, 1)
t.equal(err.code, 'UND_ERR_REQ_RETRY')
})
})

after(async () => {
await client.close()
server.close()

await once(server, 'close')
})

client.dispatch(
dispatchOptions,
handler
)
})

await t.completed
})

test('retrying a request with a body (buffer)', async t => {
let counter = 0
const server = createServer()
const dispatchOptions = {
retryOptions: {
retry: (err, { state, opts }, done) => {
counter++

if (
err.statusCode === 500 ||
err.message.includes('other side closed')
) {
setTimeout(done, 500)
return
}

return done(err)
}
},
method: 'POST',
path: '/',
headers: {
'content-type': 'application/json'
},
body: Buffer.from(JSON.stringify({ hello: 'world' }))
}

t = tspl(t, { plan: 1 })

server.on('request', (req, res) => {
switch (counter) {
case 0:
req.destroy()
return
case 1:
res.writeHead(500)
res.end('failed')
return
case 2:
res.writeHead(200)
res.end('hello world!')
return
default:
t.fail()
}
})

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
const handler = new RetryHandler(dispatchOptions, {
dispatch: client.dispatch.bind(client),
handler: new RequestHandler(dispatchOptions, (err, data) => {
t.ifError(err)
})
})

after(async () => {
await client.close()
server.close()

await once(server, 'close')
})

client.dispatch(
dispatchOptions,
handler
)
})

await t.completed
})

test('should not error if request is not meant to be retried', async t => {
t = tspl(t, { plan: 3 })

Expand Down Expand Up @@ -777,8 +985,7 @@ test('should not error if request is not meant to be retried', async t => {
t.strictEqual(Buffer.concat(chunks).toString('utf-8'), 'Bad request')
},
onError (err) {
console.log({ err })
t.fail()
t.fail(err)
}
}
})
Expand Down

0 comments on commit 5f11247

Please sign in to comment.