Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions packages/@uppy/companion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,39 @@ app.use(companionApp)
```

To enable companion socket for realtime feed to the client while upload is going
on, you call the `socket` method like so.
on, you call the `setupSocket` method from the app response like so.

```javascript
// ...
const { app: companionApp, setupSocket } = companion.app(options)
app.use(companionApp)

const server = app.listen(PORT)
setupSocket(server)
```

#### WebSocket Authentication

To add authentication/authorization to WebSocket connections, you can provide an `onConnection` callback in your options:

companion.socket(server)
```javascript
const options = {
// ... other options
onConnection: async (ws, req) => {
// Custom authentication logic
const token = req.headers.authorization || new URL(req.url, 'http://localhost').searchParams.get('token')

if (!isValidToken(token)) {
throw new Error('Unauthorized')
}

// Connection will be allowed if this function doesn't throw
}
}
```

The `onConnection` callback receives the WebSocket instance and HTTP request object. If the callback throws an error, the connection will be closed with a 1008 (Policy Violation) status code.

### Run as standalone server

Please make sure that the required env variables are set before runnning/using
Expand Down
9 changes: 7 additions & 2 deletions packages/@uppy/companion/src/companion.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export const errors = {
* Entry point into initializing the Companion app.
*
* @param {object} optionsArg
* @returns {{ app: import('express').Express, emitter: any }}}
* @returns {{ app: import('express').Express, emitter: any, setupSocket: function }}}
*/
export function app(optionsArg = {}) {
setLoggerProcessName(optionsArg)
Expand Down Expand Up @@ -310,5 +310,10 @@ export function app(optionsArg = {}) {
processId,
})

return { app, emitter }
return {
app,
emitter,
setupSocket: (server) =>
socket(server, { onConnection: options.onConnection }),
}
}
1 change: 1 addition & 0 deletions packages/@uppy/companion/src/config/companion.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export const defaultOptions = {
streamingUpload: true,
clientSocketConnectTimeout: 60000,
metrics: true,
onConnection: null,
}

/**
Expand Down
17 changes: 15 additions & 2 deletions packages/@uppy/companion/src/server/socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,28 @@ import Uploader from './Uploader.js'
* the socket is used to send progress events during an upload
*
* @param {import('http').Server | import('https').Server} server
* @param {object} [options] - Options object
* @param {function} [options.onConnection] - Optional callback to authenticate/authorize WebSocket connections
*/
export default function setupSocket(server) {
export default function setupSocket(server, options = {}) {
const wss = new WebSocketServer({ server })
const redisClient = redis.client()

// A new connection is usually created when an upload begins,
// or when connection fails while an upload is on-going and,
// client attempts to reconnect.
wss.on('connection', (ws, req) => {
wss.on('connection', async (ws, req) => {
// Call the onConnection callback if provided for authentication/authorization
if (options.onConnection) {
try {
await options.onConnection(ws, req)
} catch (error) {
logger.error(error, 'socket.auth.error')
ws.close(1008, 'Authentication failed')
return
}
}

const fullPath = req.url
// the token identifies which ongoing upload's progress, the socket
// connection wishes to listen to.
Expand Down
4 changes: 2 additions & 2 deletions packages/@uppy/companion/src/standalone/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ export default function server(inputCompanionOptions) {
}

// initialize companion
const { app: companionApp } = companion.app(companionOptions)
const { app: companionApp, setupSocket } = companion.app(companionOptions)

// add companion to server middleware
router.use(companionApp)
Expand Down Expand Up @@ -212,5 +212,5 @@ export default function server(inputCompanionOptions) {
}
})

return { app, companionOptions }
return { app, companionOptions, setupSocket }
}
6 changes: 3 additions & 3 deletions packages/@uppy/companion/src/standalone/start-server.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#!/usr/bin/env node
import packageJson from '../../package.json' with { type: 'json' }
import * as companion from '../companion.js'
import logger from '../server/logger.js'
import standalone from './index.js'

const port = process.env.COMPANION_PORT || process.env.PORT || 3020

const { app } = standalone()
const { app, setupSocket } = standalone()

companion.socket(app.listen(port))
const server = app.listen(port)
setupSocket(server)

logger.info(`Welcome to Companion! v${packageJson.version}`)
logger.info(`Listening on http://localhost:${port}`)
140 changes: 140 additions & 0 deletions packages/@uppy/companion/test/socket-auth.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import { createServer } from 'node:http'
import { once } from 'node:events'
import { describe, expect, test, vi } from 'vitest'
import WebSocket from 'ws'
import socket from '../src/server/socket.js'

