diff --git a/.changeset/cool-hairs-tap.md b/.changeset/cool-hairs-tap.md new file mode 100644 index 00000000000..c92963745cf --- /dev/null +++ b/.changeset/cool-hairs-tap.md @@ -0,0 +1,6 @@ +--- +"@atproto/bsky": patch +--- + +Improve performance when serving blobs + diff --git a/.changeset/eight-cobras-swim.md b/.changeset/eight-cobras-swim.md new file mode 100644 index 00000000000..8a89def7089 --- /dev/null +++ b/.changeset/eight-cobras-swim.md @@ -0,0 +1,5 @@ +--- +"@atproto/common-web": patch +--- + +Add `createRetryable` utility function diff --git a/.changeset/loud-ants-tap.md b/.changeset/loud-ants-tap.md new file mode 100644 index 00000000000..4d522ddae98 --- /dev/null +++ b/.changeset/loud-ants-tap.md @@ -0,0 +1,5 @@ +--- +"@atproto-labs/xrpc-utils": patch +--- + +New utility package to work with xrpc-server diff --git a/.changeset/modern-bees-share.md b/.changeset/modern-bees-share.md new file mode 100644 index 00000000000..dece862e6a6 --- /dev/null +++ b/.changeset/modern-bees-share.md @@ -0,0 +1,5 @@ +--- +"@atproto-labs/did-resolver": patch +--- + +Ensure proper escaping when building PLC url diff --git a/.changeset/neat-insects-judge.md b/.changeset/neat-insects-judge.md new file mode 100644 index 00000000000..cb5221a8dc8 --- /dev/null +++ b/.changeset/neat-insects-judge.md @@ -0,0 +1,9 @@ +--- +"@atproto/identity": patch +"@atproto/dev-env": patch +"@atproto/ozone": patch +"@atproto/bsky": patch +"@atproto/pds": patch +--- + +Remove dependency on Axios diff --git a/.changeset/witty-owls-fail.md b/.changeset/witty-owls-fail.md new file mode 100644 index 00000000000..f7f218f011f --- /dev/null +++ b/.changeset/witty-owls-fail.md @@ -0,0 +1,5 @@ +--- +"@atproto/common-web": patch +--- + +Add `allFulfilled` utility diff --git a/package.json b/package.json index 365523f5b8e..080ec6fcdbf 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,7 @@ "@swc/core": "^1.3.42", "@swc/jest": "^0.2.24", "@types/jest": "^28.1.4", - "@types/node": "^18.19.56", + "@types/node": "^18.19.67", "@typescript-eslint/eslint-plugin": "^7.4.0", "@typescript-eslint/parser": "^7.4.0", "dotenv": "^16.0.3", diff --git a/packages/aws/src/bunny.ts b/packages/aws/src/bunny.ts index 62cfeedc891..dadee53d554 100644 --- a/packages/aws/src/bunny.ts +++ b/packages/aws/src/bunny.ts @@ -1,4 +1,4 @@ -import { handleAllSettledErrors } from '@atproto/common' +import { allFulfilled } from '@atproto/common' import { ImageInvalidator } from './types' export type BunnyConfig = { @@ -11,7 +11,7 @@ const API_PURGE_URL = 'https://api.bunny.net/purge' export class BunnyInvalidator implements ImageInvalidator { constructor(public cfg: BunnyConfig) {} async invalidate(_subject: string, paths: string[]) { - const results = await Promise.allSettled( + await allFulfilled( paths.map(async (path) => purgeUrl({ url: this.cfg.urlPrefix + path, @@ -19,7 +19,6 @@ export class BunnyInvalidator implements ImageInvalidator { }), ), ) - handleAllSettledErrors(results) } } diff --git a/packages/aws/src/util.ts b/packages/aws/src/util.ts index 8f603055bea..90e9fc4d21f 100644 --- a/packages/aws/src/util.ts +++ b/packages/aws/src/util.ts @@ -1,14 +1,13 @@ -import { handleAllSettledErrors } from '@atproto/common' +import { allFulfilled } from '@atproto/common' import { ImageInvalidator } from './types' export class MultiImageInvalidator implements ImageInvalidator { constructor(public invalidators: ImageInvalidator[]) {} async invalidate(subject: string, paths: string[]) { - const results = await Promise.allSettled( + await allFulfilled( 
this.invalidators.map((invalidator) => invalidator.invalidate(subject, paths), ), ) - handleAllSettledErrors(results) } } diff --git a/packages/bsky/package.json b/packages/bsky/package.json index 9bd5c1f3b96..3416e81e071 100644 --- a/packages/bsky/package.json +++ b/packages/bsky/package.json @@ -27,9 +27,12 @@ "buf:gen": "buf generate ../bsync/proto && buf generate ./proto" }, "dependencies": { + "@atproto-labs/fetch-node": "workspace:*", + "@atproto-labs/xrpc-utils": "workspace:*", "@atproto/api": "workspace:^", "@atproto/common": "workspace:^", "@atproto/crypto": "workspace:^", + "@atproto/did": "workspace:^", "@atproto/identity": "workspace:^", "@atproto/lexicon": "workspace:^", "@atproto/repo": "workspace:^", @@ -41,6 +44,7 @@ "@connectrpc/connect-express": "^1.1.4", "@connectrpc/connect-node": "^1.1.4", "@did-plc/lib": "^0.0.1", + "@types/http-errors": "^2.0.1", "compression": "^1.7.4", "cors": "^2.8.5", "express": "^4.17.2", @@ -60,7 +64,8 @@ "statsig-node": "^5.23.1", "structured-headers": "^1.0.1", "typed-emitter": "^2.1.0", - "uint8arrays": "3.0.0" + "uint8arrays": "3.0.0", + "undici": "^6.19.8" }, "devDependencies": { "@atproto/api": "workspace:^", @@ -76,7 +81,6 @@ "@types/express-serve-static-core": "^4.17.36", "@types/pg": "^8.6.6", "@types/qs": "^6.9.7", - "axios": "^0.27.2", "jest": "^28.1.2", "ts-node": "^10.8.2", "typescript": "^5.6.3" diff --git a/packages/bsky/src/api/blob-dispatcher.ts b/packages/bsky/src/api/blob-dispatcher.ts new file mode 100644 index 00000000000..24dd9694571 --- /dev/null +++ b/packages/bsky/src/api/blob-dispatcher.ts @@ -0,0 +1,38 @@ +import { isUnicastIp, unicastLookup } from '@atproto-labs/fetch-node' +import { Agent, Dispatcher, Pool, RetryAgent } from 'undici' + +import { ServerConfig } from '../config' +import { RETRYABLE_HTTP_STATUS_CODES } from '../util/retry' + +export function createBlobDispatcher(cfg: ServerConfig): Dispatcher { + const baseDispatcher = new Agent({ + allowH2: cfg.proxyAllowHTTP2, // This is experimental + headersTimeout: cfg.proxyHeadersTimeout, + maxResponseSize: cfg.proxyMaxResponseSize, + bodyTimeout: cfg.proxyBodyTimeout, + factory: cfg.disableSsrfProtection + ? undefined + : (origin, opts) => { + const { protocol, hostname } = + origin instanceof URL ? origin : new URL(origin) + if (protocol !== 'https:') { + throw new Error(`Forbidden protocol "${protocol}"`) + } + if (isUnicastIp(hostname) === false) { + throw new Error('Hostname resolved to non-unicast address') + } + return new Pool(origin, opts) + }, + connect: { + lookup: cfg.disableSsrfProtection ? undefined : unicastLookup, + }, + }) + + return cfg.proxyMaxRetries > 0 + ? 
new RetryAgent(baseDispatcher, { + statusCodes: [...RETRYABLE_HTTP_STATUS_CODES], + methods: ['GET', 'HEAD'], + maxRetries: cfg.proxyMaxRetries, + }) + : baseDispatcher +} diff --git a/packages/bsky/src/api/blob-resolver.ts b/packages/bsky/src/api/blob-resolver.ts index facb70c4d6f..89e4a3e716c 100644 --- a/packages/bsky/src/api/blob-resolver.ts +++ b/packages/bsky/src/api/blob-resolver.ts @@ -1,97 +1,278 @@ -import { pipeline, Readable } from 'stream' -import express from 'express' -import createError from 'http-errors' -import axios, { AxiosError } from 'axios' +import { + ACCEPT_ENCODING_COMPRESSED, + ACCEPT_ENCODING_UNCOMPRESSED, + buildProxiedContentEncoding, + formatAcceptHeader, +} from '@atproto-labs/xrpc-utils' +import { + createDecoders, + VerifyCidError, + VerifyCidTransform, +} from '@atproto/common' +import { AtprotoDid, isAtprotoDid } from '@atproto/did' +import createError, { isHttpError } from 'http-errors' import { CID } from 'multiformats/cid' -import { ensureValidDid } from '@atproto/syntax' -import { forwardStreamErrors, VerifyCidTransform } from '@atproto/common' -import { DidNotFoundError } from '@atproto/identity' +import { Duplex, Transform, Writable } from 'node:stream' +import { pipeline } from 'node:stream/promises' +import { Dispatcher } from 'undici' + +import { ServerConfig } from '../config' import AppContext from '../context' -import { httpLogger as log } from '../logger' -import { retryHttp } from '../util/retry' import { Code, + DataPlaneClient, getServiceEndpoint, isDataplaneError, unpackIdentityServices, } from '../data-plane' +import { parseCid } from '../hydration/util' +import { httpLogger as log } from '../logger' +import { Middleware, proxyResponseHeaders, responseSignal } from '../util/http' -// Resolve and verify blob from its origin host +export function createMiddleware(ctx: AppContext): Middleware { + return async (req, res, next) => { + if (req.method !== 'GET' && req.method !== 'HEAD') return next() + if (!req.url?.startsWith('/blob/')) return next() + const { length, 2: didParam, 3: cidParam } = req.url.split('/') + if (length !== 4 || !didParam || !cidParam) return next() -export const createRouter = (ctx: AppContext): express.Router => { - const router = express.Router() + // @TODO Check sec-fetch-* headers (e.g. to prevent files from being + // displayed as a web page) ? - router.get('/blob/:did/:cid', async function (req, res, next) { try { - const { did, cid: cidStr } = req.params - try { - ensureValidDid(did) - } catch (err) { - return next(createError(400, 'Invalid did')) - } - let cid: CID - try { - cid = CID.parse(cidStr) - } catch (err) { - return next(createError(400, 'Invalid cid')) + const streamOptions: StreamBlobOptions = { + did: didParam, + cid: cidParam, + signal: responseSignal(res), + // Because we will be verifying the CID, we need to ensure that the + // upstream response can be de-compressed. We do this by negotiating the + // "accept-encoding" header based on the downstream client's capabilities. + acceptEncoding: buildProxiedContentEncoding( + req.headers['accept-encoding'], + ctx.cfg.proxyPreferCompressed, + ), } - const verifiedImage = await resolveBlob(ctx, did, cid) + await streamBlob(ctx, streamOptions, (upstream, { cid, did, url }) => { + const encoding = upstream.headers['content-encoding'] + const verifier = createCidVerifier(cid, encoding) - // Send chunked response, destroying stream early (before - // closing chunk) if the bytes don't match the expected cid. 
- res.statusCode = 200 - res.setHeader('content-type', verifiedImage.contentType) - res.setHeader('x-content-type-options', 'nosniff') - res.setHeader('content-security-policy', `default-src 'none'; sandbox`) - pipeline(verifiedImage.stream, res, (err) => { - if (err) { + const logError = (err: unknown) => { log.warn( - { err, did, cid: cidStr, pds: verifiedImage.pds }, + { err, did, cid: cid.toString(), pds: url.origin }, 'blob resolution failed during transmission', ) } + + const onError = (err: unknown) => { + // No need to pipe the data (verifier) into the response, as it is + // "errored". The response processing will continue in the "catch" + // block below (because streamBlob() will reject the promise in case + // of "error" event on the writable stream returned by the factory). + clearTimeout(graceTimer) + logError(err) + } + + // Catch any error that occurs before the timer below is triggered. + // The promise returned by streamBlob() will be rejected as soon as + // the verifier errors. + verifier.on('error', onError) + + // The way I/O works, it is likely that, in case of small payloads, the + // full upstream response is already buffered at this point. In order to + // return a 404 instead of a broken response stream, we allow the event + // loop to process any pending I/O events before we start piping the + // bytes to the response. For larger payloads, the response will look + // like a 200 with a broken chunked response stream. The only way around + // that would be to buffer the entire response before piping it to the + // response, which will hurt latency (need the full payload) and memory + // usage (either RAM or DISK). Since this is more of an edge case, we + // allow the broken response stream to be sent. + const graceTimer = setTimeout(() => { + verifier.off('error', onError) + + // Make sure that the content served from the bsky api domain cannot + // be used to perform XSS attacks (by serving HTML pages) + res.setHeader( + 'Content-Security-Policy', + `default-src 'none'; sandbox`, + ) + res.setHeader('X-Content-Type-Options', 'nosniff') + res.setHeader('X-Frame-Options', 'DENY') + res.setHeader('X-XSS-Protection', '0') + + // @TODO Add a cache-control header ? + // @TODO Add content-disposition header (to force download) ? + + proxyResponseHeaders(upstream, res) + + // Force chunked encoding. This is required because the verifier will + // trigger an error *after* the last chunk has been passed through. + // Because the number of bytes sent will match the content-length, the + // HTTP response will be considered "complete" by the HTTP server. At + // this point, only trailer headers could indicate that an error + // occurred, but that is not the behavior we expect. + res.removeHeader('content-length') + + // From this point on, triggering the next middleware (including any + // error handler) can be problematic because content-type, + // content-encoding, etc. headers have already been set. Because of + // this, we make sure that res.headersSent is set to true, preventing + // another error handler middleware from being called (from the catch + // block below). Not flushing the headers here would require reverting + // the headers set by this middleware (which we don't do for now). + res.flushHeaders() + + // Pipe the verifier output into the HTTP response + void pipeline([verifier, res]).catch(logError) + }, 10) // 0 works too. Allow for additional data to come in for 10ms. + + // Write the upstream response into the verifier.
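+ // (streamBlob() hands this writable to undici's Dispatcher#stream(), which + // pipes the upstream response body into it.)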
+ return verifier }) } catch (err) { - if (err instanceof AxiosError) { - if (err.code === AxiosError.ETIMEDOUT) { - log.warn( - { host: err.request?.host, path: err.request?.path }, - 'blob resolution timeout', - ) - return next(createError(504)) // Gateway timeout - } - if (!err.response || err.response.status >= 500) { + if (res.headersSent || res.destroyed) { + res.destroy() + } else if (err instanceof VerifyCidError) { + // @NOTE This only works because of the graceTimer above. It will also + // only be triggered for small payloads. + next(createError(404, err.message)) + } else if (isHttpError(err)) { + next(err) + } else { + next(createError(502, 'Upstream Error', { cause: err })) + } + } + } +} + +export type StreamBlobOptions = { + cid: string + did: string + acceptEncoding?: string + signal?: AbortSignal +} + +export type StreamBlobFactory = ( + data: Dispatcher.StreamFactoryData, + info: { + url: URL + did: AtprotoDid + cid: CID + }, +) => Writable + +export async function streamBlob( + ctx: AppContext, + options: StreamBlobOptions, + factory: StreamBlobFactory, +) { + const { did, cid } = parseBlobParams(options) + const url = await getBlobUrl(ctx.dataplane, did, cid) + + const headers = getBlobHeaders(ctx.cfg, url) + + headers.set( + 'accept-encoding', + options.acceptEncoding || + formatAcceptHeader( + ctx.cfg.proxyPreferCompressed + ? ACCEPT_ENCODING_COMPRESSED + : ACCEPT_ENCODING_UNCOMPRESSED, + ), + ) + + let headersReceived = false + + return ctx.blobDispatcher + .stream( + { + method: 'GET', + origin: url.origin, + path: url.pathname + url.search, + headers, + signal: options.signal, + }, + (upstream) => { + headersReceived = true + + if (upstream.statusCode !== 200) { log.warn( - { host: err.request?.host, path: err.request?.path }, - 'blob resolution failed upstream', + { + did, + cid: cid.toString(), + pds: url.origin, + status: upstream.statusCode, + }, + `blob resolution failed upstream`, ) - return next(createError(502)) + + throw upstream.statusCode >= 400 && upstream.statusCode < 500 + ? createError(404, 'Blob not found', { cause: upstream }) // 4xx => 404 + : createError(502, 'Upstream Error', { cause: upstream }) // !200 && !4xx => 502 } - return next(createError(404, 'Blob not found')) - } - if (err instanceof DidNotFoundError) { - return next(createError(404, 'Blob not found')) + + return factory(upstream, { url, did, cid }) + }, + ) + .catch((err) => { + // Is this a connection error, or a stream error ? + if (!headersReceived) { + // connection error, dns error, headers timeout, ... 
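+ // (the factory above never ran, so nothing has been written downstream + // yet and the error can safely surface as a plain 502)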
+ log.warn( + { err, did, cid: cid.toString(), pds: url.origin }, + 'blob resolution failed during connection', + ) + + throw createError(502, 'Upstream Error', { cause: err }) } - return next(err) - } - }) - return router + throw err + }) +} + +function parseBlobParams(params: { cid: string; did: string }) { + const { cid, did } = params + if (!isAtprotoDid(did)) throw createError(400, 'Invalid did') + const cidObj = parseCid(cid) + if (!cidObj) throw createError(400, 'Invalid cid') + return { cid: cidObj, did } } -export async function resolveBlob(ctx: AppContext, did: string, cid: CID) { - const cidStr = cid.toString() +async function getBlobUrl( + dataplane: DataPlaneClient, + did: string, + cid: CID, +): Promise<URL> { + const pds = await getBlobPds(dataplane, did, cid) + + const url = new URL(`/xrpc/com.atproto.sync.getBlob`, pds) + url.searchParams.set('did', did) + url.searchParams.set('cid', cid.toString()) + + return url +} +async function getBlobPds( + dataplane: DataPlaneClient, + did: string, + cid: CID, +): Promise<string> { const [identity, { takenDown }] = await Promise.all([ - ctx.dataplane.getIdentityByDid({ did }).catch((err) => { + dataplane.getIdentityByDid({ did }).catch((err) => { if (isDataplaneError(err, Code.NotFound)) { return undefined } throw err }), - ctx.dataplane.getBlobTakedown({ did, cid: cid.toString() }), + dataplane.getBlobTakedown({ did, cid: cid.toString() }), ]) + + if (takenDown) { + throw createError(404, 'Blob not found') + } + const services = identity && unpackIdentityServices(identity.services) const pds = services && @@ -99,62 +280,116 @@ export async function resolveBlob(ctx: AppContext, did: string, cid: CID) { id: 'atproto_pds', type: 'AtprotoPersonalDataServer', }) + if (!pds) { throw createError(404, 'Origin not found') } - if (takenDown) { - throw createError(404, 'Blob not found') - } - const blobResult = await retryHttp(() => - getBlob(ctx, { pds, did, cid: cidStr }), - ) - const imageStream: Readable = blobResult.data - const verifyCid = new VerifyCidTransform(cid) - - forwardStreamErrors(imageStream, verifyCid) - return { - pds, - contentType: - blobResult.headers['content-type'] || 'application/octet-stream', - stream: imageStream.pipe(verifyCid), - } -} - -async function getBlob( - ctx: AppContext, - opts: { pds: string; did: string; cid: string }, -) { - const { pds, did, cid } = opts - return axios.get(`${pds}/xrpc/com.atproto.sync.getBlob`, { - params: { did, cid }, - decompress: true, - responseType: 'stream', - timeout: 5000, // 5sec of inactivity on the connection - headers: getRateLimitBypassHeaders(ctx, pds), - }) + return pds } -function getRateLimitBypassHeaders( - ctx: AppContext, - pds: string, -): { 'x-ratelimit-bypass'?: string } { - const { +function getBlobHeaders( + { blobRateLimitBypassKey: bypassKey, blobRateLimitBypassHostname: bypassHostname, - } = ctx.cfg - if (!bypassKey || !bypassHostname) { - return {} - } - const url = new URL(pds) - if (bypassHostname.startsWith('.')) { - if (url.hostname.endsWith(bypassHostname)) { - return { 'x-ratelimit-bypass': bypassKey } - } - } else { - if (url.hostname === bypassHostname) { - return { 'x-ratelimit-bypass': bypassKey } + }: ServerConfig, + url: URL, +): Map<string, string> { + const headers = new Map<string, string>() + + if (bypassKey && bypassHostname) { + const matchesUrl = bypassHostname.startsWith('.') + ? url.hostname.endsWith(bypassHostname) + : url.hostname === bypassHostname + + if (matchesUrl) { + headers.set('x-ratelimit-bypass', bypassKey) } } - return {} + + return headers +} + +/** + * This function creates a passthrough stream that will decompress (if needed) + * and verify the CID of the input stream. The output data will be identical to + * the input data. + * + * If you need the un-compressed data, you should use a decompress + verify + * pipeline instead. + */ +function createCidVerifier(cid: CID, encoding?: string | string[]): Duplex { + // If the upstream content is compressed, we do not want to return a + // de-compressed stream here. Indeed, the "compression" middleware will + // compress the response before it is sent downstream, if it is not already + // compressed. Because of this, it is preferable to return the content as-is + // to avoid re-compressing it. + // + // We do still want to be able to verify the CID, which requires decompressing + // the input bytes. + // + // To that end, we create a passthrough in order to "tee" the stream into two + // streams: one that will be sent, unaltered, downstream, and a pipeline that + // will be used to decompress & verify the CID (discarding de-compressed + // data). + + const decoders = createDecoders(encoding) + const verifier = new VerifyCidTransform(cid) + + // Optimization: If the content is not compressed, we don't need to "tee" the + // stream, we can use the verifier as a simple passthrough. + if (!decoders.length) return verifier + + const pipelineController = new AbortController() + const pipelineStreams: Duplex[] = [...decoders, verifier] + const pipelineInput = pipelineStreams[0]! + + // Create a promise that will resolve if, and only if, the decoding and + // verification succeed. + const pipelinePromise: Promise<null | Error> = pipeline(pipelineStreams, { + signal: pipelineController.signal, + }).then( + () => null, + (err) => { + const error = asError(err) + + // The data being processed by the pipeline is invalid (e.g. invalid + // compressed content, bytes that do not match the CID, ...). If that + // occurs, we can destroy the passthrough (this avoids waiting for the + // "flush" event to propagate the error). + passthrough.destroy(error) + + return error + }, + ) + + // We don't care about the un-compressed data, we only use the verifier to + // detect any error through the pipelinePromise. We still need to switch the + // verifier into flowing mode to ensure that the pipelinePromise resolves. + verifier.resume() + + const passthrough = new Transform({ + transform(chunk, encoding, callback) { + pipelineInput.write(chunk, encoding) + callback(null, chunk) + }, + flush(callback) { + // End the input stream, which will resolve the pipeline promise + pipelineInput.end() + // End the pass-through stream according to the result of the pipeline + pipelinePromise.then(callback) + }, + destroy(err, callback) { + pipelineController.abort() // Causes pipeline() to destroy all streams + callback(err) + }, + }) + + return passthrough +} + +function asError(err: unknown): Error { + return err instanceof Error + ?
err + : new Error('Processing failed', { cause: err }) } diff --git a/packages/bsky/src/config.ts b/packages/bsky/src/config.ts index 4a7b92b4566..944166e8f72 100644 --- a/packages/bsky/src/config.ts +++ b/packages/bsky/src/config.ts @@ -50,6 +50,14 @@ export interface ServerConfigValues { // client config clientCheckEmailConfirmed?: boolean topicsEnabled?: boolean + // http proxy agent + disableSsrfProtection?: boolean + proxyAllowHTTP2?: boolean + proxyHeadersTimeout?: number + proxyBodyTimeout?: number + proxyMaxResponseSize?: number + proxyMaxRetries?: number + proxyPreferCompressed?: boolean } export class ServerConfig { @@ -58,7 +66,9 @@ export class ServerConfig { static readEnv(overrides?: Partial<ServerConfigValues>) { const version = process.env.BSKY_VERSION || undefined - const debugMode = process.env.NODE_ENV !== 'production' + const debugMode = + // Because security-related features are disabled in development mode, this requires explicit opt-in. + process.env.NODE_ENV === 'development' || process.env.NODE_ENV === 'test' const publicUrl = process.env.BSKY_PUBLIC_URL || undefined const serverDid = process.env.BSKY_SERVER_DID || 'did:example:test' const envPort = parseInt(process.env.BSKY_PORT || '', 10) @@ -150,6 +160,23 @@ export class ServerConfig { const maxThreadDepth = process.env.BSKY_MAX_THREAD_DEPTH ? parseInt(process.env.BSKY_MAX_THREAD_DEPTH || '', 10) : undefined + + const disableSsrfProtection = process.env.BSKY_DISABLE_SSRF_PROTECTION + ? process.env.BSKY_DISABLE_SSRF_PROTECTION === 'true' + : debugMode + + const proxyAllowHTTP2 = process.env.BSKY_PROXY_ALLOW_HTTP2 === 'true' + const proxyHeadersTimeout = + parseInt(process.env.BSKY_PROXY_HEADERS_TIMEOUT || '', 10) || undefined + const proxyBodyTimeout = + parseInt(process.env.BSKY_PROXY_BODY_TIMEOUT || '', 10) || undefined + const proxyMaxResponseSize = + parseInt(process.env.BSKY_PROXY_MAX_RESPONSE_SIZE || '', 10) || undefined + const proxyMaxRetries = + parseInt(process.env.BSKY_PROXY_MAX_RETRIES || '', 10) || undefined + const proxyPreferCompressed = + process.env.BSKY_PROXY_PREFER_COMPRESSED === 'true' + return new ServerConfig({ version, debugMode, @@ -193,6 +220,13 @@ export class ServerConfig { bigThreadUris, bigThreadDepth, maxThreadDepth, + disableSsrfProtection, + proxyAllowHTTP2, + proxyHeadersTimeout, + proxyBodyTimeout, + proxyMaxResponseSize, + proxyMaxRetries, + proxyPreferCompressed, ...stripUndefineds(overrides ?? {}), }) } @@ -217,11 +251,6 @@ return this.assignedPort || this.cfg.port } - get localUrl() { - assert(this.port, 'No port assigned') - return `http://localhost:${this.port}` - } - get publicUrl() { return this.cfg.publicUrl } @@ -377,6 +406,34 @@ get maxThreadDepth() { return this.cfg.maxThreadDepth } + + get disableSsrfProtection(): boolean { + return this.cfg.disableSsrfProtection ?? false + } + + get proxyAllowHTTP2(): boolean { + return this.cfg.proxyAllowHTTP2 ?? false + } + + get proxyHeadersTimeout(): number { + return this.cfg.proxyHeadersTimeout ?? 30e3 + } + + get proxyBodyTimeout(): number { + return this.cfg.proxyBodyTimeout ?? 30e3 + } + + get proxyMaxResponseSize(): number { + return this.cfg.proxyMaxResponseSize ?? 10 * 1024 * 1024 // 10mb + } + + get proxyMaxRetries(): number { + return this.cfg.proxyMaxRetries ?? 3 + } + + get proxyPreferCompressed(): boolean { + return this.cfg.proxyPreferCompressed ??
true + } } function stripUndefineds( diff --git a/packages/bsky/src/context.ts b/packages/bsky/src/context.ts index db59d119f89..06e48ba8cf4 100644 --- a/packages/bsky/src/context.ts +++ b/packages/bsky/src/context.ts @@ -17,6 +17,7 @@ import { parseLabelerHeader, } from './util' import { httpLogger as log } from './logger' +import { Dispatcher } from 'undici' export class AppContext { constructor( @@ -34,6 +35,7 @@ export class AppContext { courierClient: CourierClient | undefined authVerifier: AuthVerifier featureGates: FeatureGates + blobDispatcher: Dispatcher }, ) {} @@ -93,6 +95,10 @@ export class AppContext { return this.opts.featureGates } + get blobDispatcher(): Dispatcher { + return this.opts.blobDispatcher + } + reqLabelers(req: express.Request): ParsedLabelers { const val = req.header('atproto-accept-labelers') let parsed: ParsedLabelers | null diff --git a/packages/bsky/src/data-plane/server/indexing/index.ts b/packages/bsky/src/data-plane/server/indexing/index.ts index eb07b1d123d..66f9f98f6b4 100644 --- a/packages/bsky/src/data-plane/server/indexing/index.ts +++ b/packages/bsky/src/data-plane/server/indexing/index.ts @@ -31,7 +31,7 @@ import * as Labeler from './plugins/labeler' import * as ChatDeclaration from './plugins/chat-declaration' import RecordProcessor from './processor' import { subLogger } from '../../../logger' -import { retryHttp } from '../../../util/retry' +import { retryXrpc } from '../../../util/retry' import { BackgroundQueue } from '../background' export class IndexingService { @@ -165,7 +165,7 @@ export class IndexingService { ) const { api } = new AtpAgent({ service: pds }) - const { data: car } = await retryHttp(() => + const { data: car } = await retryXrpc(() => api.com.atproto.sync.getRepo({ did }), ) const { root, blocks } = await readCarWithRoot(car) @@ -287,7 +287,7 @@ export class IndexingService { if (!pds) return false const { api } = new AtpAgent({ service: pds }) try { - await retryHttp(() => api.com.atproto.sync.getLatestCommit({ did })) + await retryXrpc(() => api.com.atproto.sync.getLatestCommit({ did })) return true } catch (err) { if (err instanceof ComAtprotoSyncGetLatestCommit.RepoNotFoundError) { diff --git a/packages/bsky/src/image/server.ts b/packages/bsky/src/image/server.ts index dd98738b890..3ee752b25a0 100644 --- a/packages/bsky/src/image/server.ts +++ b/packages/bsky/src/image/server.ts @@ -1,136 +1,170 @@ -import fs from 'fs/promises' -import fsSync from 'fs' -import os from 'os' -import path from 'path' -import { Readable } from 'stream' -import axios, { AxiosError } from 'axios' -import express, { - Request, - Response, - Express, - ErrorRequestHandler, - NextFunction, -} from 'express' -import createError, { isHttpError } from 'http-errors' -import { BlobNotFoundError } from '@atproto/repo' import { cloneStream, - forwardStreamErrors, + createDecoders, isErrnoException, + VerifyCidError, + VerifyCidTransform, } from '@atproto/common' -import { BadPathError, ImageUriBuilder } from './uri' +import { BlobNotFoundError } from '@atproto/repo' +import createError, { isHttpError } from 'http-errors' +import fsSync from 'node:fs' +import fs from 'node:fs/promises' +import os from 'node:os' +import path from 'node:path' +import { Duplex, Readable } from 'node:stream' +import { pipeline } from 'node:stream/promises' + +import { streamBlob, StreamBlobOptions } from '../api/blob-resolver' +import AppContext from '../context' +import { Middleware, responseSignal } from '../util/http' import log from './logger' -import { resize } from 
'./sharp' -import { formatsToMimes, Options } from './util' -import { retryHttp } from '../util/retry' -import { ServerConfig } from '../config' - -export class ImageProcessingServer { - app: Express = express() - uriBuilder: ImageUriBuilder - - constructor( - public cfg: ServerConfig, - public cache: BlobCache, - ) { - this.uriBuilder = new ImageUriBuilder('') - this.app.get('*', this.handler.bind(this)) - this.app.use(errorMiddleware) +import { createImageProcessor, createImageUpscaler } from './sharp' +import { BadPathError, ImageUriBuilder } from './uri' +import { formatsToMimes, Options, SharpInfo } from './util' + +export function createMiddleware( + ctx: AppContext, + { prefix = '/' }: { prefix?: string } = {}, +): Middleware { + if (!prefix.startsWith('/') || !prefix.endsWith('/')) { + throw new TypeError('Prefix must start and end with a slash') + } + + // If there is a CDN, we don't need to serve images + if (ctx.cfg.cdnUrl) { + return (req, res, next) => next() } - async handler(req: Request, res: Response, next: NextFunction) { + const cache = new BlobDiskCache(ctx.cfg.blobCacheLocation) + + return async (req, res, next) => { + if (res.destroyed) return + if (req.method !== 'GET' && req.method !== 'HEAD') return next() + if (!req.url?.startsWith(prefix)) return next() + const { 0: path, 1: _search } = req.url.slice(prefix.length - 1).split('?') + if (!path.startsWith('/') || path === '/') return next() + try { - const path = req.path const options = ImageUriBuilder.getOptions(path) - const cacheKey = [ - options.did, - options.cid.toString(), - options.preset, - ].join('::') + + const cacheKey = [options.did, options.cid, options.preset].join('::') // Cached flow try { - const cachedImage = await this.cache.get(cacheKey) + const cachedImage = await cache.get(cacheKey) res.statusCode = 200 res.setHeader('x-cache', 'hit') res.setHeader('content-type', getMime(options.format)) res.setHeader('cache-control', `public, max-age=31536000`) // 1 year res.setHeader('content-length', cachedImage.size) - forwardStreamErrors(cachedImage, res) - return cachedImage.pipe(res) + await pipeline(cachedImage, res) + return } catch (err) { - // Ignore BlobNotFoundError and move on to non-cached flow - if (!(err instanceof BlobNotFoundError)) throw err + if (!(err instanceof BlobNotFoundError)) { + log.error({ cacheKey, err }, 'failed to serve cached image') + } + + if (res.headersSent || res.destroyed) { + res.destroy() + return // nothing we can do... + } else { + // Ignore and move on to non-cached flow. 
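+ // Undo any header set by the cached flow above so the non-cached flow + // below can start from a clean slate.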
+ res.removeHeader('x-cache') + res.removeHeader('content-type') + res.removeHeader('cache-control') + res.removeHeader('content-length') + } } // Non-cached flow - const { localUrl } = this.cfg - const did = options.did - const cidStr = options.cid.toString() - - const blobResult = await retryHttp(() => - getBlob({ baseUrl: localUrl, did, cid: cidStr }), - ) - - const imageStream: Readable = blobResult.data - const processedImage = await resize(imageStream, options) - - // Cache in the background - this.cache - .put(cacheKey, cloneStream(processedImage)) - .catch((err) => log.error(err, 'failed to cache image')) - // Respond - res.statusCode = 200 - res.setHeader('x-cache', 'miss') - res.setHeader('content-type', getMime(options.format)) - res.setHeader('cache-control', `public, max-age=31536000`) // 1 year - forwardStreamErrors(processedImage, res) - return ( - processedImage - // @NOTE sharp does emit this in time to be set as a header - .once('info', (info) => res.setHeader('content-length', info.size)) - .pipe(res) - ) - } catch (err: unknown) { - if (err instanceof BadPathError) { - return next(createError(400, err)) + const streamOptions: StreamBlobOptions = { + did: options.did, + cid: options.cid, + signal: responseSignal(res), } - if (err instanceof AxiosError) { - if (err.code === AxiosError.ETIMEDOUT) { - return next(createError(504)) // Gateway timeout - } - if (!err.response || err.response.status >= 500) { - return next(createError(502)) + + await streamBlob(ctx, streamOptions, (upstream, { did, cid, url }) => { + // Definitely not an image ? Let's fail right away. + if (isImageMime(upstream.headers['content-type']) === false) { + throw createError(400, 'Not an image') } - if (err.response.status === 400) { - return next(createError(400)) + + // Let's transform (decompress, verify CID, upscale), process and respond + + const transforms: Duplex[] = [ + ...createDecoders(upstream.headers['content-encoding']), + new VerifyCidTransform(cid), + createImageUpscaler(options), + ] + const processor = createImageProcessor(options) + + // Cache in the background + cache + .put(cacheKey, cloneStream(processor)) + .catch((err) => log.error(err, 'failed to cache image')) + + res.statusCode = 200 + res.setHeader('cache-control', `public, max-age=31536000`) // 1 year + res.setHeader('x-cache', 'miss') + processor.once('info', ({ size, format }: SharpInfo) => { + const type = formatsToMimes.get(format) || 'application/octet-stream' + + // @NOTE sharp does emit this in time to be set as a header + res.setHeader('content-length', size) + res.setHeader('content-type', type) + }) + + const streams = [...transforms, processor, res] + void pipeline(streams).catch((err: unknown) => { + log.warn( + { err, did, cid: cid.toString(), pds: url.origin }, + 'blob resolution failed during transmission', + ) + }) + + return streams[0]! 
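+ // (returning the head of the pipeline lets streamBlob() write the raw + // upstream bytes into it; decoding, CID verification, upscaling and + // processing all happen downstream of that entry point)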
+ }) } catch (err) { + if (res.headersSent || res.destroyed) { + res.destroy() + } else { + res.removeHeader('content-type') + res.removeHeader('content-length') + res.removeHeader('cache-control') + res.removeHeader('x-cache') + + if (err instanceof BadPathError) { + next(createError(400, err)) + } else if (err instanceof VerifyCidError) { + next(createError(404, 'Blob not found', err)) + } else if (isHttpError(err)) { + next(err) + } else { + next(createError(502, 'Upstream Error', { cause: err })) } - return next(err) } } } -const errorMiddleware: ErrorRequestHandler = function (err, _req, res, next) { - if (isHttpError(err)) { - log.error(err, `error: ${err.message}`) - } else { - log.error(err, 'unhandled exception') +function isImageMime( + contentType: string | string[] | undefined, +): undefined | boolean { + if (contentType == null || contentType === 'application/octet-stream') { + return undefined // maybe } - if (res.headersSent) { - return next(err) + if (Array.isArray(contentType)) { + if (contentType.length === 0) return undefined // should never happen + if (contentType.length === 1) return isImageMime(contentType[0]) + return contentType.every(isImageMime) // Should we throw a 502 here? } - const httpError = createError(err) - return res.status(httpError.status).json({ - message: httpError.expose ? httpError.message : 'Internal Server Error', - }) + return contentType.startsWith('image/') } function getMime(format: Options['format']) { - const mime = formatsToMimes[format] + const mime = formatsToMimes.get(format) if (!mime) throw new Error('Unknown format') return mime } @@ -193,13 +227,3 @@ export class BlobDiskCache implements BlobCache { await fs.rm(this.tempDir, { recursive: true, force: true }) } } - -function getBlob(opts: { baseUrl: string; did: string; cid: string }) { - const { baseUrl, did, cid } = opts - const enc = encodeURIComponent - return axios.get(`${baseUrl}/blob/${enc(did)}/${enc(cid)}`, { - decompress: true, - responseType: 'stream', - timeout: 2000, // 2sec of inactivity on the connection - }) -} diff --git a/packages/bsky/src/image/sharp.ts b/packages/bsky/src/image/sharp.ts index 1edc7a58835..0df4dc9e333 100644 --- a/packages/bsky/src/image/sharp.ts +++ b/packages/bsky/src/image/sharp.ts @@ -1,87 +1,83 @@ -import { Readable } from 'stream' -import { pipeline } from 'stream/promises' +import { errHasMsg } from '@atproto/common' +import { PassThrough, Readable } from 'node:stream' +import { pipeline } from 'node:stream/promises' import sharp from 'sharp' -import { errHasMsg, forwardStreamErrors } from '@atproto/common' import { formatsToMimes, ImageInfo, Options } from './util' export type { Options } -export async function resize( - stream: Readable, - options: Options, -): Promise<Readable> { - const { height, width, min = false, fit = 'cover', format, quality } = options - - let processor = sharp() - - // Scale up to hit any specified minimum size - if (typeof min !== 'boolean') { - const upsizeProcessor = sharp().resize({ - fit: 'outside', - width: min.width, - height: min.height, - withoutReduction: true, - withoutEnlargement: false, - }) - forwardStreamErrors(stream, upsizeProcessor) - stream = stream.pipe(upsizeProcessor) - } +/** + * Scale up to hit any specified minimum size + */ +export function createImageUpscaler({ min = false }: Options) { + // Due to the way sharp works, up-scaling must happen in a separate processor + // from down-scaling. + return typeof min !== 'boolean' + ? sharp().resize({ + fit: 'outside', + width: min.width, + height: min.height, + withoutReduction: true, + withoutEnlargement: false, + }) + : new PassThrough() } - // Scale down (or possibly up if min is true) to desired size - processor = processor.resize({ +/** + * Scale down (or possibly up if min is true) to desired size, then compress + * to the desired format. + */ +export function createImageProcessor({ + height, + width, + min = false, + fit = 'cover', + format, + quality = 100, +}: Options) { + const processor = sharp().resize({ fit, width, height, withoutEnlargement: min !== true, }) - // Output to specified format if (format === 'jpeg') { - processor = processor.jpeg({ quality: quality ?? 100 }) + return processor.jpeg({ quality }) } else if (format === 'png') { - processor = processor.png({ quality: quality ?? 100 }) + return processor.png({ quality }) } else { - const exhaustiveCheck: never = format - throw new Error(`Unhandled case: ${exhaustiveCheck}`) + throw new Error(`Unhandled case: ${format}`) } - - forwardStreamErrors(stream, processor) - return stream.pipe(processor) } export async function maybeGetInfo( stream: Readable, ): Promise<ImageInfo | null> { - let metadata: sharp.Metadata try { const processor = sharp() - const [result] = await Promise.all([ + + const [{ size, height, width, format }] = await Promise.all([ processor.metadata(), pipeline(stream, processor), // Handles error propagation ]) - metadata = result + + if (size == null || height == null || width == null || format == null) { + return null + } + + return { + height, + width, + size, + mime: formatsToMimes.get(format) ?? 'unknown', + } } catch (err) { if (errHasMsg(err, 'Input buffer contains unsupported image format')) { return null } throw err } - const { size, height, width, format } = metadata - if ( - size === undefined || - height === undefined || - width === undefined || - format === undefined - ) { - return null - } - - return { - height, - width, - size, - mime: formatsToMimes[format] ?? ('unknown' as const), - } } export async function getInfo(stream: Readable): Promise<ImageInfo> { diff --git a/packages/bsky/src/image/util.ts b/packages/bsky/src/image/util.ts index ce18ba343d5..07a58fcc369 100644 --- a/packages/bsky/src/image/util.ts +++ b/packages/bsky/src/image/util.ts @@ -1,4 +1,6 @@ -import { FormatEnum } from 'sharp' +import { FormatEnum, OutputInfo } from 'sharp' + +export type ImageMime = `image/${string}` export type Options = Dimensions & { format: 'jpeg' | 'png' @@ -15,18 +17,24 @@ export type Options = Dimensions & { export type ImageInfo = Dimensions & { size: number - mime: `image/${string}` | 'unknown' + mime: ImageMime | 'unknown' } export type Dimensions = { height: number; width: number } -export const formatsToMimes: { [s in keyof FormatEnum]?: `image/${string}` } = { - jpg: 'image/jpeg', - jpeg: 'image/jpeg', - png: 'image/png', - gif: 'image/gif', - svg: 'image/svg+xml', - tif: 'image/tiff', - tiff: 'image/tiff', - webp: 'image/webp', -} +export const formatsToMimes = new Map<keyof FormatEnum, ImageMime>([ + ['jpg', 'image/jpeg'], + ['jpeg', 'image/jpeg'], + ['png', 'image/png'], + ['gif', 'image/gif'], + ['svg', 'image/svg+xml'], + ['tif', 'image/tiff'], + ['tiff', 'image/tiff'], + ['webp', 'image/webp'], + ['avif', 'image/avif'], + ['heif', 'image/heif'], + ['jp2', 'image/jp2'], + ['jxl', 'image/jxl'], +]) + +export type SharpInfo = OutputInfo & { format: keyof FormatEnum } diff --git a/packages/bsky/src/index.ts b/packages/bsky/src/index.ts index a87e24f3f13..3c3f826df28 100644 --- a/packages/bsky/src/index.ts +++ b/packages/bsky/src/index.ts @@ -8,15 +8,15 @@ import compression from 'compression' import { AtpAgent } from '@atproto/api' import { IdResolver } from '@atproto/identity' import { DAY, SECOND } from '@atproto/common' +import { Keypair } from '@atproto/crypto' import API, { health, wellKnown, blobResolver } from './api' import * as error from './error' import { loggerMiddleware } from './logger' import { ServerConfig } from './config' import { createServer } from './lexicon' import { ImageUriBuilder } from './image/uri' -import { BlobDiskCache, ImageProcessingServer } from './image/server' +import * as imageServer from './image/server' import AppContext from './context' -import { Keypair } from '@atproto/crypto' import { createDataPlaneClient } from './data-plane/client' import { Hydrator } from './hydration/hydrator' import { Views } from './views' @@ -25,6 +25,7 @@ import { authWithApiKey as bsyncAuth, createBsyncClient } from './bsync' import { authWithApiKey as courierAuth, createCourierClient } from './courier' import { FeatureGates } from './feature-gates' import { VideoUriBuilder } from './views/util' +import { createBlobDispatcher } from './api/blob-dispatcher' export * from './data-plane' export type { ServerConfigValues } from './config' @@ -73,15 +74,6 @@ export class BskyAppView { `${config.publicUrl}/vid/%s/%s/thumbnail.jpg`, }) - let imgProcessingServer: ImageProcessingServer | undefined - if (!config.cdnUrl) { - const imgProcessingCache = new BlobDiskCache(config.blobCacheLocation) - imgProcessingServer = new ImageProcessingServer( - config, - imgProcessingCache, - ) - } - const searchAgent = config.searchUrl ?
new AtpAgent({ service: config.searchUrl }) : undefined @@ -151,6 +143,8 @@ export class BskyAppView { env: config.statsigEnv, }) + const blobDispatcher = createBlobDispatcher(config) + const ctx = new AppContext({ cfg: config, dataplane, @@ -165,6 +159,7 @@ export class BskyAppView { courierClient, authVerifier, featureGates, + blobDispatcher, }) let server = createServer({ @@ -180,10 +175,8 @@ export class BskyAppView { app.use(health.createRouter(ctx)) app.use(wellKnown.createRouter(ctx)) - app.use(blobResolver.createRouter(ctx)) - if (imgProcessingServer) { - app.use('/img', imgProcessingServer.app) - } + app.use(blobResolver.createMiddleware(ctx)) + app.use(imageServer.createMiddleware(ctx, { prefix: '/img/' })) app.use(server.xrpc.router) app.use(error.handler) diff --git a/packages/bsky/src/util/http.ts b/packages/bsky/src/util/http.ts new file mode 100644 index 00000000000..fcc980d6ddf --- /dev/null +++ b/packages/bsky/src/util/http.ts @@ -0,0 +1,41 @@ +import createHttpError from 'http-errors' +import { IncomingMessage, ServerResponse } from 'node:http' +import { IncomingHttpHeaders } from 'undici/types/header' + +type NextFunction = (err?: unknown) => void + +export type Middleware = ( + req: IncomingMessage, + res: ServerResponse, + next: NextFunction, +) => void + +export type ResponseData = { statusCode: number; headers: IncomingHttpHeaders } + +const RESPONSE_HEADERS_TO_PROXY = new Set([ + 'content-type', + 'content-length', + 'content-encoding', + 'content-language', + 'cache-control', + 'last-modified', + 'etag', + 'expires', + 'retry-after', + 'vary', // Might vary based on "accept" headers +] as const satisfies (keyof IncomingHttpHeaders)[]) + +export function proxyResponseHeaders(data: ResponseData, res: ServerResponse) { + res.statusCode = data.statusCode >= 500 ? 
502 : data.statusCode + for (const name of RESPONSE_HEADERS_TO_PROXY) { + const val = data.headers[name] + if (val) res.setHeader(name, val) + } +} + +export function responseSignal(res: ServerResponse): AbortSignal { + if (res.destroyed) throw createHttpError(499, 'Client Disconnected') + const controller = new AbortController() + res.once('close', () => controller.abort()) + return controller.signal +} diff --git a/packages/bsky/src/util/retry.ts b/packages/bsky/src/util/retry.ts index 62b1747815e..b9ad99f3955 100644 --- a/packages/bsky/src/util/retry.ts +++ b/packages/bsky/src/util/retry.ts @@ -1,38 +1,14 @@ -import { AxiosError } from 'axios' -import { XRPCError, ResponseType } from '@atproto/xrpc' -import { RetryOptions, retry } from '@atproto/common' -import { Code, ConnectError } from '@connectrpc/connect' +import { createRetryable } from '@atproto/common' +import { ResponseType, XRPCError } from '@atproto/xrpc' -export async function retryHttp<T>( - fn: () => Promise<T>, - opts: RetryOptions = {}, -): Promise<T> { - return retry(fn, { retryable: retryableHttp, ...opts }) -} +export const RETRYABLE_HTTP_STATUS_CODES = new Set([ + 408, 425, 429, 500, 502, 503, 504, 522, 524, +]) -export function retryableHttp(err: unknown) { +export const retryXrpc = createRetryable((err: unknown) => { if (err instanceof XRPCError) { if (err.status === ResponseType.Unknown) return true - return retryableHttpStatusCodes.has(err.status) - } - if (err instanceof AxiosError) { - if (!err.response) return true - return retryableHttpStatusCodes.has(err.response.status) + return RETRYABLE_HTTP_STATUS_CODES.has(err.status) } return false -} - -const retryableHttpStatusCodes = new Set([ - 408, 425, 429, 500, 502, 503, 504, 522, 524, -]) - -export async function retryConnect<T>( - fn: () => Promise<T>, - opts: RetryOptions = {}, -): Promise<T> { - return retry(fn, { retryable: retryableConnect, ...opts }) -} - -export function retryableConnect(err: unknown) { - return err instanceof ConnectError && err.code === Code.Unavailable -} +}) diff --git a/packages/bsky/tests/_util.ts b/packages/bsky/tests/_util.ts index 75c83f1a6d7..30dc53c0eea 100644 --- a/packages/bsky/tests/_util.ts +++ b/packages/bsky/tests/_util.ts @@ -1,14 +1,18 @@ -import { AtUri } from '@atproto/syntax' +import { AppBskyFeedGetPostThread } from '@atproto/api' import { lexToJson } from '@atproto/lexicon' +import { AtUri } from '@atproto/syntax' +import { type Express } from 'express' import { CID } from 'multiformats/cid' +import { Server } from 'node:http' + +import { AddressInfo } from 'node:net' +import { isViewRecord } from '../src/lexicon/types/app/bsky/embed/record' import { FeedViewPost, PostView, isPostView, isThreadViewPost, } from '../src/lexicon/types/app/bsky/feed/defs' -import { isViewRecord } from '../src/lexicon/types/app/bsky/embed/record' -import { AppBskyFeedGetPostThread } from '@atproto/api' import { LabelerView, isLabelerView, @@ -234,3 +238,46 @@ export const stripViewerFromLabeler = ( labeler.creator = stripViewer(labeler.creator) return stripViewer(labeler) } + +export async function startServer(app: Express) { + return new Promise<{ + origin: string + server: Server + stop: () => Promise<void> + }>((resolve, reject) => { + const onListen = () => { + const port = (server.address() as AddressInfo).port + resolve({ + server, + origin: `http://localhost:${port}`, + stop: () => stopServer(server), + }) + cleanup() + } + const onError = (err: Error) => { + reject(err) + cleanup() + } + const cleanup = () => { + server.removeListener('listening', onListen) + server.removeListener('error', onError) + } + + const server = app + .listen(0) + .once('listening', onListen) + .once('error', onError) + }) +} + +export async function stopServer(server: Server) { + return new Promise<void>((resolve, reject) => { + server.close((err) => { + if (err) { + reject(err) + } else { + resolve() + } + }) + }) +} diff --git a/packages/bsky/tests/blob-resolver.test.ts b/packages/bsky/tests/blob-resolver.test.ts index 985f347f7c2..71a486862aa 100644 --- a/packages/bsky/tests/blob-resolver.test.ts +++ b/packages/bsky/tests/blob-resolver.test.ts @@ -1,14 +1,14 @@ -import axios, { AxiosInstance } from 'axios' -import { CID } from 'multiformats/cid' import { cidForCbor, verifyCidForBytes } from '@atproto/common' -import { TestNetwork, basicSeed } from '@atproto/dev-env' import { randomBytes } from '@atproto/crypto' +import { TestNetwork, basicSeed } from '@atproto/dev-env' +import { CID } from 'multiformats/cid' +import { request } from 'undici' describe('blob resolver', () => { let network: TestNetwork - let client: AxiosInstance let fileDid: string let fileCid: CID + let fileSize: number beforeAll(async () => { network = await TestNetwork.create({ @@ -19,10 +19,7 @@ await network.processAll() fileDid = sc.dids.carol fileCid = sc.posts[fileDid][0].images[0].image.ref - client = axios.create({ - baseURL: network.bsky.url, - validateStatus: () => true, - }) + fileSize = sc.posts[fileDid][0].images[0].image.size }) afterAll(async () => { @@ -30,70 +27,99 @@ }) it('resolves blob with good signature check.', async () => { - const { data, status, headers } = await client.get( - `/blob/${fileDid}/${fileCid.toString()}`, - { responseType: 'arraybuffer' }, + const response = await request( + new URL(`/blob/${fileDid}/${fileCid.toString()}`, network.bsky.url), ) - expect(status).toEqual(200) - expect(headers['content-type']).toEqual('image/jpeg') - expect(headers['content-security-policy']).toEqual( + expect(response.statusCode).toEqual(200) + expect(response.headers['content-type']).toEqual('image/jpeg') + expect(response.headers['content-security-policy']).toEqual( `default-src 'none'; sandbox`, ) - expect(headers['x-content-type-options']).toEqual('nosniff') - await expect(verifyCidForBytes(fileCid, data)).resolves.toBeUndefined() + expect(response.headers['x-content-type-options']).toEqual('nosniff') + + const bytes = new Uint8Array(await response.body.arrayBuffer()) + await expect(verifyCidForBytes(fileCid, bytes)).resolves.toBeUndefined() }) it('404s on missing blob.', async () => { const badCid = await cidForCbor({ unknown: true }) - const { data, status } = await client.get( - `/blob/${fileDid}/${badCid.toString()}`, + const response = await request( + new URL(`/blob/${fileDid}/${badCid.toString()}`, network.bsky.url), ) - expect(status).toEqual(404) - expect(data).toEqual({ + expect(response.statusCode).toEqual(404) + await expect(response.body.json()).resolves.toEqual({ error: 'NotFoundError', message: 'Blob not found', }) }) it('404s on missing identity.', async () => { - const { data, status } = await client.get( - `/blob/did:plc:unknown/${fileCid.toString()}`, + const nonExistingDid = `did:plc:${'a'.repeat(24)}` + + const response = await request( + new URL( + `/blob/${nonExistingDid}/${fileCid.toString()}`, + network.bsky.url, + ), ) - expect(status).toEqual(404) - expect(data).toEqual({ + expect(response.statusCode).toEqual(404) + await expect(response.body.json()).resolves.toEqual({ error:
'NotFoundError', message: 'Origin not found', }) }) it('400s on invalid did.', async () => { - const { data, status } = await client.get( - `/blob/did::/${fileCid.toString()}`, + const response = await request( + new URL(`/blob/did::/${fileCid.toString()}`, network.bsky.url), ) - expect(status).toEqual(400) - expect(data).toEqual({ + expect(response.statusCode).toEqual(400) + await expect(response.body.json()).resolves.toEqual({ error: 'BadRequestError', message: 'Invalid did', }) }) it('400s on invalid cid.', async () => { - const { data, status } = await client.get(`/blob/${fileDid}/barfy`) - expect(status).toEqual(400) - expect(data).toEqual({ + const response = await request( + new URL(`/blob/${fileDid}/barfy`, network.bsky.url), + ) + expect(response.statusCode).toEqual(400) + await expect(response.body.json()).resolves.toEqual({ error: 'BadRequestError', message: 'Invalid cid', }) }) - it('fails on blob with bad signature check.', async () => { + it('404s on missing file.', async () => { + const missingCid = await cidForCbor('missing-file') + + const response = await request( + new URL(`/blob/${fileDid}/${missingCid}`, network.bsky.url), ) + expect(response.statusCode).toEqual(404) + await expect(response.body.json()).resolves.toEqual({ + error: 'NotFoundError', + message: 'Blob not found', + }) + }) + + it('replaces the file with invalid bytes.', async () => { await network.pds.ctx.blobstore(fileDid).delete(fileCid) await network.pds.ctx .blobstore(fileDid) - .putPermanent(fileCid, randomBytes(100)) - const tryGetBlob = client.get(`/blob/${fileDid}/${fileCid.toString()}`) - await expect(tryGetBlob).rejects.toThrow( - 'maxContentLength size of -1 exceeded', + .putPermanent(fileCid, randomBytes(fileSize)) + }) + + it('fails to fetch bytes on blob with bad signature check.', async () => { + const response = await request( + new URL(`/blob/${fileDid}/${fileCid.toString()}`, network.bsky.url), ) + + expect(response.statusCode).toEqual(404) + await expect(response.body.json()).resolves.toEqual({ + error: 'NotFoundError', + message: 'Bad cid check', + }) }) }) diff --git a/packages/bsky/tests/image/server.test.ts b/packages/bsky/tests/image/server.test.ts index 4d879c30c03..c2f9b21f8ef 100644 --- a/packages/bsky/tests/image/server.test.ts +++ b/packages/bsky/tests/image/server.test.ts @@ -1,13 +1,12 @@ -import axios, { AxiosInstance } from 'axios' -import { CID } from 'multiformats/cid' import { cidForCbor } from '@atproto/common' import { TestNetwork, basicSeed } from '@atproto/dev-env' +import { CID } from 'multiformats/cid' +import { Readable } from 'node:stream' import { getInfo } from '../../src/image/sharp' import { ImageUriBuilder } from '../../src/image/uri' describe('image processing server', () => { let network: TestNetwork - let client: AxiosInstance let fileDid: string let fileCid: CID @@ -20,10 +19,6 @@ await network.processAll() fileDid = sc.dids.carol fileCid = sc.posts[fileDid][0].images[0].image.ref - client = axios.create({ - baseURL: `${network.bsky.url}/img`, - validateStatus: () => true, - }) }) afterAll(async () => { @@ -31,16 +26,19 @@ }) it('processes image from blob resolver.', async () => { - const res = await client.get( - ImageUriBuilder.getPath({ - preset: 'feed_fullsize', - did: fileDid, - cid: fileCid.toString(), - }), - { responseType: 'stream' }, + const res = await fetch( + new URL( + `/img${ImageUriBuilder.getPath({ + preset: 'feed_fullsize', + did: fileDid, + cid:
fileCid.toString(), + })}`, + network.bsky.url, + ), ) - const info = await getInfo(res.data) + const bytes = new Uint8Array(await res.arrayBuffer()) + const info = await getInfo(Readable.from([bytes])) expect(info).toEqual({ height: 580, @@ -48,7 +46,7 @@ describe('image processing server', () => { size: 127578, mime: 'image/jpeg', }) - expect(res.headers).toEqual( + expect(Object.fromEntries(res.headers)).toEqual( expect.objectContaining({ 'content-type': 'image/jpeg', 'cache-control': 'public, max-age=31536000', @@ -63,26 +61,36 @@ describe('image processing server', () => { did: fileDid, cid: fileCid.toString(), }) - const res1 = await client.get(path, { responseType: 'arraybuffer' }) - expect(res1.headers['x-cache']).toEqual('miss') - const res2 = await client.get(path, { responseType: 'arraybuffer' }) - expect(res2.headers['x-cache']).toEqual('hit') - const res3 = await client.get(path, { responseType: 'arraybuffer' }) - expect(res3.headers['x-cache']).toEqual('hit') - expect(Buffer.compare(res1.data, res2.data)).toEqual(0) - expect(Buffer.compare(res1.data, res3.data)).toEqual(0) + const url = new URL(`/img${path}`, network.bsky.url) + + const res1 = await fetch(url) + expect(res1.headers.get('x-cache')).toEqual('miss') + const bytes1 = new Uint8Array(await res1.arrayBuffer()) + const res2 = await fetch(url) + expect(res2.headers.get('x-cache')).toEqual('hit') + const bytes2 = new Uint8Array(await res2.arrayBuffer()) + const res3 = await fetch(url) + expect(res3.headers.get('x-cache')).toEqual('hit') + const bytes3 = new Uint8Array(await res3.arrayBuffer()) + expect(Buffer.compare(bytes1, bytes2)).toEqual(0) + expect(Buffer.compare(bytes1, bytes3)).toEqual(0) }) it('errors on missing file.', async () => { const missingCid = await cidForCbor('missing-file') - const res = await client.get( - ImageUriBuilder.getPath({ - preset: 'feed_fullsize', - did: fileDid, - cid: missingCid.toString(), - }), - ) + + const path = ImageUriBuilder.getPath({ + preset: 'feed_fullsize', + did: fileDid, + cid: missingCid.toString(), + }) + + const url = new URL(`/img${path}`, network.bsky.url) + + const res = await fetch(url) expect(res.status).toEqual(404) - expect(res.data).toEqual({ message: 'Image not found' }) + await expect(res.json()).resolves.toMatchObject({ + message: 'Blob not found', + }) }) }) diff --git a/packages/bsky/tests/image/sharp.test.ts b/packages/bsky/tests/image/sharp.test.ts index 31fe4ba3297..ff928b9caf0 100644 --- a/packages/bsky/tests/image/sharp.test.ts +++ b/packages/bsky/tests/image/sharp.test.ts @@ -1,5 +1,11 @@ -import { createReadStream } from 'fs' -import { Options, getInfo, resize } from '../../src/image/sharp' +import { createReadStream } from 'node:fs' +import { pipeline } from 'node:stream/promises' +import { + Options, + createImageProcessor, + createImageUpscaler, + getInfo, +} from '../../src/image/sharp' describe('sharp image processor', () => { it('scales up to cover.', async () => { @@ -179,7 +185,14 @@ describe('sharp image processor', () => { async function processFixture(fixture: string, options: Options) { const image = createReadStream(`../dev-env/assets/${fixture}`) - const resized = await resize(image, options) - return await getInfo(resized) + const upscaler = createImageUpscaler(options) + const processor = createImageProcessor(options) + + const [info] = await Promise.all([ + getInfo(processor), + pipeline([image, upscaler, processor]), + ]) + + return info } }) diff --git a/packages/bsky/tests/label-hydration.test.ts 
b/packages/bsky/tests/label-hydration.test.ts index dedaf7577ae..9198c9fe29e 100644 --- a/packages/bsky/tests/label-hydration.test.ts +++ b/packages/bsky/tests/label-hydration.test.ts @@ -1,6 +1,5 @@ import { AtpAgent } from '@atproto/api' import { TestNetwork, SeedClient, basicSeed } from '@atproto/dev-env' -import axios from 'axios' describe('label hydration', () => { let network: TestNetwork @@ -81,15 +80,16 @@ describe('label hydration', () => { }) it('defaults to service labels when no labeler header is provided', async () => { - const res = await axios.get( + const res = await fetch( `${network.pds.url}/xrpc/app.bsky.actor.getProfile?actor=${carol}`, { headers: sc.getHeaders(bob) }, ) - expect(res.data.labels?.length).toBe(1) - expect(res.data.labels?.[0].src).toBe(labelerDid) - expect(res.data.labels?.[0].val).toBe('misleading') + const data = await res.json() + expect(data.labels?.length).toBe(1) + expect(data.labels?.[0].src).toBe(labelerDid) + expect(data.labels?.[0].val).toBe('misleading') - expect(res.headers['atproto-content-labelers']).toEqual( + expect(res.headers.get('atproto-content-labelers')).toEqual( network.bsky.ctx.cfg.labelsFromIssuerDids .map((did) => `${did};redact`) .join(','), diff --git a/packages/bsky/tests/server.test.ts b/packages/bsky/tests/server.test.ts index f1bdafeb16d..e10e0f66bb1 100644 --- a/packages/bsky/tests/server.test.ts +++ b/packages/bsky/tests/server.test.ts @@ -1,9 +1,11 @@ -import { AddressInfo } from 'net' -import express from 'express' -import axios, { AxiosError } from 'axios' import { TestNetwork, basicSeed } from '@atproto/dev-env' +import express from 'express' +import { once } from 'node:events' +import { AddressInfo } from 'node:net' +import { finished } from 'node:stream/promises' +import { request } from 'undici' import { handler as errorHandler } from '../src/error' -import { once } from 'events' +import { startServer } from './_util' describe('server', () => { let network: TestNetwork @@ -24,8 +26,8 @@ describe('server', () => { }) it('preserves 404s.', async () => { - const promise = axios.get(`${network.bsky.url}/unknown`) - await expect(promise).rejects.toThrow('failed with status code 404') + const response = await fetch(`${network.bsky.url}/unknown`) + expect(response.status).toEqual(404) }) it('error handler turns unknown errors into 500s.', async () => { @@ -34,99 +36,82 @@ describe('server', () => { throw new Error('Oops!') }) app.use(errorHandler) - const srv = app.listen() - const port = (srv.address() as AddressInfo).port - const promise = axios.get(`http://localhost:${port}/oops`) - await expect(promise).rejects.toThrow('failed with status code 500') - srv.close() + const { origin, stop } = await startServer(app) try { - await promise - } catch (err: unknown) { - const axiosError = err as AxiosError - expect(axiosError.response?.status).toEqual(500) - expect(axiosError.response?.data).toEqual({ + const response = await fetch(new URL(`/oops`, origin)) + expect(response.status).toEqual(500) + await expect(response.json()).resolves.toEqual({ error: 'InternalServerError', message: 'Internal Server Error', }) + } finally { + await stop() } }) it('healthcheck succeeds when database is available.', async () => { - const { data, status } = await axios.get(`${network.bsky.url}/xrpc/_health`) - expect(status).toEqual(200) - expect(data).toEqual({ version: 'unknown' }) + const response = await fetch(`${network.bsky.url}/xrpc/_health`) + expect(response.status).toEqual(200) + await expect(response.json()).resolves.toEqual({ 
version: 'unknown' }) }) // TODO(bsky) check on a different endpoint that accepts json, currently none. it.skip('limits size of json input.', async () => { - let error: AxiosError - try { - await axios.post( - `${network.bsky.url}/xrpc/com.atproto.repo.createRecord`, - { - data: 'x'.repeat(100 * 1024), // 100kb - }, - // { headers: sc.getHeaders(alice) }, - ) - throw new Error('Request should have failed') - } catch (err) { - if (axios.isAxiosError(err)) { - error = err - } else { - throw err - } - } - expect(error.response?.status).toEqual(413) - expect(error.response?.data).toEqual({ + const response = await fetch( + `${network.bsky.url}/xrpc/com.atproto.repo.createRecord`, + { + body: 'x'.repeat(100 * 1024), // 100kb + }, + ) + + expect(response.status).toEqual(413) + await expect(response.json()).resolves.toEqual({ error: 'PayloadTooLargeError', message: 'request entity too large', }) }) it('compresses large json responses', async () => { - const res = await axios.get( + const res = await request( `${network.bsky.url}/xrpc/app.bsky.feed.getTimeline`, { - decompress: false, headers: { ...(await network.serviceHeaders(alice, 'app.bsky.feed.getTimeline')), 'accept-encoding': 'gzip', }, }, ) + + await finished(res.body.resume()) + expect(res.headers['content-encoding']).toEqual('gzip') }) it('does not compress small payloads', async () => { - const res = await axios.get(`${network.bsky.url}/xrpc/_health`, { - decompress: false, + const res = await request(`${network.bsky.url}/xrpc/_health`, { headers: { 'accept-encoding': 'gzip' }, }) + + await finished(res.body.resume()) + expect(res.headers['content-encoding']).toBeUndefined() }) it('healthcheck fails when dataplane is unavailable.', async () => { const { port } = network.bsky.dataplane.server.address() as AddressInfo await network.bsky.dataplane.destroy() - let error: AxiosError + try { - await axios.get(`${network.bsky.url}/xrpc/_health`) - throw new Error('Healthcheck should have failed') - } catch (err) { - if (axios.isAxiosError(err)) { - error = err - } else { - throw err - } + const response = await fetch(`${network.bsky.url}/xrpc/_health`) + expect(response.status).toEqual(503) + await expect(response.json()).resolves.toEqual({ + version: 'unknown', + error: 'Service Unavailable', + }) } finally { // restart dataplane server to allow test suite to cleanup network.bsky.dataplane.server.listen(port) await once(network.bsky.dataplane.server, 'listening') } - expect(error.response?.status).toEqual(503) - expect(error.response?.data).toEqual({ - version: 'unknown', - error: 'Service Unavailable', - }) }) })
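A note on the pattern used in the compression tests above: unlike axios, undici's `request` never throws on HTTP error statuses and does not consume the response body for you, so the body has to be drained even when only the headers are asserted, or the pooled connection stays blocked. A minimal sketch of the pattern; the `isGzipped` helper and its URL are illustrative, not part of the patch:

```ts
import { finished } from 'node:stream/promises'
import { request } from 'undici'

// Sketch: assert on content-encoding while draining the body, so undici can
// release the pooled connection back for reuse.
async function isGzipped(url: string): Promise<boolean> {
  const res = await request(url, { headers: { 'accept-encoding': 'gzip' } })
  // resume() switches the stream to flowing mode and discards the data;
  // finished() resolves once the stream has ended.
  await finished(res.body.resume())
  return res.headers['content-encoding'] === 'gzip'
}
```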
diff --git a/packages/common-web/src/async.ts index e6da0fa6200..b9b1cd3e8cc 100644 --- a/packages/common-web/src/async.ts +++ b/packages/common-web/src/async.ts @@ -152,24 +152,75 @@ export class AsyncBufferFullError extends Error { } } -export const handleAllSettledErrors = ( +/** + * Utility function that behaves like {@link Promise.allSettled} but returns the + * same result as {@link Promise.all} in case every promise is fulfilled, and + * throws an {@link AggregateError} if there is more than one error. + */ +export function allFulfilled<T extends readonly Promise<unknown>[] | []>( + promises: T, +): Promise<{ -readonly [P in keyof T]: Awaited<T[P]> }> +export function allFulfilled<T>( + promises: Iterable<Promise<T>>, +): Promise<Awaited<T>[]> +export function allFulfilled( + promises: Iterable<Promise<unknown>>, +): Promise<unknown[]> { + return Promise.allSettled(promises).then(handleAllSettledErrors) +} + +export function handleAllSettledErrors< + T extends readonly PromiseSettledResult<unknown>[] | [], +>( + results: T, +): { + -readonly [P in keyof T]: T[P] extends PromiseSettledResult<infer U> + ? U + : never +} +export function handleAllSettledErrors<T>( + results: PromiseSettledResult<T>[], +): T[] +export function handleAllSettledErrors( results: PromiseSettledResult<unknown>[], -) => { - const errors = results.filter(isRejected).map((res) => res.reason) +): unknown[] { + const errors = results.filter(isRejectedResult).map(extractReason) if (errors.length === 0) { - return + // No need to filter here, it is safe to assume that all promises are fulfilled + return (results as PromiseFulfilledResult<unknown>[]).map(extractValue) } if (errors.length === 1) { throw errors[0] } throw new AggregateError( errors, - 'Multiple errors: ' + errors.map((err) => err?.message).join('\n'), + `Multiple errors: ${errors.map(stringifyReason).join('\n')}`, ) } -const isRejected = ( +export function isRejectedResult( result: PromiseSettledResult<unknown>, -): result is PromiseRejectedResult => { +): result is PromiseRejectedResult { return result.status === 'rejected' } + +function extractReason(result: PromiseRejectedResult): unknown { + return result.reason +} + +export function isFulfilledResult<T>( + result: PromiseSettledResult<T>, +): result is PromiseFulfilledResult<T> { + return result.status === 'fulfilled' +} + +function extractValue<T>(result: PromiseFulfilledResult<T>): T { + return result.value +} + +function stringifyReason(reason: unknown): string { + if (reason instanceof Error) { + return reason.message + } + return String(reason) +} diff --git a/packages/common-web/src/retry.ts index 357e765e873..50d7ba009cc 100644 --- a/packages/common-web/src/retry.ts +++ b/packages/common-web/src/retry.ts @@ -3,12 +3,13 @@ import { wait } from './util' export type RetryOptions = { maxRetries?: number getWaitMs?: (n: number) => number | null - retryable?: (err: unknown) => boolean } export async function retry<T>( fn: () => Promise<T>, - opts: RetryOptions = {}, + opts: RetryOptions & { + retryable?: (err: unknown) => boolean + } = {}, ): Promise<T> { const { maxRetries = 3, retryable = () => true, getWaitMs = backoffMs } = opts let retries = 0 @@ -33,6 +34,11 @@ throw doneError } +export function createRetryable(retryable: (err: unknown) => boolean) { + return async <T>(fn: () => Promise<T>, opts?: RetryOptions) => + retry(fn, { ...opts, retryable }) +} +
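A usage sketch for the two helpers introduced above; the URLs, function names, and the error predicate are illustrative only:

```ts
import { allFulfilled, createRetryable } from '@atproto/common-web'

// allFulfilled() waits for every promise to settle (like Promise.allSettled),
// then either returns all values (like Promise.all), re-throws the single
// rejection reason, or throws an AggregateError when several promises failed.
async function fetchBoth(): Promise<Response[]> {
  return allFulfilled([
    fetch('https://example.com/a'),
    fetch('https://example.com/b'),
  ])
}

// createRetryable() pre-binds the "should retry?" predicate; the returned
// wrapper still accepts the usual maxRetries/getWaitMs options per call.
const retryNetwork = createRetryable((err) => err instanceof TypeError)
const getExample = () =>
  retryNetwork(() => fetch('https://example.com'), { maxRetries: 2 })
```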
// Waits exponential backoff with max and jitter: ~100, ~200, ~400, ~800, ~1000, ~1000, ... export function backoffMs(n: number, multiplier = 100, max = 1000) { const exponentialMs = Math.pow(2, n) * multiplier diff --git a/packages/dev-env/package.json index 5b3f8429bff..5274021b240 100644 --- a/packages/dev-env/package.json +++ b/packages/dev-env/package.json @@ -35,12 +35,12 @@ "@atproto/xrpc-server": "workspace:^", "@did-plc/lib": "^0.0.1", "@did-plc/server": "^0.0.1", - "axios": "^0.27.2", "dotenv": "^16.0.3", "express": "^4.18.2", "get-port": "^5.1.1", "multiformats": "^9.9.0", - "uint8arrays": "3.0.0" + "uint8arrays": "3.0.0", + "undici": "^6.14.1" }, "devDependencies": { "@types/express": "^4.17.13", diff --git a/packages/dev-env/src/bsky.ts index fd61bb7c9c0..eb2c5261d8c 100644 --- a/packages/dev-env/src/bsky.ts +++ b/packages/dev-env/src/bsky.ts @@ -65,6 +65,7 @@ export class TestBsky { modServiceDid: cfg.modServiceDid ?? 'did:example:invalidMod', labelsFromIssuerDids: [EXAMPLE_LABELER], bigThreadUris: new Set(), + disableSsrfProtection: true, ...cfg, adminPasswords: [ADMIN_PASSWORD], }) diff --git a/packages/dev-env/src/util.ts index 3c7276ef330..d791d702f5b 100644 --- a/packages/dev-env/src/util.ts +++ b/packages/dev-env/src/util.ts @@ -1,4 +1,4 @@ -import axios from 'axios' +import { request } from 'undici' import * as plc from '@did-plc/lib' import { IdResolver } from '@atproto/identity' import { Secp256k1Keypair } from '@atproto/crypto' @@ -41,10 +41,15 @@ export const mockResolvers = (idResolver: IdResolver, pds: TestPds) => { return origResolveHandleDns.call(idResolver.handle, handle) } - const url = `${pds.url}/.well-known/atproto-did` + const url = new URL(`/.well-known/atproto-did`, pds.url) try { - const res = await axios.get(url, { headers: { host: handle } }) - return res.data + const res = await request(url, { headers: { host: handle } }) + if (res.statusCode !== 200) { + res.body.destroy() + return undefined + } + + return res.body.text() } catch (err) { return undefined }
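The `mockResolvers` change above illustrates the axios-to-undici migration pattern used throughout this diff: `request` resolves for any status code, so callers branch on `statusCode` themselves and must either read or destroy the body. Condensed into a sketch, with simplified error handling and a hypothetical helper name:

```ts
import { request } from 'undici'

// Sketch: resolve a handle to a DID via a PDS's well-known endpoint,
// mirroring the dev-env mock resolver above.
async function resolveHandleDid(
  pdsUrl: string,
  handle: string,
): Promise<string | undefined> {
  try {
    const res = await request(new URL('/.well-known/atproto-did', pdsUrl), {
      headers: { host: handle },
    })
    if (res.statusCode !== 200) {
      res.body.destroy() // release the connection when the body is unused
      return undefined
    }
    return await res.body.text()
  } catch {
    return undefined
  }
}
```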
diff --git a/packages/identity/package.json index 0e5f8ce1fa9..5b8b250c822 100644 --- a/packages/identity/package.json +++ b/packages/identity/package.json @@ -23,8 +23,7 @@ }, "dependencies": { "@atproto/common-web": "workspace:^", - "@atproto/crypto": "workspace:^", - "axios": "^0.27.2" + "@atproto/crypto": "workspace:^" }, "devDependencies": { "@did-plc/lib": "^0.0.1", diff --git a/packages/identity/src/did/plc-resolver.ts index cdac4eb98f8..de82f72ec6f 100644 --- a/packages/identity/src/did/plc-resolver.ts +++ b/packages/identity/src/did/plc-resolver.ts @@ -1,6 +1,6 @@ -import axios, { AxiosError } from 'axios' -import BaseResolver from './base-resolver' import { DidCache } from '../types' +import BaseResolver from './base-resolver' +import { timed } from './util' export class DidPlcResolver extends BaseResolver { constructor( @@ -12,16 +12,22 @@ } async resolveNoCheck(did: string): Promise<DidDocument | null> { - try { - const res = await axios.get(`${this.plcUrl}/${encodeURIComponent(did)}`, { - timeout: this.timeout, + return timed(this.timeout, async (signal) => { + const url = new URL(`/${encodeURIComponent(did)}`, this.plcUrl) + const res = await fetch(url, { + redirect: 'error', + headers: { accept: 'application/did+ld+json,application/json' }, + signal, }) - return res.data - } catch (err) { - if (err instanceof AxiosError && err.response?.status === 404) { - return null // Positively not found, versus due to e.g. network error + + // Positively not found, versus due to e.g. network error + if (res.status === 404) return null + + if (!res.ok) { + throw Object.assign(new Error(res.statusText), { status: res.status }) } - throw err - } + + return res.json() + }) } } diff --git a/packages/identity/src/did/util.ts new file mode 100644 index 00000000000..6845da21887 --- /dev/null +++ b/packages/identity/src/did/util.ts @@ -0,0 +1,15 @@ +export async function timed<F extends (signal: AbortSignal) => unknown>( + ms: number, + fn: F, +): Promise<Awaited<ReturnType<F>>> { + const abortController = new AbortController() + const timer = setTimeout(() => abortController.abort(), ms) + const signal = abortController.signal + + try { + return (await fn(signal)) as Awaited<ReturnType<F>> + } finally { + clearTimeout(timer) + abortController.abort() + } +} diff --git a/packages/identity/src/did/web-resolver.ts index 201d70c902c..e09b3a572be 100644 --- a/packages/identity/src/did/web-resolver.ts +++ b/packages/identity/src/did/web-resolver.ts @@ -1,7 +1,7 @@ -import axios, { AxiosError } from 'axios' -import BaseResolver from './base-resolver' -import { DidCache } from '../types' import { PoorlyFormattedDidError, UnsupportedDidWebPathError } from '../errors' +import { DidCache } from '../types' +import BaseResolver from './base-resolver' +import { timed } from './util' export const DOC_PATH = '/.well-known/did.json' @@ -32,17 +32,17 @@ export class DidWebResolver extends BaseResolver { url.protocol = 'http' } - try { - const res = await axios.get(url.toString(), { - responseType: 'json', - timeout: this.timeout, + return timed(this.timeout, async (signal) => { + const res = await fetch(url, { + signal, + redirect: 'error', + headers: { accept: 'application/did+ld+json,application/json' }, }) - return res.data - } catch (err) { - if (err instanceof AxiosError && err.response) { - return null // Positively not found, versus due to e.g. network error - } - throw err - } + + // Positively not found, versus due to e.g. network error + if (!res.ok) return null + + return res.json() + }) } }
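The new `timed` helper replaces axios's `timeout` option with an `AbortSignal` that fires after `ms`, and it also aborts on early exit so anything bound to the signal gets cleaned up. A sketch of the way the resolvers above use it; the function name and URL are illustrative:

```ts
import { timed } from './util'

// Sketch: give a fetch a 3-second budget; on timeout the signal aborts the
// in-flight request and timed() rejects with an AbortError.
async function fetchDidDoc(url: string): Promise<unknown> {
  return timed(3000, async (signal) => {
    const res = await fetch(url, { signal, redirect: 'error' })
    if (!res.ok) throw new Error(`Unexpected status: ${res.status}`)
    return res.json()
  })
}
```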
diff --git a/packages/internal/did-resolver/src/methods/plc.ts index 22a22de842a..00125db1ef7 100644 --- a/packages/internal/did-resolver/src/methods/plc.ts +++ b/packages/internal/did-resolver/src/methods/plc.ts @@ -45,7 +45,7 @@ export class DidPlcMethod implements DidMethod<'plc'> { // should still check if the msid is valid. assertDidPlc(did) - const url = new URL(`/${did}`, this.plcDirectoryUrl) + const url = new URL(`/${encodeURIComponent(did)}`, this.plcDirectoryUrl) return this.fetch(url, { redirect: 'error', diff --git a/packages/internal/xrpc-utils/package.json new file mode 100644 index 00000000000..173fa626e29 --- /dev/null +++ b/packages/internal/xrpc-utils/package.json @@ -0,0 +1,44 @@ +{ + "name": "@atproto-labs/xrpc-utils", + "version": "0.0.0", + "license": "MIT", + "description": "XRPC server utilities for Node.JS", + "keywords": [ + "atproto", + "node", + "xrpc", + "server", + "utilities", + "content", + "negotiation" + ], + "homepage": "https://atproto.com", + "repository": { + "type": "git", + "url": "https://github.com/bluesky-social/atproto", + "directory": "packages/internal/xrpc-utils" + }, + "type": "commonjs", + "main": "dist/index.js", + "types": "dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + }, + "./accept": { + "types": "./dist/accept.d.ts", + "default": "./dist/accept.js" + } + }, + "dependencies": { + "@atproto/xrpc": "workspace:^", + "@atproto/xrpc-server": "workspace:^" + }, + "devDependencies": { + "typescript": "^5.6.3" + }, + "scripts": { + "build": "tsc --build tsconfig.json" + } +} diff --git a/packages/internal/xrpc-utils/src/accept.ts new file mode 100644 index 00000000000..f3668b0c2ba --- /dev/null +++ b/packages/internal/xrpc-utils/src/accept.ts @@ -0,0 +1,143 @@ +import { ResponseType } from '@atproto/xrpc' +import { + InvalidRequestError, + XRPCError as XRPCServerError, +} from '@atproto/xrpc-server' + +export type AcceptFlags = { q: number } +export type Accept = [name: string, flags: AcceptFlags] + +export const ACCEPT_ENCODING_COMPRESSED: readonly [Accept, ...Accept[]] = [ + ['gzip', { q: 1.0 }], + ['deflate', { q: 0.9 }], + ['br', { q: 0.8 }], + ['identity', { q: 0.1 }], +] + +export const ACCEPT_ENCODING_UNCOMPRESSED: readonly [Accept, ...Accept[]] = [ + ['identity', { q: 1.0 }], + ['gzip', { q: 0.3 }], + ['deflate', { q: 0.2 }], + ['br', { q: 0.1 }], +] + +// accept-encoding defaults to "identity with lowest priority" +const ACCEPT_ENC_DEFAULT = ['identity', { q: 0.001 }] as const satisfies Accept +const ACCEPT_FORBID_STAR = ['*', { q: 0 }] as const satisfies Accept + +export function buildProxiedContentEncoding( + acceptHeader: undefined | string | string[], + preferCompressed: boolean, +): string { + return negotiateContentEncoding( + acceptHeader, + preferCompressed + ? ACCEPT_ENCODING_COMPRESSED + : ACCEPT_ENCODING_UNCOMPRESSED, + ) +} + +export function negotiateContentEncoding( + acceptHeader: undefined | string | string[], + preferences: readonly Accept[], +): string { + const acceptMap = Object.fromEntries<undefined | AcceptFlags>( + parseAcceptEncoding(acceptHeader), + ) + + // Make sure the default (identity) is covered by the preferences + if (!preferences.some(coversIdentityAccept)) { + preferences = [...preferences, ACCEPT_ENC_DEFAULT] + } + + const common = preferences.filter(([name]) => { + const acceptQ = (acceptMap[name] ?? acceptMap['*'])?.q + // Per HTTP/1.1, "identity" is always acceptable unless explicitly rejected + if (name === 'identity') { + return acceptQ == null || acceptQ > 0 + } else { + return acceptQ != null && acceptQ > 0 + } + }) + + // Since "identity" was present in the preferences, a missing "identity" in + // the common array means that the client explicitly rejected it. 
Let's reflect + // this by adding it to the common array. + if (!common.some(coversIdentityAccept)) { + common.push(ACCEPT_FORBID_STAR) + } + + // If no common encodings are acceptable, throw a 406 Not Acceptable error + if (!common.some(isAllowedAccept)) { + throw new XRPCServerError( + ResponseType.NotAcceptable, + 'this service does not support any of the requested encodings', + ) + } + + return formatAcceptHeader(common as [Accept, ...Accept[]]) +} + +function coversIdentityAccept([name]: Accept): boolean { + return name === 'identity' || name === '*' +} + +function isAllowedAccept([, flags]: Accept): boolean { + return flags.q > 0 +} + +/** + * @see {@link https://developer.mozilla.org/en-US/docs/Glossary/Quality_values} + */ +export function formatAcceptHeader( + accept: readonly [Accept, ...Accept[]], +): string { + return accept.map(formatAcceptPart).join(',') +} + +function formatAcceptPart([name, flags]: Accept): string { + return `${name};q=${flags.q}` +} + +function parseAcceptEncoding( + acceptEncodings: undefined | string | string[], +): Accept[] { + if (!acceptEncodings?.length) return [] + + return Array.isArray(acceptEncodings) + ? acceptEncodings.flatMap(parseAcceptEncoding) + : acceptEncodings.split(',').map(parseAcceptEncodingDefinition) +} + +function parseAcceptEncodingDefinition(def: string): Accept { + const { length, 0: encoding, 1: params } = def.trim().split(';', 3) + + if (length > 2) { + throw new InvalidRequestError(`Invalid accept-encoding: "${def}"`) + } + + if (!encoding || encoding.includes('=')) { + throw new InvalidRequestError(`Invalid accept-encoding: "${def}"`) + } + + const flags = { q: 1 } + if (length === 2) { + const { length, 0: key, 1: value } = params.split('=', 3) + if (length !== 2) { + throw new InvalidRequestError(`Invalid accept-encoding: "${def}"`) + } + + if (key === 'q' || key === 'Q') { + const q = parseFloat(value) + if (q === 0 || (Number.isFinite(q) && q <= 1 && q >= 0.001)) { + flags.q = q + } else { + throw new InvalidRequestError(`Invalid accept-encoding: "${def}"`) + } + } else { + throw new InvalidRequestError(`Invalid accept-encoding: "${def}"`) + } + } + + return [encoding.toLowerCase(), flags] +} diff --git a/packages/internal/xrpc-utils/src/index.ts new file mode 100644 index 00000000000..49ada8addff --- /dev/null +++ b/packages/internal/xrpc-utils/src/index.ts @@ -0,0 +1 @@ +export * from './accept.js' diff --git a/packages/internal/xrpc-utils/tsconfig.build.json new file mode 100644 index 00000000000..ea00aba058e --- /dev/null +++ b/packages/internal/xrpc-utils/tsconfig.build.json @@ -0,0 +1,8 @@ +{ + "extends": ["../../../tsconfig/node.json"], + "compilerOptions": { + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src"] +} diff --git a/packages/internal/xrpc-utils/tsconfig.json new file mode 100644 index 00000000000..e84b8178b47 --- /dev/null +++ b/packages/internal/xrpc-utils/tsconfig.json @@ -0,0 +1,4 @@ +{ + "include": [], + "references": [{ "path": "./tsconfig.build.json" }] +}
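A sketch of the extracted negotiation helpers in use; the header strings are example inputs:

```ts
import {
  buildProxiedContentEncoding,
  negotiateContentEncoding,
} from '@atproto-labs/xrpc-utils'

// Derive the accept-encoding to forward upstream from the client's own
// header, preferring compressed transfer when configured to.
const forwarded = buildProxiedContentEncoding('gzip;q=0.8, br', true)

// The lower-level helper negotiates against an explicit preference list and
// throws a 406 XRPCError when the client rejects every supported encoding
// (e.g. an accept-encoding of "identity;q=0" alone).
const encoding = negotiateContentEncoding('gzip;q=0.5, identity;q=0.1', [
  ['gzip', { q: 1.0 }],
  ['identity', { q: 0.5 }],
])
```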
"pino-http": "^8.2.1", "structured-headers": "^1.0.1", "typed-emitter": "^2.1.0", - "uint8arrays": "3.0.0" + "uint8arrays": "3.0.0", + "undici": "^6.14.1" }, "devDependencies": { "@atproto/lex-cli": "workspace:^", @@ -59,7 +59,6 @@ "@types/express-serve-static-core": "^4.17.36", "@types/pg": "^8.6.6", "@types/qs": "^6.9.7", - "axios": "^0.27.2", "jest": "^28.1.2", "ts-node": "^10.8.2", "typescript": "^5.6.3" diff --git a/packages/ozone/src/daemon/blob-diverter.ts b/packages/ozone/src/daemon/blob-diverter.ts index ad3e0af19e7..585c318d789 100644 --- a/packages/ozone/src/daemon/blob-diverter.ts +++ b/packages/ozone/src/daemon/blob-diverter.ts @@ -1,16 +1,19 @@ import { - VerifyCidTransform, - forwardStreamErrors, + createDecoders, getPdsEndpoint, + VerifyCidTransform, + allFulfilled, } from '@atproto/common' import { IdResolver } from '@atproto/identity' -import axios from 'axios' -import { Readable } from 'stream' +import { ResponseType, XRPCError } from '@atproto/xrpc' import { CID } from 'multiformats/cid' +import { Readable } from 'node:stream' +import { finished, pipeline } from 'node:stream/promises' +import * as undici from 'undici' +import { BlobDivertConfig } from '../config' import Database from '../db' import { retryHttp } from '../util' -import { BlobDivertConfig } from '../config' export class BlobDiverter { serviceConfig: BlobDivertConfig @@ -27,126 +30,158 @@ export class BlobDiverter { this.idResolver = services.idResolver } - private async getBlob({ - pds, - did, - cid, - }: { - pds: string - did: string - cid: string - }) { - const blobResponse = await axios.get( - `${pds}/xrpc/com.atproto.sync.getBlob`, - { - params: { did, cid }, - decompress: true, - responseType: 'stream', - timeout: 5000, // 5sec of inactivity on the connection - }, - ) - const imageStream: Readable = blobResponse.data - const verifyCid = new VerifyCidTransform(CID.parse(cid)) - forwardStreamErrors(imageStream, verifyCid) - - return { - contentType: - blobResponse.headers['content-type'] || 'application/octet-stream', - imageStream: imageStream.pipe(verifyCid), + /** + * @throws {XRPCError} so that retryHttp can handle retries + */ + async getBlob(options: GetBlobOptions): Promise { + const blobUrl = getBlobUrl(options) + + const blobResponse = await undici + .request(blobUrl, { + headersTimeout: 10e3, + bodyTimeout: 30e3, + }) + .catch((err) => { + throw asXrpcClientError(err, `Error fetching blob ${options.cid}`) + }) + + if (blobResponse.statusCode !== 200) { + blobResponse.body.destroy() + throw new XRPCError( + blobResponse.statusCode, + undefined, + `Error downloading blob ${options.cid}`, + ) } - } - - async sendImage({ - url, - imageStream, - contentType, - }: { - url: string - imageStream: Readable - contentType: string - }) { - const result = await axios(url, { - method: 'POST', - data: imageStream, - headers: { - Authorization: basicAuth('admin', this.serviceConfig.adminPassword), - 'Content-Type': contentType, - }, - }) - return result.status === 200 + try { + const type = blobResponse.headers['content-type'] + const encoding = blobResponse.headers['content-encoding'] + + const verifier = new VerifyCidTransform(CID.parse(options.cid)) + + void pipeline([ + blobResponse.body, + ...createDecoders(encoding), + verifier, + ]).catch((_err) => {}) + + return { + type: typeof type === 'string' ? 
type : 'application/octet-stream', + stream: verifier, + } + } catch (err) { + // Typically un-supported content encoding + blobResponse.body.destroy() + throw err + } } - private async uploadBlob( - { - imageStream, - contentType, - }: { imageStream: Readable; contentType: string }, - { - subjectDid, - subjectUri, - }: { subjectDid: string; subjectUri: string | null }, - ) { - const url = new URL( - `${this.serviceConfig.url}/xrpc/com.atproto.unspecced.reportBlob`, - ) - url.searchParams.set('did', subjectDid) - if (subjectUri) url.searchParams.set('uri', subjectUri) - const result = await this.sendImage({ - url: url.toString(), - imageStream, - contentType, - }) + /** + * @throws {XRPCError} so that retryHttp can handle retries + */ + async uploadBlob(blob: Blob, report: ReportBlobOptions) { + const uploadUrl = reportBlobUrl(this.serviceConfig.url, report) + + const result = await undici + .request(uploadUrl, { + method: 'POST', + body: blob.stream, + headersTimeout: 30e3, + bodyTimeout: 10e3, + headers: { + Authorization: basicAuth('admin', this.serviceConfig.adminPassword), + 'content-type': blob.type, + }, + }) + .catch((err) => { + throw asXrpcClientError(err, `Error uploading blob ${report.did}`) + }) + + if (result.statusCode !== 200) { + result.body.destroy() + throw new XRPCError( + result.statusCode, + undefined, + `Error uploading blob ${report.did}`, + ) + } - return result + await finished(result.body.resume()) } async uploadBlobOnService({ - subjectDid, - subjectUri, + subjectDid: did, + subjectUri: uri, subjectBlobCids, }: { subjectDid: string subjectUri: string | null subjectBlobCids: string[] - }): Promise<boolean> { - const didDoc = await this.idResolver.did.resolve(subjectDid) - - if (!didDoc) { - throw new Error('Error resolving DID') - } + }): Promise<void> { + const didDoc = await this.idResolver.did.resolve(did) + if (!didDoc) throw new Error('Error resolving DID') const pds = getPdsEndpoint(didDoc) + if (!pds) throw new Error('Error resolving PDS') - if (!pds) { - throw new Error('Error resolving PDS') - } - - // attempt to download and upload within the same retry block since the imageStream is not reusable - const uploadResult = await Promise.all( + await allFulfilled( subjectBlobCids.map((cid) => retryHttp(async () => { - const { imageStream, contentType } = await this.getBlob({ - pds, - cid, - did: subjectDid, - }) - return this.uploadBlob( - { imageStream, contentType }, - { subjectDid, subjectUri }, - ) + // attempt to download and upload within the same retry block since + // the blob stream is not reusable + const blob = await this.getBlob({ pds, cid, did }) + return this.uploadBlob(blob, { did, uri }) }), ), - ) - - if (uploadResult.includes(false)) { - throw new Error(`Error uploading blob ${subjectUri}`) - } - - return true + ).catch((err) => { + throw new XRPCError( + ResponseType.UpstreamFailure, + undefined, + 'Failed to process blobs', + undefined, + { cause: err }, + ) + }) } } const basicAuth = (username: string, password: string) => { return 'Basic ' + Buffer.from(`${username}:${password}`).toString('base64') } + +type Blob = { + type: string + stream: Readable +} + +type GetBlobOptions = { + pds: string + did: string + cid: string +} + +function getBlobUrl({ pds, did, cid }: GetBlobOptions): URL { + const url = new URL(`/xrpc/com.atproto.sync.getBlob`, pds) + url.searchParams.set('did', did) + url.searchParams.set('cid', cid) + return url +} + +type ReportBlobOptions = { + did: string + uri: string | null +} + +function reportBlobUrl(service: string, { did, uri }: ReportBlobOptions): URL { + const url = new URL(`/xrpc/com.atproto.unspecced.reportBlob`, service) + url.searchParams.set('did', did) + if (uri != null) url.searchParams.set('uri', uri) + return url +} + +function asXrpcClientError(err: unknown, message: string) { + return new XRPCError(ResponseType.Unknown, undefined, message, undefined, { + cause: err, + }) +}
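Because `getBlob` and `uploadBlob` both surface failures as `XRPCError`, the download/upload pair can sit inside a single retry block, and the pair retries as a unit since the downloaded stream cannot be replayed. The flow in `uploadBlobOnService` reduces to roughly the following sketch; `divertAll` is a hypothetical wrapper around the real methods above:

```ts
import { allFulfilled } from '@atproto/common'
import { retryHttp } from '../util'
import { BlobDiverter } from './blob-diverter'

// Sketch: divert every blob of a subject, retrying each download/upload
// pair together because the downloaded stream cannot be reused.
async function divertAll(
  diverter: BlobDiverter,
  pds: string,
  did: string,
  uri: string | null,
  cids: string[],
): Promise<void> {
  await allFulfilled(
    cids.map((cid) =>
      retryHttp(async () => {
        // Both methods throw XRPCError, which retryHttp's predicate inspects
        // to decide whether a failure is worth another attempt.
        const blob = await diverter.getBlob({ pds, did, cid })
        await diverter.uploadBlob(blob, { did, uri })
      }),
    ),
  )
}
```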
diff --git a/packages/ozone/src/util.ts index 865f872e2db..717316f5c96 100644 --- a/packages/ozone/src/util.ts +++ b/packages/ozone/src/util.ts @@ -1,7 +1,6 @@ -import { AxiosError } from 'axios' +import { createRetryable } from '@atproto/common' +import { ResponseType, XRPCError } from '@atproto/xrpc' import { parseList } from 'structured-headers' -import { XRPCError, ResponseType } from '@atproto/xrpc' -import { RetryOptions, retry } from '@atproto/common' import Database from './db' export const getSigningKeyId = async ( @@ -24,28 +23,17 @@ return insertRes.id } -export async function retryHttp<T>( - fn: () => Promise<T>, - opts: RetryOptions = {}, -): Promise<T> { - return retry(fn, { retryable: retryableHttp, ...opts }) -} +export const RETRYABLE_HTTP_STATUS_CODES = new Set([ + 408, 425, 429, 500, 502, 503, 504, 522, 524, +]) -export function retryableHttp(err: unknown) { +export const retryHttp = createRetryable((err: unknown) => { if (err instanceof XRPCError) { if (err.status === ResponseType.Unknown) return true - return retryableHttpStatusCodes.has(err.status) - } - if (err instanceof AxiosError) { - if (!err.response) return true - return retryableHttpStatusCodes.has(err.response.status) + return RETRYABLE_HTTP_STATUS_CODES.has(err.status) } return false -} - -const retryableHttpStatusCodes = new Set([ - 408, 425, 429, 500, 502, 503, 504, 522, 524, -]) +}) export type ParsedLabelers = { dids: string[] diff --git a/packages/ozone/tests/_util.ts index 991569ef4e7..be1b855cc40 100644 --- a/packages/ozone/tests/_util.ts +++ b/packages/ozone/tests/_util.ts @@ -1,3 +1,6 @@ +import { type Express } from 'express' +import { Server } from 'node:http' +import { AddressInfo } from 'node:net' import { AtUri } from '@atproto/syntax' import { lexToJson } from '@atproto/lexicon' import { CID } from 'multiformats/cid' @@ -195,3 +198,46 @@ } return thread } + +export async function startServer(app: Express) { + return new Promise<{ + origin: string + server: Server + stop: () => Promise<void> + }>((resolve, reject) => { + const onListen = () => { + const port = (server.address() as AddressInfo).port + resolve({ + server, + origin: `http://localhost:${port}`, + stop: () => stopServer(server), + }) + cleanup() + } + const onError = (err: Error) => { + reject(err) + cleanup() + } + const cleanup = () => { + server.removeListener('listening', onListen) + server.removeListener('error', onError) + } + + const server = app + .listen(0) + .once('listening', onListen) + .once('error', onError) + }) +} + +export async function stopServer(server: Server) { + return new Promise<void>((resolve, reject) => { + server.close((err) => { + if (err) { + reject(err) + } else { + resolve() + } + }) + }) +} diff --git a/packages/ozone/tests/blob-divert.test.ts index 9c335f20ec3..117c59d8044 100644 --- a/packages/ozone/tests/blob-divert.test.ts +++ b/packages/ozone/tests/blob-divert.test.ts @@ -1,10 +1,11 @@ -import assert from 'node:assert' import { ModeratorClient, SeedClient, 
TestNetwork, basicSeed, } from '@atproto/dev-env' +import { ResponseType, XRPCError } from '@atproto/xrpc' +import assert from 'node:assert' import { forSnapshot } from './_util' describe('blob divert', () => { @@ -30,13 +31,16 @@ describe('blob divert', () => { await network.close() }) - const mockReportServiceResponse = (result: boolean) => { + const mockReportServiceResponse = (succeeds: boolean) => { const blobDiverter = network.ozone.ctx.blobDiverter assert(blobDiverter) return jest - .spyOn(blobDiverter, 'sendImage') + .spyOn(blobDiverter, 'uploadBlob') .mockImplementation(async () => { - return result + if (!succeeds) { + // Using an XRPCError to trigger retries + throw new XRPCError(ResponseType.Unknown, undefined) + } }) } @@ -46,6 +50,8 @@ describe('blob divert', () => { cid: sc.posts[sc.dids.carol][0].ref.cidStr, }) + const getImages = () => sc.posts[sc.dids.carol][0].images + const emitDivertEvent = async () => modClient.emitEvent( { @@ -55,9 +61,7 @@ describe('blob divert', () => { comment: 'Diverting for test', }, createdBy: sc.dids.alice, - subjectBlobCids: sc.posts[sc.dids.carol][0].images.map((img) => - img.image.ref.toString(), - ), + subjectBlobCids: getImages().map((img) => img.image.ref.toString()), }, 'moderator', ) @@ -65,25 +69,32 @@ describe('blob divert', () => { it('fails and keeps attempt count when report service fails to accept upload.', async () => { // Simulate failure to fail upload const reportServiceRequest = mockReportServiceResponse(false) + try { + await expect(emitDivertEvent()).rejects.toThrow('Failed to process blobs') - await expect(emitDivertEvent()).rejects.toThrow() - - expect(reportServiceRequest).toHaveBeenCalled() + // 1 initial attempt + 3 retries + expect(reportServiceRequest).toHaveBeenCalledTimes(getImages().length * 4) + } finally { + reportServiceRequest.mockRestore() + } }) it('sends blobs to configured divert service and marks divert date', async () => { - // Simulate failure to accept upload + // Simulate success to accept upload const reportServiceRequest = mockReportServiceResponse(true) + try { + const divertEvent = await emitDivertEvent() - const divertEvent = await emitDivertEvent() + expect(reportServiceRequest).toHaveBeenCalledTimes(getImages().length) + expect(forSnapshot(divertEvent)).toMatchSnapshot() - expect(reportServiceRequest).toHaveBeenCalled() - expect(forSnapshot(divertEvent)).toMatchSnapshot() - - const { subjectStatuses } = await modClient.queryStatuses({ - subject: getSubject().uri, - }) + const { subjectStatuses } = await modClient.queryStatuses({ + subject: getSubject().uri, + }) - expect(subjectStatuses[0].takendown).toBe(true) + expect(subjectStatuses[0].takendown).toBe(true) + } finally { + reportServiceRequest.mockRestore() + } }) }) diff --git a/packages/ozone/tests/moderation.test.ts b/packages/ozone/tests/moderation.test.ts index 9f4937935df..b3eae62a7d5 100644 --- a/packages/ozone/tests/moderation.test.ts +++ b/packages/ozone/tests/moderation.test.ts @@ -816,7 +816,9 @@ describe('moderation', () => { it.skip('prevents image blob from being served, even when cached.', async () => { const fetchImage = await fetch(imageUri) expect(fetchImage.status).toEqual(404) - expect(await fetchImage.json()).toEqual({ message: 'Image not found' }) + expect(await fetchImage.json()).toMatchObject({ + message: 'Blob not found', + }) }) it('invalidates the image in the cdn', async () => { diff --git a/packages/ozone/tests/server.test.ts b/packages/ozone/tests/server.test.ts index 16abab44958..7899b5ff2ef 100644 --- 
a/packages/ozone/tests/server.test.ts +++ b/packages/ozone/tests/server.test.ts @@ -1,8 +1,7 @@ -import { AddressInfo } from 'net' +import { TestNetwork, TestOzone } from '@atproto/dev-env' import express from 'express' -import axios, { AxiosError } from 'axios' -import { TestOzone, TestNetwork } from '@atproto/dev-env' import { handler as errorHandler } from '../src/error' +import { startServer } from './_util' describe('server', () => { let network: TestNetwork @@ -20,56 +19,45 @@ }) it('preserves 404s.', async () => { - const promise = axios.get(`${ozone.url}/unknown`) - await expect(promise).rejects.toThrow('failed with status code 404') + const response = await fetch(`${ozone.url}/unknown`) + expect(response.status).toEqual(404) }) it('error handler turns unknown errors into 500s.', async () => { const app = express() - app.get('/oops', () => { - throw new Error('Oops!') - }) - app.use(errorHandler) - const srv = app.listen() - const port = (srv.address() as AddressInfo).port - const promise = axios.get(`http://localhost:${port}/oops`) - await expect(promise).rejects.toThrow('failed with status code 500') - srv.close() + .get('/oops', () => { + throw new Error('Oops!') + }) + .use(errorHandler) + + const { origin, stop } = await startServer(app) try { - await promise - } catch (err: unknown) { - const axiosError = err as AxiosError - expect(axiosError.response?.status).toEqual(500) - expect(axiosError.response?.data).toEqual({ + const response = await fetch(new URL(`/oops`, origin)) + expect(response.status).toEqual(500) + await expect(response.json()).resolves.toEqual({ error: 'InternalServerError', message: 'Internal Server Error', }) + } finally { + await stop() } }) it('healthcheck succeeds when database is available.', async () => { - const { data, status } = await axios.get(`${ozone.url}/xrpc/_health`) - expect(status).toEqual(200) - expect(data).toEqual({ version: '0.0.0' }) + const response = await fetch(`${ozone.url}/xrpc/_health`) + expect(response.status).toEqual(200) + await expect(response.json()).resolves.toEqual({ version: '0.0.0' }) }) it('healthcheck fails when database is unavailable.', async () => { // destroy sequencer to release connection that would prevent the db from closing await ozone.ctx.sequencer.destroy() await ozone.ctx.db.close() - let error: AxiosError - try { - await axios.get(`${ozone.url}/xrpc/_health`) - throw new Error('Healthcheck should have failed') - } catch (err) { - if (axios.isAxiosError(err)) { - error = err - } else { - throw err - } - } - expect(error.response?.status).toEqual(503) - expect(error.response?.data).toEqual({ + + const res = await fetch(`${ozone.url}/xrpc/_health`) + + expect(res.status).toEqual(503) + await expect(res.json()).resolves.toEqual({ version: '0.0.0', error: 'Service Unavailable', }) diff --git a/packages/pds/package.json index 9f3eb668881..0edd5fb13c8 100644 --- a/packages/pds/package.json +++ b/packages/pds/package.json @@ -31,6 +31,7 @@ }, "dependencies": { "@atproto-labs/fetch-node": "workspace:*", + "@atproto-labs/xrpc-utils": "workspace:*", "@atproto/api": "workspace:^", "@atproto/aws": "workspace:^", "@atproto/common": "workspace:^", @@ -83,7 +84,6 @@ "@types/express-serve-static-core": "^4.17.36", "@types/nodemailer": "^6.4.6", "@types/qs": "^6.9.7", - "axios": "^0.27.2", "esbuild": "^0.14.48", "esbuild-plugin-handlebars": "^1.0.3", "get-port": "^6.1.2",
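The `startServer`/`stopServer` test helpers added in both suites wrap `app.listen` so tests get a usable origin plus a promise-based shutdown. Typical usage, with an illustrative route and test name:

```ts
import express from 'express'
import { startServer } from './_util'

it('responds on an ad-hoc server', async () => {
  const app = express().get('/ping', (_req, res) => {
    res.json({ pong: true })
  })

  // startServer resolves once 'listening' fires and exposes the bound origin.
  const { origin, stop } = await startServer(app)
  try {
    const res = await fetch(new URL('/ping', origin))
    expect(res.status).toEqual(200)
  } finally {
    await stop() // promisified server.close()
  }
})
```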
diff --git a/packages/pds/src/pipethrough.ts index d96b33ac8c2..01e7c3a4b92 100644 --- a/packages/pds/src/pipethrough.ts +++ b/packages/pds/src/pipethrough.ts @@ -3,6 +3,7 @@ import { IncomingHttpHeaders, ServerResponse } from 'node:http' import { PassThrough, Readable } from 'node:stream' import { Dispatcher } from 'undici' +import { buildProxiedContentEncoding } from '@atproto-labs/xrpc-utils' import { decodeStream, getServiceEndpoint, @@ -101,20 +102,6 @@ } } -const ACCEPT_ENCODING_COMPRESSED = [ - ['gzip', { q: 1.0 }], - ['deflate', { q: 0.9 }], - ['br', { q: 0.8 }], - ['identity', { q: 0.1 }], -] as const satisfies Accept[] - -const ACCEPT_ENCODING_UNCOMPRESSED = [ - ['identity', { q: 1.0 }], - ['gzip', { q: 0.3 }], - ['deflate', { q: 0.2 }], - ['br', { q: 0.1 }], -] as const satisfies Accept[] - export type PipethroughOptions = { /** * Specify the issuer (requester) for service auth. If not provided, no @@ -176,11 +163,9 @@ // upstream server for an encoding that both the requester and the PDS can // understand. Since we might have to do the decoding ourselves, we will // use our own preferences (and weight) to negotiate the encoding. - 'accept-encoding': negotiateContentEncoding( + 'accept-encoding': buildProxiedContentEncoding( req.headers['accept-encoding'], - ctx.cfg.proxy.preferCompressed - ? ACCEPT_ENCODING_COMPRESSED - : ACCEPT_ENCODING_UNCOMPRESSED, + ctx.cfg.proxy.preferCompressed, ), authorization: options?.iss @@ -367,116 +352,6 @@ // Request parsing/forwarding // ------------------- -type AcceptFlags = { q: number } -type Accept = [name: string, flags: AcceptFlags] - -// accept-encoding defaults to "identity with lowest priority" -const ACCEPT_ENC_DEFAULT = ['identity', { q: 0.001 }] as const satisfies Accept -const ACCEPT_FORBID_STAR = ['*', { q: 0 }] as const satisfies Accept - -function negotiateContentEncoding( - acceptHeader: undefined | string | string[], - preferences: readonly Accept[], -): string { - const acceptMap = Object.fromEntries<undefined | AcceptFlags>( - parseAcceptEncoding(acceptHeader), - ) - - // Make sure the default (identity) is covered by the preferences - if (!preferences.some(coversIdentityAccept)) { - preferences = [...preferences, ACCEPT_ENC_DEFAULT] - } - - const common = preferences.filter(([name]) => { - const acceptQ = (acceptMap[name] ?? acceptMap['*'])?.q - // Per HTTP/1.1, "identity" is always acceptable unless explicitly rejected - if (name === 'identity') { - return acceptQ == null || acceptQ > 0 - } else { - return acceptQ != null && acceptQ > 0 - } - }) - - // Since "identity" was present in the preferences, a missing "identity" in - // the common array means that the client explicitly rejected it. Let's reflect - // this by adding it to the common array. - if (!common.some(coversIdentityAccept)) { - common.push(ACCEPT_FORBID_STAR) - } - - // If no common encodings are acceptable, throw a 406 Not Acceptable error - if (!common.some(isAllowedAccept)) { - throw new XRPCServerError( - ResponseType.NotAcceptable, - 'this service does not support any of the requested encodings', - ) - } - - return formatAcceptHeader(common as [Accept, ...Accept[]]) -} - -function coversIdentityAccept([name]: Accept): boolean { - return name === 'identity' || name === '*' -} - -function isAllowedAccept([, flags]: Accept): boolean { - return flags.q > 0 -} - -/** - * @see {@link https://developer.mozilla.org/en-US/docs/Glossary/Quality_values} - */ -function formatAcceptHeader(accept: readonly [Accept, ...Accept[]]): string { - return accept.map(formatAcceptPart).join(',') -} - -function formatAcceptPart([name, flags]: Accept): string { - return `${name};q=${flags.q}` -} - -function parseAcceptEncoding( - acceptEncodings: undefined | string | string[], -): Accept[] { - if (!acceptEncodings?.length) return [] - - return Array.isArray(acceptEncodings) - ? acceptEncodings.flatMap(parseAcceptEncoding) - : acceptEncodings.split(',').map(parseAcceptEncodingDefinition) -} - -function parseAcceptEncodingDefinition(def: string): Accept { - const { length, 0: encoding, 1: params } = def.trim().split(';', 3) - - if (length > 2) { - throw new InvalidRequestError(`Invalid accept-encoding: "${def}"`) - } - - if (!encoding || encoding.includes('=')) { - throw new InvalidRequestError(`Invalid accept-encoding: "${def}"`) - } - - const flags = { q: 1 } - if (length === 2) { - const { length, 0: key, 1: value } = params.split('=', 3) - if (length !== 2) { - throw new InvalidRequestError(`Invalid accept-encoding: "${def}"`) - } - - if (key === 'q' || key === 'Q') { - const q = parseFloat(value) - if (q === 0 || (Number.isFinite(q) && q <= 1 && q >= 0.001)) { - flags.q = q - } else { - throw new InvalidRequestError(`Invalid accept-encoding: "${def}"`) - } - } else { - throw new InvalidRequestError(`Invalid accept-encoding: "${def}"`) - } - } - - return [encoding.toLowerCase(), flags] -} - export function isJsonContentType(contentType?: string): boolean | undefined { if (!contentType) return undefined return /application\/(?:\w+\+)?json/i.test(contentType) diff --git a/packages/pds/tests/_util.ts index ee57c62b9fe..cf5c2adec39 100644 --- a/packages/pds/tests/_util.ts +++ b/packages/pds/tests/_util.ts @@ -1,7 +1,10 @@ +import { lexToJson } from '@atproto/lexicon' import { AtUri } from '@atproto/syntax' +import { type Express } from 'express' import { CID } from 'multiformats/cid' +import { Server } from 'node:http' +import { AddressInfo } from 'node:net' import { FeedViewPost } from '../src/lexicon/types/app/bsky/feed/defs' -import { lexToJson } from '@atproto/lexicon' // Swap out identifiers and dates with stable // values for the purpose of snapshot testing @@ -156,3 +159,46 @@ } while (cursor && results.length < limit) return results } + +export async function startServer(app: Express) { + return new Promise<{ + origin: string + server: Server + stop: () => Promise<void> + }>((resolve, reject) => { + const onListen = () => { + const port = (server.address() as AddressInfo).port + resolve({ + server, + origin: `http://localhost:${port}`, + stop: () => stopServer(server), + }) + cleanup() + } + const onError = (err: Error) => { + reject(err) + cleanup() + } + const cleanup = () => { + server.removeListener('listening', onListen) + server.removeListener('error', onError) + } + + const server = app + .listen(0) + .once('listening', onListen) + .once('error', onError) + }) +} + +export async function stopServer(server: Server) { + return new Promise<void>((resolve, reject) => { + server.close((err) => { + if (err) { + reject(err) + } else { + resolve() + } + }) + }) +} diff --git a/packages/pds/tests/proxied/proxy-header.test.ts index 1dfdc2aff47..131d4a28aee 100644 --- a/packages/pds/tests/proxied/proxy-header.test.ts +++ b/packages/pds/tests/proxied/proxy-header.test.ts @@ -1,14 +1,13 @@ -import http from 'node:http' -import assert from 'node:assert' -import express from 'express' -import axios from 'axios' -import * as plc from '@did-plc/lib' -import { SeedClient, TestNetworkNoAppView, usersSeed } from '@atproto/dev-env' import { Keypair } from '@atproto/crypto' +import { SeedClient, TestNetworkNoAppView, usersSeed } from '@atproto/dev-env' import { verifyJwt } from '@atproto/xrpc-server' -import { parseProxyHeader } from '../../src/pipethrough' +import * as plc from '@did-plc/lib' +import express from 'express' +import assert from 'node:assert' import { once } from 'node:events' +import http from 'node:http' import { AddressInfo } from 'node:net' +import { parseProxyHeader } from '../../src/pipethrough' describe('proxy header', () => { let network: TestNetworkNoAppView @@ -40,19 +39,6 @@ await network.close() }) - const assertAxiosErr = async (promise: Promise<unknown>, msg: string) => { - try { - await promise - } catch (err) { - if (!axios.isAxiosError(err)) { - throw err - } - expect(err.response?.data?.['message']).toEqual(msg) - return - } - throw new Error('no error thrown') - } - it('parses proxy header', async () => { expect(parseProxyHeader(network.pds.ctx, `#atproto_test`)).rejects.toThrow( 'no did specified in proxy header', ) @@ -84,7 +70,7 @@ it('proxies requests based on header', async () => { const path = `/xrpc/app.bsky.actor.getProfile?actor=${alice}` - await axios.get(`${network.pds.url}${path}`, { + await fetch(`${network.pds.url}${path}`, { headers: { ...sc.getHeaders(alice), 'atproto-proxy': `${proxyServer.did}#atproto_test`, }, }) @@ -106,37 +92,49 @@ it('fails on a non-existant did', async () => { const path = `/xrpc/app.bsky.actor.getProfile?actor=${alice}` - const attempt = axios.get(`${network.pds.url}${path}`, { + const response = await fetch(`${network.pds.url}${path}`, { headers: { ...sc.getHeaders(alice), 'atproto-proxy': `did:plc:12345678123456781234578#atproto_test`, }, }) - await assertAxiosErr(attempt, 'could not resolve proxy did') + + await expect(response.json()).resolves.toMatchObject({ + message: 'could not resolve proxy did', + }) + expect(proxyServer.requests.length).toBe(1) }) it('fails when a service is not specified', async () => { const path = `/xrpc/app.bsky.actor.getProfile?actor=${alice}` - const attempt = axios.get(`${network.pds.url}${path}`, { + const response = await fetch(`${network.pds.url}${path}`, { headers: { ...sc.getHeaders(alice), 'atproto-proxy': proxyServer.did, }, }) - await assertAxiosErr(attempt, 'no service id specified in proxy header') + + await expect(response.json()).resolves.toMatchObject({ + message: 'no service id specified in proxy header', + }) + expect(proxyServer.requests.length).toBe(1) }) it('fails on a non-existant service', async () => { const path = `/xrpc/app.bsky.actor.getProfile?actor=${alice}` - 
const attempt = axios.get(`${network.pds.url}${path}`, { + const response = await fetch(`${network.pds.url}${path}`, { headers: { ...sc.getHeaders(alice), 'atproto-proxy': `${proxyServer.did}#atproto_bad`, }, }) - await assertAxiosErr(attempt, 'could not resolve proxy did service url') + + await expect(response.json()).resolves.toMatchObject({ + message: 'could not resolve proxy did service url', + }) + expect(proxyServer.requests.length).toBe(1) }) }) diff --git a/packages/pds/tests/server.test.ts b/packages/pds/tests/server.test.ts index 90defe391b1..ea36d4b6b44 100644 --- a/packages/pds/tests/server.test.ts +++ b/packages/pds/tests/server.test.ts @@ -1,11 +1,12 @@ -import { AddressInfo } from 'net' -import express from 'express' -import axios, { AxiosError } from 'axios' -import { SeedClient, TestNetworkNoAppView } from '@atproto/dev-env' import { AtpAgent, AtUri } from '@atproto/api' +import { randomStr } from '@atproto/crypto' +import { SeedClient, TestNetworkNoAppView } from '@atproto/dev-env' +import express from 'express' +import { finished } from 'node:stream/promises' +import { request } from 'undici' import { handler as errorHandler } from '../src/error' +import { startServer } from './_util' import basicSeed from './seeds/basic' -import { randomStr } from '@atproto/crypto' describe('server', () => { let network: TestNetworkNoAppView @@ -31,53 +32,42 @@ describe('server', () => { }) it('preserves 404s.', async () => { - const promise = axios.get(`${network.pds.url}/unknown`) - await expect(promise).rejects.toThrow('failed with status code 404') + const res = await fetch(`${network.pds.url}/unknown`) + expect(res.status).toEqual(404) }) it('error handler turns unknown errors into 500s.', async () => { const app = express() - app.get('/oops', () => { - throw new Error('Oops!') - }) - app.use(errorHandler) - const srv = app.listen() - const port = (srv.address() as AddressInfo).port - const promise = axios.get(`http://localhost:${port}/oops`) - await expect(promise).rejects.toThrow('failed with status code 500') - srv.close() + .get('/oops', () => { + throw new Error('Oops!') + }) + .use(errorHandler) + + const { origin, stop } = await startServer(app) try { - await promise - } catch (err: unknown) { - const axiosError = err as AxiosError - expect(axiosError.response?.status).toEqual(500) - expect(axiosError.response?.data).toEqual({ + const res = await fetch(new URL(`/oops`, origin)) + expect(res.status).toEqual(500) + await expect(res.json()).resolves.toEqual({ error: 'InternalServerError', message: 'Internal Server Error', }) + } finally { + await stop() } }) it('limits size of json input.', async () => { - let error: AxiosError - try { - await axios.post( - `${network.pds.url}/xrpc/com.atproto.repo.createRecord`, - { - data: 'x'.repeat(150 * 1024), // 150kb - }, - { headers: sc.getHeaders(alice) }, - ) - throw new Error('Request should have failed') - } catch (err) { - if (axios.isAxiosError(err)) { - error = err - } else { - throw err - } - } - expect(error.response?.status).toEqual(413) - expect(error.response?.data).toEqual({ + const res = await fetch( + `${network.pds.url}/xrpc/com.atproto.repo.createRecord`, + { + method: 'POST', + body: 'x'.repeat(150 * 1024), // 150kb + headers: sc.getHeaders(alice), + }, + ) + + expect(res.status).toEqual(413) + await expect(res.json()).resolves.toEqual({ error: 'PayloadTooLargeError', message: 'request entity too large', }) @@ -102,56 +92,53 @@ describe('server', () => { ) const uri = new AtUri(createRes.data.uri) - const res = await 
axios.get( + const res = await request( `${network.pds.url}/xrpc/com.atproto.repo.getRecord?repo=${uri.host}&collection=${uri.collection}&rkey=${uri.rkey}`, { - decompress: false, headers: { ...sc.getHeaders(alice), 'accept-encoding': 'gzip' }, }, ) + await finished(res.body.resume()) + expect(res.headers['content-encoding']).toEqual('gzip') }) it('compresses large car file responses', async () => { - const res = await axios.get( + const res = await request( `${network.pds.url}/xrpc/com.atproto.sync.getRepo?did=${alice}`, - { decompress: false, headers: { 'accept-encoding': 'gzip' } }, + { headers: { 'accept-encoding': 'gzip' } }, ) + + await finished(res.body.resume()) + expect(res.headers['content-encoding']).toEqual('gzip') }) it('does not compress small payloads', async () => { - const res = await axios.get(`${network.pds.url}/xrpc/_health`, { - decompress: false, + const res = await request(`${network.pds.url}/xrpc/_health`, { headers: { 'accept-encoding': 'gzip' }, }) + + await finished(res.body.resume()) + expect(res.headers['content-encoding']).toBeUndefined() }) it('healthcheck succeeds when database is available.', async () => { - const { data, status } = await axios.get(`${network.pds.url}/xrpc/_health`) - expect(status).toEqual(200) - expect(data).toEqual({ version: '0.0.0' }) + const res = await fetch(`${network.pds.url}/xrpc/_health`) + expect(res.status).toEqual(200) + await expect(res.json()).resolves.toEqual({ version: '0.0.0' }) }) // @TODO this is hanging for some unknown reason it.skip('healthcheck fails when database is unavailable.', async () => { await network.pds.ctx.accountManager.db.close() - let error: AxiosError - try { - await axios.get(`${network.pds.url}/xrpc/_health`) - throw new Error('Healthcheck should have failed') - } catch (err) { - if (axios.isAxiosError(err)) { - error = err - } else { - throw err - } - } - expect(error.response?.status).toEqual(503) - expect(error.response?.data).toEqual({ - version: '0.0.0', + + const response = await fetch(`${network.pds.url}/xrpc/_health`) + expect(response.status).toEqual(503) + await expect(response.json()).resolves.toEqual({ + version: 'unknown', error: 'Service Unavailable', }) }) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0abdcf7ada4..acf08a4db61 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -27,8 +27,8 @@ importers: specifier: ^28.1.4 version: 28.1.4 '@types/node': - specifier: ^18.19.56 - version: 18.19.56 + specifier: ^18.19.67 + version: 18.19.67 '@typescript-eslint/eslint-plugin': specifier: ^7.4.0 version: 7.4.0(@typescript-eslint/parser@7.4.0)(eslint@8.57.0)(typescript@5.6.3) @@ -49,7 +49,7 @@ importers: version: 5.1.3(eslint-config-prettier@9.1.0)(eslint@8.57.0)(prettier@3.2.5) jest: specifier: ^28.1.2 - version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) + version: 28.1.2(@types/node@18.19.67)(ts-node@10.8.2) node-gyp: specifier: ^9.3.1 version: 9.3.1 @@ -98,7 +98,7 @@ importers: version: link:../lex-cli jest: specifier: ^28.1.2 - version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) + version: 28.1.2(@types/node@18.19.67)(ts-node@10.8.2) prettier: specifier: ^3.2.5 version: 3.2.5 @@ -148,6 +148,12 @@ importers: packages/bsky: dependencies: + '@atproto-labs/fetch-node': + specifier: workspace:* + version: link:../internal/fetch-node + '@atproto-labs/xrpc-utils': + specifier: workspace:* + version: link:../internal/xrpc-utils '@atproto/api': specifier: workspace:^ version: link:../api @@ -157,6 +163,9 @@ importers: '@atproto/crypto': specifier: workspace:^ version: link:../crypto + 
'@atproto/did': + specifier: workspace:^ + version: link:../did '@atproto/identity': specifier: workspace:^ version: link:../identity @@ -190,6 +199,9 @@ importers: '@did-plc/lib': specifier: ^0.0.1 version: 0.0.1 + '@types/http-errors': + specifier: ^2.0.1 + version: 2.0.1 compression: specifier: ^1.7.4 version: 1.7.4 @@ -250,6 +262,9 @@ importers: uint8arrays: specifier: 3.0.0 version: 3.0.0 + undici: + specifier: ^6.19.8 + version: 6.19.8 devDependencies: '@atproto/lex-cli': specifier: workspace:^ @@ -287,15 +302,12 @@ importers: '@types/qs': specifier: ^6.9.7 version: 6.9.7 - axios: - specifier: ^0.27.2 - version: 0.27.2 jest: specifier: ^28.1.2 - version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) + version: 28.1.2(@types/node@18.19.67)(ts-node@10.8.2) ts-node: specifier: ^10.8.2 - version: 10.8.2(@swc/core@1.3.42)(@types/node@18.19.56)(typescript@5.6.3) + version: 10.8.2(@swc/core@1.3.42)(@types/node@18.19.67)(typescript@5.6.3) typescript: specifier: ^5.6.3 version: 5.6.3 @@ -350,10 +362,10 @@ importers: version: 5.1.1 jest: specifier: ^28.1.2 - version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) + version: 28.1.2(@types/node@18.19.67)(ts-node@10.8.2) ts-node: specifier: ^10.8.2 - version: 10.8.2(@swc/core@1.3.42)(@types/node@18.19.56)(typescript@5.6.3) + version: 10.8.2(@swc/core@1.3.42)(@types/node@18.19.67)(typescript@5.6.3) typescript: specifier: ^5.6.3 version: 5.6.3 @@ -381,7 +393,7 @@ importers: devDependencies: jest: specifier: ^28.1.2 - version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) + version: 28.1.2(@types/node@18.19.67)(ts-node@10.8.2) typescript: specifier: ^5.6.3 version: 5.6.3 @@ -406,7 +418,7 @@ importers: devDependencies: jest: specifier: ^28.1.2 - version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) + version: 28.1.2(@types/node@18.19.67)(ts-node@10.8.2) typescript: specifier: ^5.6.3 version: 5.6.3 @@ -428,7 +440,7 @@ importers: version: link:../common jest: specifier: ^28.1.2 - version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) + version: 28.1.2(@types/node@18.19.67)(ts-node@10.8.2) typescript: specifier: ^5.6.3 version: 5.6.3 @@ -477,9 +489,6 @@ importers: '@did-plc/server': specifier: ^0.0.1 version: 0.0.1 - axios: - specifier: ^0.27.2 - version: 0.27.2 dotenv: specifier: ^16.0.3 version: 16.0.3 @@ -495,6 +504,9 @@ importers: uint8arrays: specifier: 3.0.0 version: 3.0.0 + undici: + specifier: ^6.14.1 + version: 6.19.8 devDependencies: '@types/express': specifier: ^4.17.13 @@ -521,9 +533,6 @@ importers: '@atproto/crypto': specifier: workspace:^ version: link:../crypto - axios: - specifier: ^0.27.2 - version: 0.27.2 devDependencies: '@did-plc/lib': specifier: ^0.0.1 @@ -542,7 +551,7 @@ importers: version: 6.1.2 jest: specifier: ^28.1.2 - version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) + version: 28.1.2(@types/node@18.19.67)(ts-node@10.8.2) typescript: specifier: ^5.6.3 version: 5.6.3 @@ -700,6 +709,19 @@ importers: specifier: ^5.6.3 version: 5.6.3 + packages/internal/xrpc-utils: + dependencies: + '@atproto/xrpc': + specifier: workspace:^ + version: link:../../xrpc + '@atproto/xrpc-server': + specifier: workspace:^ + version: link:../../xrpc-server + devDependencies: + typescript: + specifier: ^5.6.3 + version: 5.6.3 + packages/lex-cli: dependencies: '@atproto/lexicon': @@ -751,7 +773,7 @@ importers: devDependencies: jest: specifier: ^28.1.2 - version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) + version: 28.1.2(@types/node@18.19.67)(ts-node@10.8.2) typescript: specifier: ^5.6.3 version: 5.6.3 @@ -1144,9 +1166,6 @@ importers: '@did-plc/lib': 
specifier: ^0.0.1 version: 0.0.1 - axios: - specifier: ^1.6.7 - version: 1.6.7 compression: specifier: ^1.7.4 version: 1.7.4 @@ -1186,6 +1205,9 @@ importers: uint8arrays: specifier: 3.0.0 version: 3.0.0 + undici: + specifier: ^6.14.1 + version: 6.19.8 devDependencies: '@atproto/lex-cli': specifier: workspace:^ @@ -1213,10 +1235,10 @@ importers: version: 6.9.7 jest: specifier: ^28.1.2 - version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) + version: 28.1.2(@types/node@18.19.67)(ts-node@10.8.2) ts-node: specifier: ^10.8.2 - version: 10.8.2(@swc/core@1.3.42)(@types/node@18.19.56)(typescript@5.6.3) + version: 10.8.2(@swc/core@1.3.42)(@types/node@18.19.67)(typescript@5.6.3) typescript: specifier: ^5.6.3 version: 5.6.3 @@ -1226,6 +1248,9 @@ importers: '@atproto-labs/fetch-node': specifier: workspace:* version: link:../internal/fetch-node + '@atproto-labs/xrpc-utils': + specifier: workspace:* + version: link:../internal/xrpc-utils '@atproto/api': specifier: workspace:^ version: link:../api @@ -1374,9 +1399,6 @@ importers: '@types/qs': specifier: ^6.9.7 version: 6.9.7 - axios: - specifier: ^0.27.2 - version: 0.27.2 esbuild: specifier: ^0.14.48 version: 0.14.48 @@ -1388,13 +1410,13 @@ importers: version: 6.1.2 jest: specifier: ^28.1.2 - version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) + version: 28.1.2(@types/node@18.19.67)(ts-node@10.8.2) puppeteer: specifier: ^23.5.2 version: 23.5.3(typescript@5.6.3) ts-node: specifier: ^10.8.2 - version: 10.8.2(@swc/core@1.3.42)(@types/node@18.19.56)(typescript@5.6.3) + version: 10.8.2(@swc/core@1.3.42)(@types/node@18.19.67)(typescript@5.6.3) typescript: specifier: ^5.6.3 version: 5.6.3 @@ -1434,7 +1456,7 @@ importers: devDependencies: jest: specifier: ^28.1.2 - version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) + version: 28.1.2(@types/node@18.19.67)(ts-node@10.8.2) typescript: specifier: ^5.6.3 version: 5.6.3 @@ -1471,7 +1493,7 @@ importers: devDependencies: jest: specifier: ^28.1.2 - version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) + version: 28.1.2(@types/node@18.19.67)(ts-node@10.8.2) typescript: specifier: ^5.6.3 version: 5.6.3 @@ -1480,7 +1502,7 @@ importers: devDependencies: jest: specifier: ^28.1.2 - version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) + version: 28.1.2(@types/node@18.19.67)(ts-node@10.8.2) typescript: specifier: ^5.6.3 version: 5.6.3 @@ -1554,7 +1576,7 @@ importers: version: 6.1.2 jest: specifier: ^28.1.2 - version: 28.1.2(@types/node@18.19.56)(ts-node@10.8.2) + version: 28.1.2(@types/node@18.19.67)(ts-node@10.8.2) jose: specifier: ^4.15.4 version: 4.15.4 @@ -4865,7 +4887,7 @@ packages: '@atproto/common': 0.1.0 '@atproto/crypto': 0.1.0 '@ipld/dag-cbor': 7.0.3 - axios: 1.6.2 + axios: 1.6.7 multiformats: 9.9.0 uint8arrays: 3.0.0 zod: 3.23.8 @@ -4891,7 +4913,7 @@ packages: '@atproto/common': 0.1.0 '@atproto/crypto': 0.1.0 '@did-plc/lib': 0.0.4 - axios: 1.4.0 + axios: 1.6.7 cors: 2.8.5 express: 4.18.2 express-async-errors: 3.1.1(express@4.18.2) @@ -5245,7 +5267,7 @@ packages: engines: {node: ^12.13.0 || ^14.15.0 || ^16.10.0 || >=17.0.0} dependencies: '@jest/types': 28.1.3 - '@types/node': 18.19.56 + '@types/node': 18.19.67 chalk: 4.1.2 jest-message-util: 28.1.3 jest-util: 28.1.3 @@ -5266,14 +5288,14 @@ packages: '@jest/test-result': 28.1.3 '@jest/transform': 28.1.3 '@jest/types': 28.1.3 - '@types/node': 18.19.56 + '@types/node': 18.19.67 ansi-escapes: 4.3.2 chalk: 4.1.2 ci-info: 3.8.0 exit: 0.1.2 graceful-fs: 4.2.11 jest-changed-files: 28.1.3 - jest-config: 28.1.3(@types/node@18.19.56)(ts-node@10.8.2) + jest-config: 
28.1.3(@types/node@18.19.67)(ts-node@10.8.2) jest-haste-map: 28.1.3 jest-message-util: 28.1.3 jest-regex-util: 28.0.2 @@ -5308,7 +5330,7 @@ packages: dependencies: '@jest/fake-timers': 28.1.3 '@jest/types': 28.1.3 - '@types/node': 18.19.56 + '@types/node': 18.19.67 jest-mock: 28.1.3 dev: true @@ -5335,7 +5357,7 @@ packages: dependencies: '@jest/types': 28.1.3 '@sinonjs/fake-timers': 9.1.2 - '@types/node': 18.19.56 + '@types/node': 18.19.67 jest-message-util: 28.1.3 jest-mock: 28.1.3 jest-util: 28.1.3 @@ -5367,7 +5389,7 @@ packages: '@jest/transform': 28.1.3 '@jest/types': 28.1.3 '@jridgewell/trace-mapping': 0.3.19 - '@types/node': 18.19.56 + '@types/node': 18.19.67 chalk: 4.1.2 collect-v8-coverage: 1.0.2 exit: 0.1.2 @@ -5455,7 +5477,7 @@ packages: dependencies: '@types/istanbul-lib-coverage': 2.0.4 '@types/istanbul-reports': 3.0.1 - '@types/node': 18.19.56 + '@types/node': 18.19.67 '@types/yargs': 16.0.5 chalk: 4.1.2 dev: true @@ -5467,7 +5489,7 @@ packages: '@jest/schemas': 28.1.3 '@types/istanbul-lib-coverage': 2.0.4 '@types/istanbul-reports': 3.0.1 - '@types/node': 18.19.56 + '@types/node': 18.19.67 '@types/yargs': 17.0.24 chalk: 4.1.2 dev: true @@ -6226,18 +6248,18 @@ packages: /@types/bn.js@5.1.1: resolution: {integrity: sha512-qNrYbZqMx0uJAfKnKclPh+dTwK33KfLHYqtyODwd5HnXOjnkhc4qgn3BrK6RWyGZm5+sIFE7Q7Vz6QQtJB7w7g==} dependencies: - '@types/node': 18.19.56 + '@types/node': 18.19.67 /@types/body-parser@1.19.2: resolution: {integrity: sha512-ALYone6pm6QmwZoAgeyNksccT9Q4AWZQ6PvfwR37GT6r6FWUPguq6sUmNGSMV2Wr761oQoBxwGGa6DR5o1DC9g==} dependencies: '@types/connect': 3.4.35 - '@types/node': 18.19.56 + '@types/node': 18.19.67 /@types/connect@3.4.35: resolution: {integrity: sha512-cdeYyv4KWoEgpBISTxWvqYsVy444DOqehiF3fM3ne10AmJ62RSyNkUnxMJXHQWRQQX2eR94m5y1IZyDwBjV9FQ==} dependencies: - '@types/node': 18.19.56 + '@types/node': 18.19.67 /@types/cookie@0.6.0: resolution: {integrity: sha512-4Kh9a6B2bQciAhf7FSuMRRkUWecJgJu9nPnx3yzpsfXX/c50REIqpHY4C82bXP90qrLtXtkDxTZosYO3UpOwlA==} @@ -6284,7 +6306,7 @@ packages: /@types/graceful-fs@4.1.6: resolution: {integrity: sha512-Sig0SNORX9fdW+bQuTEovKj3uHcUL6LQKbCrrqb1X7J6/ReAbhCXRAhc+SMejhLELFj2QcyuxmUooZ4bt5ReSw==} dependencies: - '@types/node': 18.19.56 + '@types/node': 18.19.67 dev: true /@types/http-errors@2.0.1: @@ -6346,6 +6368,11 @@ packages: dependencies: undici-types: 5.26.5 + /@types/node@18.19.67: + resolution: {integrity: sha512-wI8uHusga+0ZugNp0Ol/3BqQfEcCCNfojtO6Oou9iVNGPTL6QNSdnUdqq85fRgIorLhLMuPIKpsN98QE9Nh+KQ==} + dependencies: + undici-types: 5.26.5 + /@types/nodemailer@6.4.6: resolution: {integrity: sha512-pD6fL5GQtUKvD2WnPmg5bC2e8kWCAPDwMPmHe/ohQbW+Dy0EcHgZ2oCSuPlWNqk74LS5BVMig1SymQbFMPPK3w==} dependencies: @@ -6407,7 +6434,7 @@ packages: resolution: {integrity: sha512-Cwo8LE/0rnvX7kIIa3QHCkcuF21c05Ayb0ZfxPiv0W8VRiZiNW/WuRupHKpqqGVGf7SUA44QSOUKaEd9lIrd/Q==} dependencies: '@types/mime': 1.3.2 - '@types/node': 18.19.56 + '@types/node': 18.19.67 /@types/send@0.17.4: resolution: {integrity: sha512-x2EM6TJOybec7c52BX0ZspPodMsQUd5L6PRwOunVyVUhXiBSKf3AezDL8Dgvgt5o0UfKNfuA0eMLr2wLT4AiBA==} @@ -6421,7 +6448,7 @@ packages: dependencies: '@types/http-errors': 2.0.1 '@types/mime': 3.0.1 - '@types/node': 18.19.56 + '@types/node': 18.19.67 /@types/shimmer@1.0.5: resolution: {integrity: sha512-9Hp0ObzwwO57DpLFF0InUjUm/II8GmKAvzbefxQTihCb7KI6yc9yzf0nLc4mVdby5N4DRCgQM2wCup9KTieeww==} @@ -6457,7 +6484,7 @@ packages: resolution: {integrity: sha512-oJoftv0LSuaDZE3Le4DbKX+KS9G36NzOeSap90UIK0yMA/NhKJhqlSGtNDORNRaIbQfzjXDrQa0ytJ6mNRGz/Q==} requiresBuild: true 
dependencies: - '@types/node': 18.19.56 + '@types/node': 18.19.67 dev: true optional: true @@ -6899,32 +6926,6 @@ packages: resolution: {integrity: sha512-aDczADvlvTGajTDjcjpJMqRkOF6Qdz3YbPZm/PyW6tKPkx2hlYBzxMhEywM/tU72HrVZjgl5VCdRuMlA7pZ8Gw==} dev: false - /axios@0.27.2: - resolution: {integrity: sha512-t+yRIyySRTp/wua5xEr+z1q60QmLq8ABsS5O9Me1AsE5dfKqgnCFzwiCZZ/cGNd1lq4/7akDWMxdhVlucjmnOQ==} - dependencies: - follow-redirects: 1.15.2 - form-data: 4.0.0 - transitivePeerDependencies: - - debug - - /axios@1.4.0: - resolution: {integrity: sha512-S4XCWMEmzvo64T9GfvQDOXgYRDJ/wsSZc7Jvdgx5u1sd0JwsuPLqb3SYmusag+edF6ziyMensPVqLTSc1PiSEA==} - dependencies: - follow-redirects: 1.15.2 - form-data: 4.0.0 - proxy-from-env: 1.1.0 - transitivePeerDependencies: - - debug - - /axios@1.6.2: - resolution: {integrity: sha512-7i24Ri4pmDRfJTR7LDBhsOTtcm+9kjX5WiY1X3wIisx6G9So3pfMkEiU7emUBe46oceVImccTEM3k6C5dbVW8A==} - dependencies: - follow-redirects: 1.15.3 - form-data: 4.0.0 - proxy-from-env: 1.1.0 - transitivePeerDependencies: - - debug - /axios@1.6.7: resolution: {integrity: sha512-/hDJGff6/c7u0hDkvkGxR/oy6CbCs8ziCsC7SqmhjfozqiJGc8Z11wrv9z9lYfY4K8l+H9TpjcMDX0xOZmx+RA==} dependencies: @@ -8898,24 +8899,6 @@ packages: resolution: {integrity: sha512-5nqDSxl8nn5BSNxyR3n4I6eDmbolI6WT+QqR547RwxQapgjQBmtktdP+HTBb/a/zLsbzERTONyUB5pefh5TtjQ==} dev: true - /follow-redirects@1.15.2: - resolution: {integrity: sha512-VQLG33o04KaQ8uYi2tVNbdrWp1QWxNNea+nmIB4EVM28v0hmP17z7aG1+wAkNzVq4KeXTq3221ye5qTJP91JwA==} - engines: {node: '>=4.0'} - peerDependencies: - debug: '*' - peerDependenciesMeta: - debug: - optional: true - - /follow-redirects@1.15.3: - resolution: {integrity: sha512-1VzOtuEM8pC9SFU1E+8KfTjZyMztRsgEfwQl44z8A25uy13jSzTj6dyK2Df52iV0vgHCfBwLhDWevLn95w5v6Q==} - engines: {node: '>=4.0'} - peerDependencies: - debug: '*' - peerDependenciesMeta: - debug: - optional: true - /follow-redirects@1.15.5: resolution: {integrity: sha512-vSFWUON1B+yAw1VN4xMfxgn5fTUiaOzAJCKBwIIgT/+7CuGy9+r+5gITvP62j3RmaD5Ph65UaERdOSRGUzZtgw==} engines: {node: '>=4.0'} @@ -9842,7 +9825,7 @@ packages: '@jest/expect': 28.1.3 '@jest/test-result': 28.1.3 '@jest/types': 28.1.3 - '@types/node': 18.19.56 + '@types/node': 18.19.67 chalk: 4.1.2 co: 4.6.0 dedent: 0.7.0 @@ -9861,7 +9844,7 @@ packages: - supports-color dev: true - /jest-cli@28.1.3(@types/node@18.19.56)(ts-node@10.8.2): + /jest-cli@28.1.3(@types/node@18.19.67)(ts-node@10.8.2): resolution: {integrity: sha512-roY3kvrv57Azn1yPgdTebPAXvdR2xfezaKKYzVxZ6It/5NCxzJym6tUI5P1zkdWhfUYkxEI9uZWcQdaFLo8mJQ==} engines: {node: ^12.13.0 || ^14.15.0 || ^16.10.0 || >=17.0.0} hasBin: true @@ -9878,7 +9861,7 @@ packages: exit: 0.1.2 graceful-fs: 4.2.11 import-local: 3.1.0 - jest-config: 28.1.3(@types/node@18.19.56)(ts-node@10.8.2) + jest-config: 28.1.3(@types/node@18.19.67)(ts-node@10.8.2) jest-util: 28.1.3 jest-validate: 28.1.3 prompts: 2.4.2 @@ -9889,7 +9872,7 @@ packages: - ts-node dev: true - /jest-config@28.1.3(@types/node@18.19.56)(ts-node@10.8.2): + /jest-config@28.1.3(@types/node@18.19.67)(ts-node@10.8.2): resolution: {integrity: sha512-MG3INjByJ0J4AsNBm7T3hsuxKQqFIiRo/AUqb1q9LRKI5UU6Aar9JHbr9Ivn1TVwfUD9KirRoM/T6u8XlcQPHQ==} engines: {node: ^12.13.0 || ^14.15.0 || ^16.10.0 || >=17.0.0} peerDependencies: @@ -9904,7 +9887,7 @@ packages: '@babel/core': 7.18.6 '@jest/test-sequencer': 28.1.3 '@jest/types': 28.1.3 - '@types/node': 18.19.56 + '@types/node': 18.19.67 babel-jest: 28.1.3(@babel/core@7.18.6) chalk: 4.1.2 ci-info: 3.8.0 @@ -9924,7 +9907,7 @@ packages: pretty-format: 28.1.3 slash: 3.0.0 
strip-json-comments: 3.1.1 - ts-node: 10.8.2(@swc/core@1.3.42)(@types/node@18.19.56)(typescript@5.6.3) + ts-node: 10.8.2(@swc/core@1.3.42)(@types/node@18.19.67)(typescript@5.6.3) transitivePeerDependencies: - supports-color dev: true @@ -9971,7 +9954,7 @@ packages: '@jest/environment': 28.1.3 '@jest/fake-timers': 28.1.3 '@jest/types': 28.1.3 - '@types/node': 18.19.56 + '@types/node': 18.19.67 jest-mock: 28.1.3 jest-util: 28.1.3 dev: true @@ -9987,7 +9970,7 @@ packages: dependencies: '@jest/types': 28.1.3 '@types/graceful-fs': 4.1.6 - '@types/node': 18.19.56 + '@types/node': 18.19.67 anymatch: 3.1.3 fb-watchman: 2.0.2 graceful-fs: 4.2.11 @@ -10038,7 +10021,7 @@ packages: engines: {node: ^12.13.0 || ^14.15.0 || ^16.10.0 || >=17.0.0} dependencies: '@jest/types': 28.1.3 - '@types/node': 18.19.56 + '@types/node': 18.19.67 dev: true /jest-pnp-resolver@1.2.3(jest-resolve@28.1.3): @@ -10092,7 +10075,7 @@ packages: '@jest/test-result': 28.1.3 '@jest/transform': 28.1.3 '@jest/types': 28.1.3 - '@types/node': 18.19.56 + '@types/node': 18.19.67 chalk: 4.1.2 emittery: 0.10.2 graceful-fs: 4.2.11 @@ -10178,7 +10161,7 @@ packages: engines: {node: ^12.13.0 || ^14.15.0 || ^16.10.0 || >=17.0.0} dependencies: '@jest/types': 28.1.3 - '@types/node': 18.19.56 + '@types/node': 18.19.67 chalk: 4.1.2 ci-info: 3.8.0 graceful-fs: 4.2.11 @@ -10203,7 +10186,7 @@ packages: dependencies: '@jest/test-result': 28.1.3 '@jest/types': 28.1.3 - '@types/node': 18.19.56 + '@types/node': 18.19.67 ansi-escapes: 4.3.2 chalk: 4.1.2 emittery: 0.10.2 @@ -10215,12 +10198,12 @@ packages: resolution: {integrity: sha512-CqRA220YV/6jCo8VWvAt1KKx6eek1VIHMPeLEbpcfSfkEeWyBNppynM/o6q+Wmw+sOhos2ml34wZbSX3G13//g==} engines: {node: ^12.13.0 || ^14.15.0 || ^16.10.0 || >=17.0.0} dependencies: - '@types/node': 18.19.56 + '@types/node': 18.19.67 merge-stream: 2.0.0 supports-color: 8.1.1 dev: true - /jest@28.1.2(@types/node@18.19.56)(ts-node@10.8.2): + /jest@28.1.2(@types/node@18.19.67)(ts-node@10.8.2): resolution: {integrity: sha512-Tuf05DwLeCh2cfWCQbcz9UxldoDyiR1E9Igaei5khjonKncYdc6LDfynKCEWozK0oLE3GD+xKAo2u8x/0s6GOg==} engines: {node: ^12.13.0 || ^14.15.0 || ^16.10.0 || >=17.0.0} hasBin: true @@ -10233,7 +10216,7 @@ packages: '@jest/core': 28.1.3(ts-node@10.8.2) '@jest/types': 28.1.3 import-local: 3.1.0 - jest-cli: 28.1.3(@types/node@18.19.56)(ts-node@10.8.2) + jest-cli: 28.1.3(@types/node@18.19.67)(ts-node@10.8.2) transitivePeerDependencies: - '@types/node' - supports-color @@ -12020,7 +12003,7 @@ packages: '@protobufjs/path': 1.1.2 '@protobufjs/pool': 1.1.0 '@protobufjs/utf8': 1.1.0 - '@types/node': 18.19.56 + '@types/node': 18.19.67 long: 5.2.3 dev: false @@ -13316,7 +13299,7 @@ packages: code-block-writer: 11.0.3 dev: false - /ts-node@10.8.2(@swc/core@1.3.42)(@types/node@18.19.56)(typescript@5.6.3): + /ts-node@10.8.2(@swc/core@1.3.42)(@types/node@18.19.67)(typescript@5.6.3): resolution: {integrity: sha512-LYdGnoGddf1D6v8REPtIH+5iq/gTDuZqv2/UJUU7tKjuEU8xVZorBM+buCGNjj+pGEud+sOoM4CX3/YzINpENA==} hasBin: true peerDependencies: @@ -13336,7 +13319,7 @@ packages: '@tsconfig/node12': 1.0.11 '@tsconfig/node14': 1.0.3 '@tsconfig/node16': 1.0.4 - '@types/node': 18.19.56 + '@types/node': 18.19.67 acorn: 8.10.0 acorn-walk: 8.2.0 arg: 4.1.3 diff --git a/tsconfig.json b/tsconfig.json index bf7dd01c153..a0e1801d0c0 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -20,6 +20,7 @@ { "path": "./packages/internal/rollup-plugin-bundle-manifest" }, { "path": "./packages/internal/simple-store" }, { "path": "./packages/internal/simple-store-memory" }, + { "path": 
"./packages/internal/xrpc-utils" }, { "path": "./packages/lex-cli" }, { "path": "./packages/lexicon" }, { "path": "./packages/oauth/jwk" },