Introduce kudo reader. #2578
Signed-off-by: liurenjie1024 <[email protected]>
Blocked by rapidsai/cudf#17265
Signed-off-by: liurenjie1024 <[email protected]>
// Update destination byte with the bits from source byte
destByte = (byte) ((destByte | (srcByte << curDestBitIdx)) & 0xFF);
dest.setByte(curDestByteIdx, destByte);
Same comment as #2532 (comment). I'm OK if this is done as a follow-up issue for performance, but I would expect it to be approx 4x faster walking this word-by-word with a single instruction for bitcount rather than a byte-by-byte loop and a table lookup.
I prefer to do this optimization in a follow-up issue (#2579), so that we have full test coverage and some benchmarks for measuring the benefits.
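The word-by-word suggestion above can be sketched as follows. This is a hypothetical illustration, not the PR's actual code: `countByByte` stands in for the byte-by-byte table-lookup loop being discussed, and `countByWord` shows the `Long.bitCount` approach, which HotSpot typically compiles to a single POPCNT instruction.

```java
public class BitCount {
  // 256-entry lookup table: number of set bits in each possible byte value.
  private static final int[] TABLE = new int[256];
  static {
    for (int i = 0; i < 256; i++) {
      TABLE[i] = Integer.bitCount(i);
    }
  }

  // Byte-by-byte loop with a table lookup (the slower approach).
  static long countByByte(byte[] buf) {
    long count = 0;
    for (byte b : buf) {
      count += TABLE[b & 0xFF];
    }
    return count;
  }

  // Word-by-word loop: process 8 bytes at a time, then the remaining tail.
  static long countByWord(byte[] buf) {
    long count = 0;
    int i = 0;
    for (; i + 8 <= buf.length; i += 8) {
      long word = 0;
      for (int j = 0; j < 8; j++) {      // assemble a little-endian word
        word |= (buf[i + j] & 0xFFL) << (8 * j);
      }
      count += Long.bitCount(word);      // usually a single POPCNT instruction
    }
    for (; i < buf.length; i++) {        // tail bytes that don't fill a word
      count += TABLE[buf[i] & 0xFF];
    }
    return count;
  }
}
```

In real code the word would presumably be read directly from the host buffer (e.g. an 8-byte read) rather than assembled byte by byte as done here for self-containment.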
int curDestByteIdx = startBit / 8;
int curDestBitIdx = startBit % 8;
...
while (curIdx < totalRowCount) {
Same comment as #2532 (comment)
See discussion above.
Signed-off-by: liurenjie1024 <[email protected]>
I will review this today as well
private final long[] currentOffsetOffsets;
private final long[] currentDataOffset;
private final Deque<SliceInfo>[] sliceInfoStack;
private final Deque<Long> totalRowCountStack;
We cannot support a row count larger than an int. I'm fine with using a long to capture overflow conditions while we're accumulating (e.g. within the constructor when it accumulates and checks for overflow). I see no reason to track a computed total row count larger than an int, because the overflow check should have already happened in the constructor and in updateOffsets, which is missing an overflow check.
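The accumulate-then-check pattern described above can be sketched as follows. This is a minimal illustration with a hypothetical `totalRows` helper, not the PR's code: sum per-table row counts in a long so the addition itself cannot overflow, then validate once that the total fits in an int.

```java
import java.util.List;

public class RowCountCheck {
  // Accumulates in a long, then validates the total fits in an int.
  static int totalRows(List<Integer> perTableRows) {
    long total = 0;
    for (int rows : perTableRows) {
      total += rows;               // long accumulation: no overflow possible here
    }
    // Throws ArithmeticException if the sum exceeds Integer.MAX_VALUE.
    return Math.toIntExact(total);
  }
}
```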
Signed-off-by: liurenjie1024 <[email protected]>
build
The blossom CI failure is a known issue, due to a failed sync of cudf.
build |
build |
Re-kicked; the previous trigger failed because the GitHub server reset the connection.
Minor nit but otherwise lgtm. Would be good to hear from @abellina before merging since he mentioned he intended to review.
  startRow += sliceInfo.getRowCount();
}
return toIntExact(nullCountTotal);
nullCountTotal is already an int, so this is wasteful. There should already be a row count overflow check elsewhere (and we cannot overflow null counts without also overflowing row counts), so it seems like this should simply return nullCountTotal directly.
Fixed.
I'd still like to do more passes, but don't block for me.
KudoHostMergeResult(Schema schema, HostMemoryBuffer hostBuf, List<ColumnViewInfo> columnInfoList) {
  requireNonNull(schema, "schema is null");
  requireNonNull(columnInfoList, "columnOffsets is null");
- requireNonNull(columnInfoList, "columnOffsets is null");
+ requireNonNull(columnInfoList, "columnInfoList is null");
Fixed.
if (hostBuf != null) {
  hostBuf.close();
}
This close() checks for null, but it doesn't null out hostBuf.
  if (hostBuf != null) {
    hostBuf.close();
+   hostBuf = null;
  }
Fixed.
I just realized that we already check for non-null in the constructor.
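The idempotent-close idiom suggested above can be sketched as follows, with a stand-in `Resource` type in place of `HostMemoryBuffer` (all names here are illustrative): nulling out the field after closing makes a second `close()` a harmless no-op instead of a double close of the underlying buffer.

```java
public class Holder implements AutoCloseable {
  // Stand-in resource that objects to being closed twice.
  static class Resource implements AutoCloseable {
    boolean closed = false;
    @Override
    public void close() {
      if (closed) throw new IllegalStateException("double close");
      closed = true;
    }
  }

  private Resource buf;

  Holder(Resource buf) {
    this.buf = buf;
  }

  @Override
  public void close() {
    if (buf != null) {
      buf.close();
      buf = null;   // makes repeated close() calls harmless no-ops
    }
  }
}
```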
  Cuda.DEFAULT_STREAM.sync();
  return t;
} catch (Exception e) {
  throw new RuntimeException(e);
Why do we need to wrap this in a RuntimeException?
So that we can use a concise syntax here:
spark-rapids-jni/src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java, line 286 at 73b9ac9:
builder::convertToTableTime);
  return Pair.of(table, builder.build());
} catch (Exception e) {
  throw new RuntimeException(e);
Same here. I suppose there's an exception we are throwing here, and in the prior case, that we would need to declare in the interface (and it makes the API a little odd?). It might be nice to add a comment here and in the other case about why.
I'm not a big fan of Java's checked exceptions, since they make modern syntax such as lambdas more difficult to use. But I agree that we should add a comment here: since this is the entrance to a public API, it makes sense to remind users of the exceptions.
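The friction being discussed can be sketched as follows (a hypothetical illustration, not the PR's code): `Supplier.get()` declares no checked exceptions, so a method that throws `IOException` cannot be passed as a `Supplier` directly and must be wrapped in a `RuntimeException` inside the lambda.

```java
import java.io.IOException;
import java.util.function.Supplier;

public class WrapChecked {
  // A method with a checked exception, standing in for real work.
  static String mayThrow(boolean fail) throws IOException {
    if (fail) throw new IOException("boom");
    return "ok";
  }

  // Adapter: wraps the checked exception so the call fits Supplier<T>.
  static Supplier<String> asSupplier(boolean fail) {
    return () -> {
      try {
        return mayThrow(fail);
      } catch (IOException e) {
        throw new RuntimeException(e);   // preserve the original cause
      }
    };
  }
}
```

Callers that care about the original failure can still recover it via `getCause()` on the unchecked wrapper.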
Fixed.
  this.sliceInfoStack[i] = new ArrayDeque<>(16);
  this.sliceInfoStack[i].add(new SliceInfo(header.getOffset(), header.getNumRows()));
}
long totalRowCount = tables.stream().mapToLong(t -> t.getHeader().getNumRows()).sum();
Nit: you could keep track of totalRowCount in the loop above, rather than re-streaming here.
Fixed.
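The nit above amounts to folding the second pass into the existing loop. A minimal sketch with illustrative names (not the PR's code):

```java
public class Accumulate {
  // Single pass: accumulate the total while doing per-table setup,
  // instead of a second tables.stream().mapToLong(...).sum() pass.
  static long totalRows(int[] numRowsPerTable) {
    long totalRowCount = 0;
    for (int numRows : numRowsPerTable) {
      // ...per-table initialization (e.g. seeding sliceInfoStack) goes here...
      totalRowCount += numRows;
    }
    return totalRowCount;
  }
}
```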
T t = doVisit(primitiveType);
if (primitiveType.getType().hasOffsets()) {
  updateOffsets(true, true, false, -1);
Could we add comments for each argument here? e.g.:
updateOffsets(/*updateOffset*/ true, /*updateData*/ true, /*updateSliceInfo*/ false, /*sizeInBytes*/ -1)
Fixed.
build
This PR is part of #2532, which introduces the reader part of the kudo format.