describe('socket onConnection authentication', () => {
test('should call onConnection callback when provided', async () => {
const server = createServer()
const onConnectionMock = vi.fn()

// Setup socket with onConnection callback
socket(server, { onConnection: onConnectionMock })

// Start server on available port
server.listen(0)
await once(server, 'listening')

const address = server.address()
const port = typeof address === 'string' ? 0 : address?.port || 0

try {
// Create WebSocket connection
const ws = new WebSocket(`ws://localhost:${port}/api/test-token`)

// Wait for connection to be established
await once(ws, 'open')

// Verify onConnection was called
expect(onConnectionMock).toHaveBeenCalledTimes(1)
expect(onConnectionMock).toHaveBeenCalledWith(
expect.any(Object), // WebSocket instance
expect.any(Object) // Request object
)

ws.close()
} finally {
server.close()
}
})

test('should close connection when onConnection throws error', async () => {
const server = createServer()
const onConnectionMock = vi.fn().mockRejectedValue(new Error('Unauthorized'))

// Setup socket with failing onConnection callback
socket(server, { onConnection: onConnectionMock })

// Start server on available port
server.listen(0)
await once(server, 'listening')

const address = server.address()
const port = typeof address === 'string' ? 0 : address?.port || 0

try {
// Create WebSocket connection
const ws = new WebSocket(`ws://localhost:${port}/api/test-token`)

// Wait for connection to be closed
const closeEvent = await once(ws, 'close')

// Verify connection was closed with authentication error code
expect(closeEvent[0]).toBe(1008) // 1008 = Policy Violation
expect(closeEvent[1].toString()).toBe('Authentication failed')

// Verify onConnection was called
expect(onConnectionMock).toHaveBeenCalledTimes(1)
} finally {
server.close()
}
})

test('should work normally without onConnection callback', async () => {
const server = createServer()

// Setup socket without onConnection callback
socket(server, {})

// Start server on available port
server.listen(0)
await once(server, 'listening')

const address = server.address()
const port = typeof address === 'string' ? 0 : address?.port || 0

try {
// Create WebSocket connection
const ws = new WebSocket(`ws://localhost:${port}/api/test-token`)

// Wait for connection to be established
await once(ws, 'open')

// Connection should be successful
expect(ws.readyState).toBe(WebSocket.OPEN)

ws.close()
} finally {
server.close()
}
})

test('should pass correct WebSocket and request objects to onConnection', async () => {
const server = createServer()
let capturedWs, capturedReq

const onConnectionMock = vi.fn((ws, req) => {
capturedWs = ws
capturedReq = req
})

// Setup socket with onConnection callback
socket(server, { onConnection: onConnectionMock })

// Start server on available port
server.listen(0)
await once(server, 'listening')

const address = server.address()
const port = typeof address === 'string' ? 0 : address?.port || 0

try {
// Create WebSocket connection with specific URL
const ws = new WebSocket(`ws://localhost:${port}/api/specific-token`)

// Wait for connection to be established
await once(ws, 'open')

// Verify the captured objects
expect(capturedWs).toBeDefined()
expect(capturedReq).toBeDefined()
// @ts-ignore - capturedReq is checked above
expect(capturedReq.url).toBe('/api/specific-token')

ws.close()
} finally {
server.close()
}
})
})