Skip to content

Commit 229fe1b

Browse files
committed
feat(server): introduce maxResultSize limit and fix pg errors handling
1 parent a08764d commit 229fe1b

File tree

7 files changed

+187
-34
lines changed

7 files changed

+187
-34
lines changed

src/lib/db.ts

Lines changed: 97 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
import pg, { PoolConfig } from 'pg'
2-
import { DatabaseError } from 'pg-protocol'
1+
import pg from 'pg'
32
import { parse as parseArray } from 'postgres-array'
4-
import { PostgresMetaResult } from './types.js'
3+
import { PostgresMetaResult, PoolConfig } from './types.js'
54

65
pg.types.setTypeParser(pg.types.builtins.INT8, (x) => {
76
const asNumber = Number(x)
@@ -21,6 +20,38 @@ pg.types.setTypeParser(1185, parseArray) // _timestamptz
2120
pg.types.setTypeParser(600, (x) => x) // point
2221
pg.types.setTypeParser(1017, (x) => x) // _point
2322

23+
// Ensure any query will have an appropriate error handler on the pool to prevent connection errors
24+
// from bubbling up the stack and eventually killing the server
25+
const poolerQueryHandleError = (pool: pg.Pool, sql: string): Promise<pg.QueryResult<any>> => {
26+
return new Promise((resolve, reject) => {
27+
let rejected = false
28+
const connectionErrorHandler = (err: any) => {
29+
// If the error hasn't already been propagated to the catch
30+
if (!rejected) {
31+
rejected = true
32+
reject(err)
33+
}
34+
}
35+
// This listener avoids uncaught exceptions for errors happening at the connection level within the stream,
36+
// such as parse or RESULT_SIZE_EXCEEDED errors; instead, handle the error gracefully by bubbling it up to the caller
37+
pool.once('error', connectionErrorHandler)
38+
pool
39+
.query(sql)
40+
.then((results: pg.QueryResult<any>) => {
41+
if (!rejected) {
42+
return resolve(results)
43+
}
44+
})
45+
.catch((err: any) => {
46+
// If the error hasn't already been handled within the error listener
47+
if (!rejected) {
48+
rejected = true
49+
return reject(err)
50+
}
51+
})
52+
})
53+
}
54+
2455
export const init: (config: PoolConfig) => {
2556
query: (sql: string) => Promise<PostgresMetaResult<any>>
2657
end: () => Promise<void>
@@ -60,26 +91,27 @@ export const init: (config: PoolConfig) => {
6091
// compromise: if we run `query` after `pool.end()` is called (i.e. pool is
6192
// `null`), we temporarily create a pool and close it right after.
6293
let pool: pg.Pool | null = new pg.Pool(config)
94+
6395
return {
6496
async query(sql) {
6597
try {
6698
if (!pool) {
6799
const pool = new pg.Pool(config)
68-
let res = await pool.query(sql)
100+
let res = await poolerQueryHandleError(pool, sql)
69101
if (Array.isArray(res)) {
70102
res = res.reverse().find((x) => x.rows.length !== 0) ?? { rows: [] }
71103
}
72104
await pool.end()
73105
return { data: res.rows, error: null }
74106
}
75107

76-
let res = await pool.query(sql)
108+
let res = await poolerQueryHandleError(pool, sql)
77109
if (Array.isArray(res)) {
78110
res = res.reverse().find((x) => x.rows.length !== 0) ?? { rows: [] }
79111
}
80112
return { data: res.rows, error: null }
81113
} catch (error: any) {
82-
if (error instanceof DatabaseError) {
114+
if (error.constructor.name === 'DatabaseError') {
83115
// Roughly based on:
84116
// - https://github.com/postgres/postgres/blob/fc4089f3c65a5f1b413a3299ba02b66a8e5e37d0/src/interfaces/libpq/fe-protocol3.c#L1018
85117
// - https://github.com/brianc/node-postgres/blob/b1a8947738ce0af004cb926f79829bb2abc64aa6/packages/pg/lib/native/query.js#L33
@@ -147,7 +179,60 @@ ${' '.repeat(5 + lineNumber.toString().length + 2 + lineOffset)}^
147179
}
148180
}
149181

150-
return { data: null, error: { message: error.message } }
182+
// Handle stream errors and result size exceeded errors
183+
if (error.code === 'RESULT_SIZE_EXCEEDED') {
184+
// Force kill the connection without waiting for graceful shutdown
185+
const _pool = pool
186+
pool = null
187+
try {
188+
if (_pool) {
189+
// Force kill the connection by destroying the socket
190+
const client = (_pool as any)._clients?.[0]
191+
if (client?.connection?.stream) {
192+
client.connection.stream.destroy()
193+
}
194+
}
195+
} catch (endError) {
196+
// Ignore any errors during cleanup
197+
}
198+
return {
199+
data: null,
200+
error: {
201+
message: `Query result size (${error.resultSize} bytes) exceeded the configured limit (${error.maxResultSize} bytes)`,
202+
code: error.code,
203+
resultSize: error.resultSize,
204+
maxResultSize: error.maxResultSize,
205+
},
206+
}
207+
}
208+
209+
// Handle other stream errors
210+
if (error.code === 'STREAM_ERROR') {
211+
// Force kill the connection without waiting for graceful shutdown
212+
const _pool = pool
213+
pool = null
214+
try {
215+
if (_pool) {
216+
// Force kill the connection by destroying the socket
217+
const client = (_pool as any)._clients?.[0]
218+
if (client?.connection?.stream) {
219+
client.connection.stream.destroy()
220+
}
221+
}
222+
} catch (endError) {
223+
// Ignore any errors during cleanup
224+
}
225+
return {
226+
data: null,
227+
error: {
228+
message: 'Stream error occurred while processing query',
229+
code: error.code,
230+
details: error.message,
231+
},
232+
}
233+
}
234+
235+
return { data: null, error: { ...error, message: error.message } }
151236
}
152237
},
153238

@@ -156,7 +241,11 @@ ${' '.repeat(5 + lineNumber.toString().length + 2 + lineOffset)}^
156241
pool = null
157242
// Gracefully wait for active connections to be idle, then close all
158243
// connections in the pool.
159-
if (_pool) await _pool.end()
244+
if (_pool) {
245+
// Remove all listeners before ending to prevent memory leaks
246+
_pool.removeAllListeners()
247+
await _pool.end()
248+
}
160249
},
161250
}
162251
}

src/lib/types.ts

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Static, Type } from '@sinclair/typebox'
22
import { DatabaseError } from 'pg-protocol'
33
import type { Options as PrettierOptions } from 'prettier'
4+
import { PoolConfig as PgPoolConfig } from 'pg'
45

56
export interface FormatterOptions extends PrettierOptions {}
67

@@ -251,13 +252,7 @@ export const postgresPublicationSchema = Type.Object({
251252
publish_delete: Type.Boolean(),
252253
publish_truncate: Type.Boolean(),
253254
tables: Type.Union([
254-
Type.Array(
255-
Type.Object({
256-
id: Type.Integer(),
257-
name: Type.String(),
258-
schema: Type.String(),
259-
})
260-
),
255+
Type.Array(Type.Object({ id: Type.Integer(), name: Type.String(), schema: Type.String() })),
261256
Type.Null(),
262257
]),
263258
})
@@ -445,12 +440,7 @@ export const postgresTypeSchema = Type.Object({
445440
schema: Type.String(),
446441
format: Type.String(),
447442
enums: Type.Array(Type.String()),
448-
attributes: Type.Array(
449-
Type.Object({
450-
name: Type.String(),
451-
type_id: Type.Integer(),
452-
})
453-
),
443+
attributes: Type.Array(Type.Object({ name: Type.String(), type_id: Type.Integer() })),
454444
comment: Type.Union([Type.String(), Type.Null()]),
455445
})
456446
export type PostgresType = Static<typeof postgresTypeSchema>
@@ -596,3 +586,7 @@ export const postgresColumnPrivilegesRevokeSchema = Type.Object({
596586
]),
597587
})
598588
export type PostgresColumnPrivilegesRevoke = Static<typeof postgresColumnPrivilegesRevokeSchema>
589+
590+
export interface PoolConfig extends PgPoolConfig {
591+
maxResultSize?: number
592+
}

