Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipelining #3357

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions packages/pg/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@
encoding: this.connectionParameters.client_encoding || 'utf8',
})
this.queryQueue = []

// Client.sentQueryQueue is the queue of queries that have been sent on the wire
this.sentQueryQueue = []
// Client.pipelining can be set to true to enable experimental pipelining mode
this.pipelining = false

this.binary = c.binary || defaults.binary
this.processID = null
this.secretKey = null
Expand Down Expand Up @@ -286,6 +292,7 @@
const { activeQuery } = this
this.activeQuery = null
this.readyForQuery = true
this.handshakeDone = true
if (activeQuery) {
activeQuery.handleReadyForQuery(this.connection)
}
Expand Down Expand Up @@ -472,20 +479,36 @@
}

_pulseQueryQueue() {

if (!this.handshakeDone) {
return
}

while ((this.pipelining && !this.blocked) || (this.activeQuery === null && this.sentQueryQueue.length === 0)) {
var query = this.queryQueue.shift()
if (!query) break

const queryError = query.submit(this.connection)
if (queryError) {
process.nextTick(() => {
this.activeQuery.handleError(queryError, this.connection)
this.readyForQuery = true
this._pulseQueryQueue()

Check failure on line 496 in packages/pg/lib/client.js

View workflow job for this annotation

GitHub Actions / lint

Delete `⏎`
})
}
this.blocked = query.blocking
this.sentQueryQueue.push(query)
if (query.name) {
console.log(`we store that ${query.name} has been submitted`)
this.connection.submittedNamedStatements[query.name] = query.text
}
}

if (this.readyForQuery === true) {
this.activeQuery = this.queryQueue.shift()
this.activeQuery = this.sentQueryQueue.shift()
if (this.activeQuery) {
this.readyForQuery = false
this.hasExecuted = true

const queryError = this.activeQuery.submit(this.connection)
if (queryError) {
process.nextTick(() => {
this.activeQuery.handleError(queryError, this.connection)
this.readyForQuery = true
this._pulseQueryQueue()
})
}
} else if (this.hasExecuted) {
this.activeQuery = null
this.emit('drain')
Expand Down
2 changes: 2 additions & 0 deletions packages/pg/lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class Connection extends EventEmitter {
this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
this.lastBuffer = false
this.parsedStatements = {}
// to track preparation of statements submitted to server
this.submittedNamedStatements = {}
this.ssl = config.ssl || false
this._ending = false
this._emitMessage = false
Expand Down
6 changes: 6 additions & 0 deletions packages/pg/lib/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ class Query extends EventEmitter {
}

hasBeenParsed(connection) {
if (connection.submittedNamedStatements[this.name]) {
console.log(`-----------------------------------`)
console.log(`query.hasBeenParsed : This statement has already been prepared`)
console.log(`-----------------------------------`)
return true
}
return this.name && connection.parsedStatements[this.name]
}

Expand Down
32 changes: 32 additions & 0 deletions packages/pg/test/unit/client/simple-query-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,38 @@
})
})

test("multiple in the queue, pipelining mode", function () {
var client = helper.client()
client.pipelining = true
var connection = client.connection

Check failure on line 66 in packages/pg/test/unit/client/simple-query-tests.js

View workflow job for this annotation

GitHub Actions / lint

Replace `"multiple·in·the·queue,·pipelining·mode"` with `'multiple·in·the·queue,·pipelining·mode'`
var queries = connection.queries
client.query('one')
client.query('two')
client.query('three')
assert.empty(queries)

test("after one ready for query", function () {
connection.emit('readyForQuery')
assert.lengthIs(queries, 3)
assert.equal(queries[0], "one")

Check failure on line 76 in packages/pg/test/unit/client/simple-query-tests.js

View workflow job for this annotation

GitHub Actions / lint

Replace `"after·one·ready·for·query"` with `'after·one·ready·for·query'`
})

test('after two ready for query', function () {

Check failure on line 79 in packages/pg/test/unit/client/simple-query-tests.js

View workflow job for this annotation

GitHub Actions / lint

Replace `"one"` with `'one'`
connection.emit('readyForQuery')
assert.lengthIs(queries, 3)
})

test("after a bunch more", function () {
connection.emit('readyForQuery')
connection.emit('readyForQuery')
connection.emit('readyForQuery')

Check failure on line 87 in packages/pg/test/unit/client/simple-query-tests.js

View workflow job for this annotation

GitHub Actions / lint

Replace `"after·a·bunch·more"` with `'after·a·bunch·more'`
assert.lengthIs(queries, 3)
assert.equal(queries[0], "one")
assert.equal(queries[1], 'two')
assert.equal(queries[2], 'three')
})

Check failure on line 92 in packages/pg/test/unit/client/simple-query-tests.js

View workflow job for this annotation

GitHub Actions / lint

Replace `"one"` with `'one'`
})

test('query event binding and flow', function () {
var client = helper.client()
var con = client.connection
Expand Down
Loading