Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/dull-fans-drop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tus/server": patch
---

Fix an unhandled promise rejection that occurred when a client disconnected while a web stream was being converted to a Node.js stream
27 changes: 19 additions & 8 deletions packages/server/src/handlers/BaseHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type {DataStore, CancellationContext} from '@tus/utils'
import {ERRORS, type Upload, StreamLimiter, EVENTS} from '@tus/utils'
import throttle from 'lodash.throttle'
import stream from 'node:stream/promises'
import {PassThrough, type Readable} from 'node:stream'
import {PassThrough, Readable} from 'node:stream'

const reExtractFileID = /([^/]+)\/?$/
const reForwardedHost = /host="?([^";]+)/
Expand Down Expand Up @@ -127,7 +127,7 @@ export class BaseHandler extends EventEmitter {
}

protected writeToStore(
data: Readable,
webStream: ReadableStream | null,
upload: Upload,
maxFileSize: number,
context: CancellationContext
Expand All @@ -143,10 +143,17 @@ export class BaseHandler extends EventEmitter {
// Create a PassThrough stream as a proxy to manage the request stream.
// This allows for aborting the write process without affecting the incoming request stream.
const proxy = new PassThrough()
const nodeStream = webStream ? Readable.fromWeb(webStream) : Readable.from([])

// Ignore errors on the data stream to prevent crashes from client disconnections
// We handle errors on the proxy stream instead.
nodeStream.on('error', (err) => {
/* do nothing */
})

// gracefully terminate the proxy stream when the request is aborted
const onAbort = () => {
data.unpipe(proxy)
nodeStream.unpipe(proxy)

if (!proxy.closed) {
proxy.end()
Expand All @@ -155,13 +162,13 @@ export class BaseHandler extends EventEmitter {
context.signal.addEventListener('abort', onAbort, {once: true})

proxy.on('error', (err) => {
data.unpipe(proxy)
nodeStream.unpipe(proxy)
reject(err.name === 'AbortError' ? ERRORS.ABORTED : err)
})

const postReceive = throttle(
(offset: number) => {
this.emit(EVENTS.POST_RECEIVE, data, {...upload, offset})
this.emit(EVENTS.POST_RECEIVE, nodeStream, {...upload, offset})
},
this.options.postReceiveInterval,
{leading: false}
Expand All @@ -177,9 +184,13 @@ export class BaseHandler extends EventEmitter {
// to ensure that errors in the pipeline do not cause the request stream to be destroyed,
// which would result in a socket hangup error for the client.
stream
.pipeline(data.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
return this.store.write(stream as StreamLimiter, upload.id, upload.offset)
})
.pipeline(
nodeStream.pipe(proxy),
new StreamLimiter(maxFileSize),
async (stream) => {
return this.store.write(stream as StreamLimiter, upload.id, upload.offset)
}
)
.then(resolve)
.catch(reject)
.finally(() => {
Expand Down
7 changes: 1 addition & 6 deletions packages/server/src/handlers/PatchHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,7 @@ export class PatchHandler extends BaseHandler {
}

const maxBodySize = await this.calculateMaxBodySize(req, upload, maxFileSize)
newOffset = await this.writeToStore(
req.body ? Readable.fromWeb(req.body) : Readable.from([]),
upload,
maxBodySize,
context
)
newOffset = await this.writeToStore(req.body, upload, maxBodySize, context)
} finally {
await lock.unlock()
}
Expand Down
7 changes: 1 addition & 6 deletions packages/server/src/handlers/PostHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,7 @@ export class PostHandler extends BaseHandler {
// The request MIGHT include a Content-Type header when using creation-with-upload extension
if (validateHeader('content-type', req.headers.get('content-type'))) {
const bodyMaxSize = await this.calculateMaxBodySize(req, upload, maxFileSize)
const newOffset = await this.writeToStore(
req.body ? Readable.fromWeb(req.body) : Readable.from([]),
upload,
bodyMaxSize,
context
)
const newOffset = await this.writeToStore(req.body, upload, bodyMaxSize, context)

responseData.headers['Upload-Offset'] = newOffset.toString()
isFinal = newOffset === Number.parseInt(upload_length as string, 10)
Expand Down