src/server/app.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,10 @@ import { PG_META_REQ_HEADER } from './constants.js'
55
import routes from './routes/index.js'
66
import { extractRequestForLogging } from './utils.js'
77
// Pseudo package declared only for this module
8-
import pkg from '#package.json' assert { type: 'json' }
8+
import pkg from '#package.json' with { type: 'json' }
99

1010
export const build = (opts: FastifyServerOptions = {}): FastifyInstance => {
11-
const app = fastify({
12-
disableRequestLogging: true,
13-
requestIdHeader: PG_META_REQ_HEADER,
14-
...opts,
15-
})
11+
const app = fastify({ disableRequestLogging: true, requestIdHeader: PG_META_REQ_HEADER, ...opts })
1612

1713
app.setErrorHandler((error, request, reply) => {
1814
app.log.error({ error: error.toString(), request: extractRequestForLogging(request) })

src/server/constants.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import crypto from 'crypto'
2-
import { PoolConfig } from 'pg'
2+
import { PoolConfig } from '../lib/types.js'
33
import { getSecret } from '../lib/secrets.js'
44
import { AccessControl } from './templates/swift.js'
5-
import pkg from '#package.json' assert { type: 'json' }
5+
import pkg from '#package.json' with { type: 'json' }
66

77
export const PG_META_HOST = process.env.PG_META_HOST || '0.0.0.0'
88
export const PG_META_PORT = Number(process.env.PG_META_PORT || 1337)
@@ -49,11 +49,16 @@ export const GENERATE_TYPES_SWIFT_ACCESS_CONTROL = process.env
4949
? (process.env.PG_META_GENERATE_TYPES_SWIFT_ACCESS_CONTROL as AccessControl)
5050
: 'internal'
5151

52+
export const PG_META_MAX_RESULT_SIZE = process.env.PG_META_MAX_RESULT_SIZE
53+
? parseInt(process.env.PG_META_MAX_RESULT_SIZE, 10)
54+
: 2 * 1024 * 1024 * 1024 // default to 2GB max query result size
55+
5256
export const DEFAULT_POOL_CONFIG: PoolConfig = {
5357
max: 1,
5458
connectionTimeoutMillis: PG_CONN_TIMEOUT_SECS * 1000,
5559
ssl: PG_META_DB_SSL_ROOT_CERT ? { ca: PG_META_DB_SSL_ROOT_CERT } : undefined,
5660
application_name: `postgres-meta ${pkg.version}`,
61+
maxResultSize: PG_META_MAX_RESULT_SIZE,
5762
}
5863

5964
export const PG_META_REQ_HEADER = process.env.PG_META_REQ_HEADER || 'request-id'

test/index.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@ import './server/query'
2222
import './server/ssl'
2323
import './server/table-privileges'
2424
import './server/typegen'
25+
import './server/result-size-limit'

test/lib/secrets.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@ vi.mock('node:fs/promises', async (): Promise<typeof import('node:fs/promises')>
66
const originalModule =
77
await vi.importActual<typeof import('node:fs/promises')>('node:fs/promises')
88
const readFile = vi.fn()
9-
return {
10-
...originalModule,
11-
readFile,
12-
}
9+
return { ...originalModule, readFile }
1310
})
1411

1512
describe('getSecret', () => {
@@ -57,6 +54,6 @@ describe('getSecret', () => {
5754
const e: NodeJS.ErrnoException = new Error('permission denied')
5855
e.code = 'EACCES'
5956
vi.mocked(readFile).mockRejectedValueOnce(e)
60-
expect(getSecret('SECRET')).rejects.toThrow()
57+
await expect(getSecret('SECRET')).rejects.toThrow()
6158
})
6259
})

test/server/result-size-limit.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import { expect, test, beforeAll, afterAll, describe } from 'vitest'
2+
import { app } from './utils'
3+
import { pgMeta } from '../lib/utils'
4+
5+
describe('test max-result-size limit', () => {
6+
// Create a table with large data for testing
7+
beforeAll(async () => {
8+
// Create a table with a large text column
9+
await pgMeta.query(`
10+
CREATE TABLE large_data (
11+
id SERIAL PRIMARY KEY,
12+
data TEXT
13+
);
14+
`)
15+
await pgMeta.query(`
16+
CREATE TABLE small_data (
17+
id SERIAL PRIMARY KEY,
18+
data TEXT
19+
);
20+
`)
21+
22+
// Insert data that will exceed our limit; in tests the limit is set to ~20MB
23+
await pgMeta.query(`
24+
INSERT INTO large_data (data)
25+
SELECT repeat('x', 1024 * 1024) -- 1MB of data per row
26+
FROM generate_series(1, 50);
27+
`)
28+
await pgMeta.query(`
29+
INSERT INTO small_data (data)
30+
SELECT repeat('x', 10 * 1024) -- 10KB per row
31+
FROM generate_series(1, 50);
32+
`)
33+
})
34+
35+
afterAll(async () => {
36+
// Clean up the test table
37+
await pgMeta.query('DROP TABLE large_data;')
38+
await pgMeta.query('DROP TABLE small_data;')
39+
})
40+
41+
test('query exceeding result size limit', async () => {
42+
// The configured maxResultSize is small (20MB), so this query should exceed it
43+
const res = await app.inject({
44+
method: 'POST',
45+
path: '/query',
46+
headers: { pg: 'postgresql://postgres:postgres@localhost:5432/postgres' },
47+
payload: { query: 'SELECT * FROM large_data;' },
48+
})
49+
50+
// Check that we get the proper error response
51+
expect(res.statusCode).toBe(400)
52+
expect(res.json()).toMatchObject({
53+
error: expect.stringContaining('Query result size'),
54+
code: 'RESULT_SIZE_EXCEEDED',
55+
resultSize: expect.any(Number),
56+
maxResultSize: 20 * 1024 * 1024,
57+
})
58+
59+
// Verify that subsequent queries still work and the server isn't killed
60+
const nextRes = await app.inject({
61+
method: 'POST',
62+
path: '/query',
63+
headers: { pg: 'postgresql://postgres:postgres@localhost:5432/postgres' },
64+
payload: { query: 'SELECT * FROM small_data;' },
65+
})
66+
67+
expect(nextRes.statusCode).toBe(200)
68+
// Should have retrieved the 50 rows as expected
69+
expect(nextRes.json()).toHaveLength(50)
70+
})
71+
})

0 commit comments

Comments
 (0)