This repository has been archived by the owner on Dec 27, 2019. It is now read-only.

Have the option to rely on PG's 'row' event rather than polling with cursor #42

Open
flesler opened this issue Mar 21, 2018 · 1 comment

Comments

@flesler

flesler commented Mar 21, 2018

I would like to stream large numbers of rows from a database without it being powered, behind the scenes, by multiple queries issued through a cursor. Currently this is roughly 5 times slower than simply loading 100K rows and processing them once they are ready.

The pg library already emits a 'row' event for each row. In fact, if there's no need for the whole set of rows at the end, it won't even buffer them in memory (which is a big plus).
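A minimal sketch of the push-based consumption described above. It assumes a hypothetical `fakeQuery()` that stands in for pg's row-emitting query object, so the example runs without a database. Each row is folded into a running total as it arrives and then dropped, which is why nothing has to be buffered.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for a pg query object: emits one 'row' event per
// result row and then 'end'. No database is involved.
function fakeQuery(rowCount) {
  const query = new EventEmitter();
  // Emit asynchronously so the caller can attach listeners first.
  setImmediate(() => {
    for (let i = 1; i <= rowCount; i++) {
      query.emit('row', { id: i, amount: i % 10 });
    }
    query.emit('end');
  });
  return query;
}

// Handle each row as it arrives and keep only a running aggregate,
// so memory use does not grow with the number of rows.
function sumAmounts(query) {
  return new Promise((resolve, reject) => {
    let total = 0;
    let seen = 0;
    query.on('row', (row) => {
      total += row.amount;
      seen += 1;
    });
    query.on('error', reject);
    query.on('end', () => resolve({ seen, total }));
  });
}

const summary = sumAmounts(fakeQuery(100000));
summary.then(({ seen, total }) => console.log(seen, total)); // 100000 450000
```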

Initially reported at: knex/knex#2535

Thanks

@felipeochoa

felipeochoa commented Sep 15, 2019

@flesler There was some discussion of this on the other thread, but the core issue is that PG's connection is an event emitter rather than a readable.
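To illustrate why an event emitter on its own cannot provide backpressure, here is a small Node.js sketch (the `source` emitter and its slow listener are hypothetical). `emit()` calls listeners synchronously and only reports whether any listener existed, so a slow consumer has no way to ask the producer to stop; accepted work simply piles up.

```javascript
const { EventEmitter } = require('events');

const source = new EventEmitter();
const pending = []; // rows the slow consumer has accepted but not finished
let maxPending = 0;

source.on('row', (row) => {
  pending.push(row);
  maxPending = Math.max(maxPending, pending.length);
  // Simulate slow async processing; nothing here can ask the emitter to pause.
  setImmediate(() => pending.shift());
});

// emit() returns true if a listener existed; it carries no "slow down" signal.
let emitResult;
for (let i = 0; i < 1000; i++) {
  emitResult = source.emit('row', i);
}

console.log(emitResult, maxPending); // true 1000
```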

At its heart, though, lies a socket, which does offer a Readable interface with backpressure. I don't see why pg couldn't be refactored into a stream instead, but you would need to change things all the way up. (Not that I'm offering to do this work 😄...)
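By contrast, a `Readable` does carry a backpressure signal: `push()` returns `false` once the internal buffer reaches `highWaterMark`, and `_read()` is only called again when there is room. A minimal sketch, with a hypothetical in-memory row source standing in for the socket:

```javascript
const { Readable } = require('stream');

const TOTAL_ROWS = 100;
let produced = 0;

// Object-mode Readable standing in for a row source; highWaterMark caps its buffer.
const rowSource = new Readable({
  objectMode: true,
  highWaterMark: 4,
  read() {
    // Produce until push() returns false (buffer full) or we run out of rows.
    while (produced < TOTAL_ROWS) {
      produced += 1;
      if (!this.push({ id: produced })) return; // read() is called again when there is room
    }
    this.push(null); // no more rows
  },
});

// Ask the stream to fill its buffer without consuming anything.
rowSource.read(0);
console.log(produced, rowSource.readableLength); // 4 4
```

The source stops after four rows even though 100 are available, because nothing has consumed them yet.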

If I'm reading the code right, once we get to the data-fetching part of the query (starting here), it's basically a processing chain with a switch.

The main difficulty would be in the demuxing of events that the connection does. I imagine you could maintain the event-emitter interface with all the events and additionally provide a Readable that emits rows. Then in the connection you route all non-row messages straight to the event emitter (no backpressure) and you keep all the row events in the stream. You would then conceptually do something like:
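A rough sketch of that split, assuming the incoming bytes have already been parsed into message objects. `DemuxConnection`, the `{ name, row }` message shape and the `'resumeSocket'` event are hypothetical; the message names only echo PostgreSQL protocol messages (RowDescription, DataRow, CommandComplete).

```javascript
const { EventEmitter } = require('events');
const { Readable } = require('stream');

// Hypothetical connection that demultiplexes already-parsed protocol messages.
// Row messages go into a backpressured Readable; everything else is emitted
// on the EventEmitter right away, with no backpressure.
class DemuxConnection extends EventEmitter {
  constructor() {
    super();
    this.rows = new Readable({
      objectMode: true,
      highWaterMark: 16,
      // The consumer wants more rows: signal whoever feeds us to resume reading.
      read: () => this.emit('resumeSocket'),
    });
  }

  // Returns false when the caller should stop feeding messages (row buffer full).
  handleMessage(msg) {
    switch (msg.name) {
      case 'dataRow':
        return this.rows.push(msg.row);
      case 'commandComplete':
        this.emit(msg.name, msg);
        this.rows.push(null); // this query will produce no more rows
        return true;
      default:
        this.emit(msg.name, msg);
        return true;
    }
  }
}

const conn = new DemuxConnection();
const controlEvents = [];
conn.on('rowDescription', (m) => controlEvents.push(m.name));
conn.on('commandComplete', (m) => controlEvents.push(m.name));

const messages = [
  { name: 'rowDescription' },
  { name: 'dataRow', row: { id: 1 } },
  { name: 'dataRow', row: { id: 2 } },
  { name: 'commandComplete' },
];
for (const msg of messages) conn.handleMessage(msg);

// Drain the buffered rows synchronously.
const collected = [];
let row;
while ((row = conn.rows.read()) !== null) collected.push(row);
console.log(controlEvents, collected);
```

In a real implementation, `handleMessage()` returning `false` is where the socket would be paused, and the `'resumeSocket'` event is where it would be resumed.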

socket
  .pipe(packetReaderTransform())
  .pipe(connection.rowTransform())
  .pipe(cursor.recordRow()) // Sidebar: not sure I understand why _rows is needed
  .pipe(myTransformer())
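The chain above is conceptual. A runnable approximation using Node's `stream.pipeline()` follows. Every stage is a hypothetical stand-in, the `cursor.recordRow()` stage is left out, and the newline framing is a simplification (the real PostgreSQL wire protocol uses length-prefixed messages).

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');
const { StringDecoder } = require('string_decoder');

// Stand-in for the socket: byte chunks whose boundaries do not line up with messages.
const socket = Readable.from([
  Buffer.from('{"id":1}\n{"id"'),
  Buffer.from(':2}\n{"id":3}\n'),
]);

// Hypothetical framing: newline-delimited packets, re-chunked from raw bytes.
function packetReaderTransform() {
  const decoder = new StringDecoder('utf8'); // safe across split multi-byte chars
  let pending = '';
  return new Transform({
    readableObjectMode: true,
    transform(chunk, _encoding, callback) {
      pending += decoder.write(chunk);
      const parts = pending.split('\n');
      pending = parts.pop(); // keep the incomplete tail for the next chunk
      for (const part of parts) if (part) this.push(part);
      callback();
    },
    flush(callback) {
      pending += decoder.end();
      if (pending) this.push(pending);
      callback();
    },
  });
}

// Hypothetical row decoder: one packet in, one row object out.
function rowTransform() {
  return new Transform({
    objectMode: true,
    transform(packet, _encoding, callback) {
      let parsed;
      try {
        parsed = JSON.parse(packet);
      } catch (err) {
        return callback(err);
      }
      callback(null, parsed);
    },
  });
}

// The application's own per-row processing.
function myTransformer() {
  return new Transform({
    objectMode: true,
    transform(row, _encoding, callback) {
      callback(null, { ...row, doubled: row.id * 2 });
    },
  });
}

const results = [];
const sink = new Writable({
  objectMode: true,
  write(row, _encoding, callback) {
    results.push(row);
    callback();
  },
});

// pipeline() wires up backpressure between every stage and reports errors once.
const pipelineDone = new Promise((resolve, reject) => {
  pipeline(socket, packetReaderTransform(), rowTransform(), myTransformer(), sink, (err) =>
    err ? reject(err) : resolve(results)
  );
});
pipelineDone.then((rows) => console.log(rows));
```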

Backpressure would then flow from myTransformer all the way back to the connection. The connection could eagerly process non-row messages until it hits a row and stalls, at which point backpressure would flow up into the socket.
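The following sketch (a hypothetical fast producer and slow consumer, both with `highWaterMark: 4`) shows backpressure propagating upstream: although the producer could create 1000 rows immediately, the number of rows produced but not yet processed stays bounded, on the order of the two buffer sizes.

```javascript
const { Readable, Writable, pipeline } = require('stream');

const TOTAL = 1000;
let produced = 0;
let consumed = 0;
let maxInFlight = 0; // largest gap between rows produced and rows fully processed

// Fast producer: creates a row whenever the stream machinery asks for one.
const fastSource = new Readable({
  objectMode: true,
  highWaterMark: 4,
  read() {
    if (produced === TOTAL) {
      this.push(null);
      return;
    }
    produced += 1;
    maxInFlight = Math.max(maxInFlight, produced - consumed);
    this.push({ id: produced });
  },
});

// Slow consumer: each row finishes on a later turn of the event loop.
const slowSink = new Writable({
  objectMode: true,
  highWaterMark: 4,
  write(row, _encoding, callback) {
    setImmediate(() => {
      consumed += 1;
      callback();
    });
  },
});

const run = new Promise((resolve, reject) => {
  pipeline(fastSource, slowSink, (err) =>
    err ? reject(err) : resolve({ produced, consumed, maxInFlight })
  );
});
run.then((stats) => console.log(stats));
```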

The nice thing about this setup is that you could keep your OOB (out-of-band) error handling going through the event emitter the way it's currently done.
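A sketch of how out-of-band errors could still reach a stream consumer under this design, assuming a hypothetical `conn` emitter for control messages and a `rows` Readable for data. The error handler on the emitter destroys the row stream, and `pipeline()` then reports the error to its callback.

```javascript
const { EventEmitter } = require('events');
const { Readable, Writable, pipeline } = require('stream');

// Hypothetical connection: control/error messages travel on the emitter
// (out of band), while rows travel through a backpressured Readable.
const conn = new EventEmitter();
const rows = new Readable({ objectMode: true, read() {} });

// Bridge: an out-of-band error tears down the row stream so consumers see it.
conn.on('error', (err) => rows.destroy(err));

const received = [];
const sink = new Writable({
  objectMode: true,
  write(row, _encoding, callback) {
    received.push(row);
    callback();
  },
});

const outcome = new Promise((resolve) => {
  pipeline(rows, sink, (err) => resolve({ err, received }));
});

rows.push({ id: 1 });
rows.push({ id: 2 });
// Simulate the server reporting an error after some rows were already delivered.
setImmediate(() => conn.emit('error', new Error('simulated server error')));
```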

So, yeah, somebody do this...
