Skip to content
This repository has been archived by the owner on Dec 30, 2019. It is now read-only.

Commit

Permalink
Fix two timeout races (#109)
Browse files Browse the repository at this point in the history
  • Loading branch information
johanneswuerbach authored and brianc committed Dec 14, 2018
1 parent 140f9a1 commit 35a285c
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 24 deletions.
67 changes: 43 additions & 24 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,6 @@ const EventEmitter = require('events').EventEmitter

const NOOP = function () { }

const remove = (list, value) => {
const i = list.indexOf(value)

if (i !== -1) {
list.splice(i, 1)
}
}

const removeWhere = (list, predicate) => {
const i = list.findIndex(predicate)

Expand All @@ -26,6 +18,12 @@ class IdleItem {
}
}

class PendingItem {
constructor (callback) {
this.callback = callback
}
}

function throwOnRelease () {
throw new Error('Release called on client which has already been released to the pool.')
}
Expand Down Expand Up @@ -85,6 +83,7 @@ class Pool extends EventEmitter {
this._pendingQueue = []
this._endCallback = undefined
this.ending = false
this.ended = false
}

_isFull () {
Expand All @@ -93,6 +92,10 @@ class Pool extends EventEmitter {

_pulseQueue () {
this.log('pulse queue')
if (this.ended) {
this.log('pulse queue ended')
return
}
if (this.ending) {
this.log('pulse queue on ending')
if (this._idle.length) {
Expand All @@ -101,6 +104,7 @@ class Pool extends EventEmitter {
})
}
if (!this._clients.length) {
this.ended = true
this._endCallback()
}
return
Expand All @@ -121,10 +125,10 @@ class Pool extends EventEmitter {
const client = idleItem.client
client.release = release.bind(this, client)
this.emit('acquire', client)
return waiter(undefined, client, client.release)
return waiter.callback(undefined, client, client.release)
}
if (!this._isFull()) {
return this.connect(waiter)
return this.newClient(waiter)
}
throw new Error('unexpected condition')
}
Expand All @@ -150,18 +154,18 @@ class Pool extends EventEmitter {
return cb ? cb(err) : this.Promise.reject(err)
}

const response = promisify(this.Promise, cb)
const result = response.result

// if we don't have to connect a new client, don't do so
if (this._clients.length >= this.options.max || this._idle.length) {
const response = promisify(this.Promise, cb)
const result = response.result

// if we have idle clients schedule a pulse immediately
if (this._idle.length) {
process.nextTick(() => this._pulseQueue())
}

if (!this.options.connectionTimeoutMillis) {
this._pendingQueue.push(response.callback)
this._pendingQueue.push(new PendingItem(response.callback))
return result
}

Expand All @@ -170,18 +174,27 @@ class Pool extends EventEmitter {
response.callback(err, res, done)
}

const pendingItem = new PendingItem(queueCallback)

// set connection timeout on checking out an existing client
const tid = setTimeout(() => {
// remove the callback from pending waiters because
// we're going to call it with a timeout error
remove(this._pendingQueue, queueCallback)
removeWhere(this._pendingQueue, (i) => i.callback === queueCallback)
pendingItem.timedOut = true
response.callback(new Error('timeout exceeded when trying to connect'))
}, this.options.connectionTimeoutMillis)

this._pendingQueue.push(queueCallback)
this._pendingQueue.push(pendingItem)
return result
}

this.newClient(new PendingItem(response.callback))

return result
}

newClient (pendingItem) {
const client = new this.Client(this.options)
this._clients.push(client)
const idleListener = (err) => {
Expand Down Expand Up @@ -210,9 +223,6 @@ class Pool extends EventEmitter {
}, this.options.connectionTimeoutMillis)
}

const response = promisify(this.Promise, cb)
cb = response.callback

this.log('connecting new client')
client.connect((err) => {
if (tid) {
Expand All @@ -230,20 +240,29 @@ class Pool extends EventEmitter {
// this client won’t be released, so move on immediately
this._pulseQueue()

cb(err, undefined, NOOP)
if (!pendingItem.timedOut) {
pendingItem.callback(err, undefined, NOOP)
}
} else {
this.log('new client connected')
client.release = release.bind(this, client)
this.emit('connect', client)
this.emit('acquire', client)
if (this.options.verify) {
this.options.verify(client, cb)
if (!pendingItem.timedOut) {
if (this.options.verify) {
this.options.verify(client, pendingItem.callback)
} else {
pendingItem.callback(undefined, client, client.release)
}
} else {
cb(undefined, client, client.release)
if (this.options.verify) {
this.options.verify(client, client.release)
} else {
client.release()
}
}
}
})
return response.result
}

query (text, values, cb) {
Expand Down
92 changes: 92 additions & 0 deletions test/connection-timeout.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const after = require('mocha').after
const Pool = require('../')

describe('connection timeout', () => {
const connectionFailure = new Error('Temporary connection failure')

before((done) => {
this.server = net.createServer((socket) => {
})
Expand Down Expand Up @@ -126,4 +128,94 @@ describe('connection timeout', () => {
})
})
})

it('continues processing after a connection failure', (done) => {
const Client = require('pg').Client
const orgConnect = Client.prototype.connect
let called = false

Client.prototype.connect = function (cb) {
// Simulate a failure on first call
if (!called) {
called = true

return setTimeout(() => {
cb(connectionFailure)
}, 100)
}
// And pass-through the second call
orgConnect.call(this, cb)
}

const pool = new Pool({
Client: Client,
connectionTimeoutMillis: 1000,
max: 1
})

pool.connect((err, client, release) => {
expect(err).to.be(connectionFailure)

pool.query('select $1::text as name', ['brianc'], (err, res) => {
expect(err).to.be(undefined)
expect(res.rows).to.have.length(1)
pool.end(done)
})
})
})

it('releases newly connected clients if the queued already timed out', (done) => {
const Client = require('pg').Client

const orgConnect = Client.prototype.connect

let connection = 0

Client.prototype.connect = function (cb) {
// Simulate a failure on first call
if (connection === 0) {
connection++

return setTimeout(() => {
cb(connectionFailure)
}, 300)
}

// And second connect taking > connection timeout
if (connection === 1) {
connection++

return setTimeout(() => {
orgConnect.call(this, cb)
}, 1000)
}

orgConnect.call(this, cb)
}

const pool = new Pool({
Client: Client,
connectionTimeoutMillis: 1000,
max: 1
})

// Direct connect
pool.connect((err, client, release) => {
expect(err).to.be(connectionFailure)
})

// Queued
let called = 0
pool.connect((err, client, release) => {
// Verify the callback is only called once
expect(called++).to.be(0)
expect(err).to.be.an(Error)

pool.query('select $1::text as name', ['brianc'], (err, res) => {
expect(err).to.be(undefined)
expect(res.rows).to.have.length(1)
pool.end(done)
})
})
})
})

0 comments on commit 35a285c

Please sign in to comment.