Is your feature request related to a problem? Please describe.
The high-level goal is to reduce the amount of data that we read from disk for parquet files that have a ColumnIndex/PageIndex.
https://github.com/apache/parquet-format/blob/master/PageIndex.md
Describe the solution you'd like
In Spark we currently do some hacked-up things when reading parquet files. We use parquet-mr to read the metadata about the file(s). We then let it do a predicate pushdown to find the row groups that match the requested predicate. Finally we read the pages for the desired columns in those row groups and put it all back together as a new in-memory parquet file that we send down to CUDF. It is ugly, but it let us add a lot of features in Spark before CUDF could support them. It still lets us read the data using the existing Hadoop FileSystem interface, which, because it is a "standard", our customers can and do replace. We might be able to move to the Arrow FileSystem API, but I'll talk about that in the alternatives.
Ideally I would like an API where we can send compressed pages and metadata to CUDF for decoding. The metadata would include things like the file and row group that the pages came from, and what range of row indices within those pages we would like to be read.
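To make the shape of that API concrete, here is a minimal sketch of what the input might look like. None of these types or functions exist in libcudf today; the names (page_span, row_group_pages, read_parquet_pages) are invented purely for illustration.

```cpp
// Hypothetical types for handing pre-filtered, still-compressed pages to cuDF.
#include <cudf/table/table.hpp>
#include <cstdint>
#include <memory>
#include <vector>

struct page_span {
  std::vector<uint8_t> compressed_data;  // raw page bytes as read from the file, header included
  int column_index;                      // which column chunk of the row group the page belongs to
};

struct row_group_pages {
  std::vector<uint8_t> file_metadata;  // Thrift-encoded footer so cuDF can interpret the pages
  int row_group_index;                 // row group the pages came from
  std::vector<page_span> pages;        // only the pages that survived ColumnIndex filtering
  int64_t first_row;                   // first row to keep, relative to the start of the row group
  int64_t num_rows;                    // rows to keep; anything else covered by the pages is dropped
};

// Hypothetical entry point: decode one or more filtered page sets
// (possibly from several files) into a single table.
std::unique_ptr<cudf::table> read_parquet_pages(std::vector<row_group_pages> const& input);
```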
The hard part with filtering using a ColumnIndex is that the pages for different columns within a row group are not split on the same row boundaries the way row groups are. An example might help here. Let's say we have two columns A and B in a row group. The predicate to push down is A > 100, which corresponds to page 5 in the row group. That page covers rows 500-599. Column B requires us to load 2 pages to cover that same range, in this case let's say pages 10 and 11, which cover rows 450-549 and 550-700 respectively. So we would have to hand CUDF pages 5, 10, and 11, along with the metadata about the row group and file so CUDF knows how to decode the data, plus information saying to only decode rows 500-599 and throw away anything outside that range. In a real situation it is probably going to be a lot more complicated.
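Using the hypothetical types sketched above, that example would look roughly like the following. The various *_bytes variables are placeholders for buffers Spark has already read from the file.

```cpp
// The A > 100 example expressed with the hypothetical types from the sketch above.
row_group_pages request;
request.file_metadata   = thrift_footer_bytes;  // footer read by parquet-mr on the Java side
request.row_group_index = 0;
request.pages = {
    {page_5_bytes, /*column_index=*/0},   // column A, rows 500-599
    {page_10_bytes, /*column_index=*/1},  // column B, rows 450-549
    {page_11_bytes, /*column_index=*/1},  // column B, rows 550-700
};
request.first_row = 500;  // relative to the start of the row group
request.num_rows  = 100;  // keep rows 500-599, drop the rest of what the pages cover
auto table = read_parquet_pages({request});
```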
Ideally this would let us pass down row groups from multiple different files too. I am not 100% sure how the row group filtering works on a multi-file source_info.
Describe alternatives you've considered
The other alternative is for us to start using the Arrow FileSystem API and also have cudf implement row-number filtering (I need to check, but I think the row numbers are relative to the start of the row group and not to the total within the file), similar to the set_row_groups API that currently exists.
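A rough sketch of what that could look like, using the existing row_groups option plus an invented set_row_bounds option that stands in for the row-number filtering described above; the file path is likewise just a placeholder for whatever filesystem abstraction ends up doing the I/O.

```cpp
#include <cudf/io/parquet.hpp>

// In this alternative the underlying bytes would come through something like
// the Arrow FileSystem API rather than a plain local path; the path here is
// only a placeholder.
auto opts = cudf::io::parquet_reader_options::builder(
                cudf::io::source_info{"part-00000.parquet"})
                .row_groups({{2}})  // existing API: only row group 2 of source 0
                .build();

// Hypothetical addition, named here only for illustration: keep rows 500-599,
// counted from the start of the selected row group (assuming that is how the
// row numbers are defined).
// opts.set_row_bounds(/*skip_rows=*/500, /*num_rows=*/100);

auto result = cudf::io::read_parquet(opts);
```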
This alternative is hard for us to do, for a couple of reasons:
- One of the main performance features that we have is overlapping I/O with computation. Spark likes to use lots of threads, more than we want to allow on the GPU at any point in time for memory reasons. So we let some of them onto the GPU, but we let the others read data into CPU memory. This lets us overlap the slow reads from a remote file system with computation on the GPU. We would need some kind of callback, or multiple APIs, so that we could have CUDF read all of the needed data into host memory, and then we can wait until it is time to run, at which point we finish processing the data (see the sketch after this list).
- JNI is very slow for moving data. We use a number of tricks/alternative APIs to get around this. This is especially true when calling back from native code into Java. So having an API that goes from Java to C and back to Java so it can read data over a socket (through C again, but via a slightly more optimized interface than JNI) is far from ideal, especially if it is going to involve small reads.
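For the first point, one way to split the work would be a two-phase read: an I/O step that only touches host memory and can run on many CPU threads, and a separate decode step invoked later, once the task is allowed onto the GPU. Neither function below exists in libcudf; the names are invented just to illustrate the shape of the API.

```cpp
#include <cudf/io/parquet.hpp>
#include <memory>

// Hypothetical opaque handle holding compressed pages staged in host memory.
struct host_parquet_data;

// Step 1 (CPU threads only, no GPU memory): read the filtered pages from the
// filesystem into host buffers and return a handle to them.
std::unique_ptr<host_parquet_data> fetch_parquet_pages(
    cudf::io::parquet_reader_options const& opts);

// Step 2 (called later, once the task gets onto the GPU): decompress and
// decode the staged bytes into a table.
cudf::io::table_with_metadata decode_parquet_pages(host_parquet_data const& staged);
```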
Additional context
This is related to #9268, but for the read side rather than the write side.