diff --git a/packages/pg-cursor/index.js b/packages/pg-cursor/index.js index a6017d96c..b8777dddc 100644 --- a/packages/pg-cursor/index.js +++ b/packages/pg-cursor/index.js @@ -112,6 +112,12 @@ class Cursor extends EventEmitter { handleDataRow(msg) { const row = this._result.parseRow(msg.fields) this.emit('row', row, this._result) + + if (this._rows == null) { + const error = new Error('Received unexpected dataRow message from backend.') + this.handleError(error) + return + } this._rows.push(row) } diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 527f62e4f..0c80b9c25 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -357,11 +357,21 @@ class Client extends EventEmitter { } _handleRowDescription(msg) { + if (this.activeQuery == null) { + const error = new Error('Received unexpected rowDescription message from backend.') + this._handleErrorEvent(error) + return + } // delegate rowDescription to active query this.activeQuery.handleRowDescription(msg) } _handleDataRow(msg) { + if (this.activeQuery == null) { + const error = new Error('Received unexpected dataRow message from backend.') + this._handleErrorEvent(error) + return + } // delegate dataRow to active query this.activeQuery.handleDataRow(msg) } diff --git a/packages/pg/test/integration/gh-issues/3174-tests.js b/packages/pg/test/integration/gh-issues/3174-tests.js index 49ac5905a..d2270b147 100644 --- a/packages/pg/test/integration/gh-issues/3174-tests.js +++ b/packages/pg/test/integration/gh-issues/3174-tests.js @@ -3,6 +3,7 @@ const buffers = require('../../test-buffers') const helper = require('../test-helper') const assert = require('assert') const cli = require('../../cli') +const Cursor = require('../../../../pg-cursor') const suite = new helper.Suite() @@ -103,9 +104,8 @@ const testErrorBuffer = (bufferName, errorBuffer) => { if (!cli.native) { assert(errorHit) // further queries on the client should fail since its in an invalid state - await assert.rejects(() => client.query('SELECTR NOW()'), 'Further queries on the client should reject') + await assert.rejects(() => client.query('SELECT NOW()'), 'Further queries on the client should reject') } - await closeServer() }) @@ -128,7 +128,7 @@ const testErrorBuffer = (bufferName, errorBuffer) => { if (!cli.native) { assert(errorHit) // further queries on the client should fail since its in an invalid state - await assert.rejects(() => client.query('SELECTR NOW()'), 'Further queries on the client should reject') + await assert.rejects(() => client.query('SELECT NOW()'), 'Further queries on the client should reject') } await client.end() @@ -159,9 +159,92 @@ const testErrorBuffer = (bufferName, errorBuffer) => { await pool.end() await closeServer() }) + + suite.testAsync(`Out of order ${bufferName} on simple query using cursors is catchable`, async () => { + const closeServer = await new Promise((resolve, reject) => { + return startMockServer(options.port, errorBuffer, (closeServer) => resolve(closeServer)) + }) + const client = new helper.Client(options) + await client.connect() + + let errorHit = false + client.on('error', () => { + errorHit = true + }) + + const cursor = await client.query(new Cursor('SELECT NOW()')) + cursor.read(100, () => {}) + await cursor.close() + await delay(50) + + // the native client only emits a notice message and keeps on its merry way + if (!cli.native) { + assert(errorHit) + // further queries on the client should fail since its in an invalid state + await assert.rejects(() => client.query('SELECT NOW()'), 'Further queries on the client should reject') + } + await closeServer() + }) + + suite.testAsync(`Out of order ${bufferName} on extended query using cursors is catchable`, async () => { + const closeServer = await new Promise((resolve, reject) => { + return startMockServer(options.port, errorBuffer, (closeServer) => resolve(closeServer)) + }) + const client = new helper.Client(options) + await client.connect() + + let errorHit = false + client.on('error', () => { + errorHit = true + }) + + const cursor = await client.query(new Cursor('SELECT $1', ['foo'])) + cursor.read(100, () => {}) + await cursor.close() + await delay(50) + + // the native client only emits a notice message and keeps on its merry way + if (!cli.native) { + assert(errorHit) + // further queries on the client should fail since its in an invalid state + await assert.rejects(() => client.query('SELECT NOW()'), 'Further queries on the client should reject') + } + + await client.end() + + await closeServer() + }) + + suite.testAsync(`Out of order ${bufferName} on pool using cursors is catchable`, async () => { + const closeServer = await new Promise((resolve, reject) => { + return startMockServer(options.port, errorBuffer, (closeServer) => resolve(closeServer)) + }) + const pool = new helper.pg.Pool(options) + + let errorHit = false + pool.on('error', () => { + errorHit = true + }) + + const cursor = await pool.query(new Cursor('SELECT $1', ['foo'])) + cursor.read(100, () => {}) + await cursor.close() + await delay(100) + + if (!cli.native) { + assert(errorHit) + assert.strictEqual(pool.idleCount, 0, 'Pool should have no idle clients') + assert.strictEqual(pool.totalCount, 0, 'Pool should have no connected clients') + } + + await pool.end() + await closeServer() + }) } if (!helper.args.native) { testErrorBuffer('parseComplete', buffers.parseComplete()) testErrorBuffer('commandComplete', buffers.commandComplete('f')) + testErrorBuffer('rowDescription', buffers.rowDescription()) + testErrorBuffer('dataRow', buffers.dataRow()) }