Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions dwio/nimble/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ target_link_libraries(nimble_velox_schema_builder nimble_velox_schema_reader

add_library(nimble_velox_stream_data StreamData.cpp)
target_link_libraries(nimble_velox_stream_data nimble_velox_schema_builder
nimble_common)
nimble_common nimble_velox_buffer_growth_policy)

add_library(nimble_velox_field_reader FieldReader.cpp)
target_link_libraries(
Expand All @@ -37,11 +37,13 @@ target_link_libraries(
add_library(nimble_velox_layout_planner LayoutPlanner.cpp)
target_link_libraries(nimble_velox_layout_planner nimble_velox_schema_reader)

add_library(nimble_velox_field_writer BufferGrowthPolicy.cpp
DeduplicationUtils.cpp FieldWriter.cpp)
add_library(nimble_velox_buffer_growth_policy BufferGrowthPolicy.cpp)
target_link_libraries(nimble_velox_buffer_growth_policy nimble_common)

add_library(nimble_velox_field_writer DeduplicationUtils.cpp FieldWriter.cpp)
target_link_libraries(
nimble_velox_field_writer nimble_velox_schema nimble_velox_stream_data
nimble_velox_schema_builder Folly::folly)
nimble_velox_schema_builder nimble_velox_buffer_growth_policy Folly::folly)

build_flatbuffers(
"${CMAKE_CURRENT_SOURCE_DIR}/Schema.fbs"
Expand Down Expand Up @@ -108,6 +110,7 @@ add_library(
ChunkedStreamWriter.cpp VeloxWriterDefaultMetadataOSS.cpp)
target_link_libraries(
nimble_velox_writer
nimble_velox_buffer_growth_policy
nimble_encodings
nimble_common
nimble_column_stats_utils
Expand Down
51 changes: 36 additions & 15 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
#include "dwio/nimble/velox/FieldWriter.h"
#include "dwio/nimble/common/Exceptions.h"
#include "dwio/nimble/common/Types.h"
#include "dwio/nimble/velox/DeduplicationUtils.h"
#include "dwio/nimble/velox/SchemaBuilder.h"
#include "dwio/nimble/velox/SchemaTypes.h"
Expand Down Expand Up @@ -296,7 +295,8 @@ class SimpleFieldWriter : public FieldWriter {
uint64_t nullCount = 0;

if (auto flat = vector->asFlatVector<SourceType>()) {
valuesStream_.ensureNullsCapacity(flat->mayHaveNulls(), size);
valuesStream_.ensureNullsCapacity(
flat->mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
bool rangeCopied = false;
if (!flat->mayHaveNulls()) {
if constexpr (
Expand Down Expand Up @@ -342,7 +342,8 @@ class SimpleFieldWriter : public FieldWriter {
} else {
auto decodingContext = context_.getDecodingContext();
auto& decoded = decodingContext.decode(vector, ranges);
valuesStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
valuesStream_.ensureNullsCapacity(
decoded.mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
auto nonNullCount = iterateNonNullValues(
ranges,
valuesStream_.mutableNonNulls(),
Expand Down Expand Up @@ -408,7 +409,8 @@ class RowFieldWriter : public FieldWriter {
"Schema mismatch: expected {} fields, but got {} fields",
fields_.size(),
row->childrenSize()));
nullsStream_.ensureNullsCapacity(vector->mayHaveNulls(), size);
nullsStream_.ensureNullsCapacity(
vector->mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
if (row->mayHaveNulls()) {
childRangesPtr = &childRanges;
auto nonNullCount = iterateNonNullIndices<true>(
Expand All @@ -432,7 +434,8 @@ class RowFieldWriter : public FieldWriter {
fields_.size(),
row->childrenSize()));
childRangesPtr = &childRanges;
nullsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
nullsStream_.ensureNullsCapacity(
decoded.mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
auto nonNullCount = iterateNonNullIndices<true>(
ranges,
nullsStream_.mutableNonNulls(),
Expand Down Expand Up @@ -521,7 +524,8 @@ class MultiValueFieldWriter : public FieldWriter {
offsets = casted->rawOffsets();
lengths = casted->rawSizes();

lengthsStream_.ensureNullsCapacity(casted->mayHaveNulls(), size);
lengthsStream_.ensureNullsCapacity(
casted->mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
auto nonNullCount = iterateNonNullIndices<true>(
ranges, lengthsStream_.mutableNonNulls(), Flat{vector}, proc);
nullCount = size - nonNullCount;
Expand All @@ -533,7 +537,8 @@ class MultiValueFieldWriter : public FieldWriter {
offsets = casted->rawOffsets();
lengths = casted->rawSizes();

lengthsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
lengthsStream_.ensureNullsCapacity(
decoded.mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
auto nonNullCount = iterateNonNullIndices<true>(
ranges, lengthsStream_.mutableNonNulls(), Decoded{decoded}, proc);
nullCount = size - nonNullCount;
Expand Down Expand Up @@ -747,7 +752,10 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
if (mapVector) {
rawOffsets = mapVector->rawOffsets();
rawLengths = mapVector->rawSizes();
offsetsStream_.ensureNullsCapacity(mapVector->mayHaveNulls(), size);
offsetsStream_.ensureNullsCapacity(
mapVector->mayHaveNulls(),
size,
context_.inputBufferGrowthPolicy.get());
Flat iterableVector{vector};
auto nonNullCount = iterateNonNullIndices<true>(
ranges,
Expand All @@ -762,7 +770,8 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
NIMBLE_ASSERT(mapVector, "Unexpected vector type");
rawOffsets = mapVector->rawOffsets();
rawLengths = mapVector->rawSizes();
offsetsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
offsetsStream_.ensureNullsCapacity(
decoded.mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
Decoded iterableVector{decoded};
auto nonNullCount = iterateNonNullIndices<true>(
ranges,
Expand Down Expand Up @@ -974,7 +983,8 @@ class FlatMapFieldWriter : public FieldWriter {
flatMap,
"Unexpected vector type. Vector must be a decoded ROW vector.");
const auto size = ranges.size();
nullsStream_.ensureNullsCapacity(flatMap->mayHaveNulls(), size);
nullsStream_.ensureNullsCapacity(
flatMap->mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
const auto& keys = flatMap->type()->asRow().names();
const auto& values = flatMap->children();

Expand Down Expand Up @@ -1080,7 +1090,8 @@ class FlatMapFieldWriter : public FieldWriter {
offsets = map->rawOffsets();
lengths = map->rawSizes();

nullsStream_.ensureNullsCapacity(map->mayHaveNulls(), size);
nullsStream_.ensureNullsCapacity(
map->mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
processVector(map, Flat{vector});
} else {
// Map is encoded. Decode.
Expand All @@ -1091,7 +1102,10 @@ class FlatMapFieldWriter : public FieldWriter {
offsets = map->rawOffsets();
lengths = map->rawSizes();

nullsStream_.ensureNullsCapacity(decodedMap.mayHaveNulls(), size);
nullsStream_.ensureNullsCapacity(
decodedMap.mayHaveNulls(),
size,
context_.inputBufferGrowthPolicy.get());
processVector(map, Decoded{decodedMap});
}

Expand Down Expand Up @@ -1357,7 +1371,10 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
const OrderedRanges& ranges,
OrderedRanges& filteredRanges) {
auto size = ranges.size();
offsetsStream_.ensureNullsCapacity(dictionaryVector.mayHaveNulls(), size);
offsetsStream_.ensureNullsCapacity(
dictionaryVector.mayHaveNulls(),
size,
context_.inputBufferGrowthPolicy.get());

auto& offsetsData = offsetsStream_.mutableData();
auto& lengthsData = lengthsStream_.mutableData();
Expand Down Expand Up @@ -1575,7 +1592,10 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
rawOffsets = arrayVector->rawOffsets();
rawLengths = arrayVector->rawSizes();

offsetsStream_.ensureNullsCapacity(arrayVector->mayHaveNulls(), size);
offsetsStream_.ensureNullsCapacity(
arrayVector->mayHaveNulls(),
size,
context_.inputBufferGrowthPolicy.get());
Flat iterableVector{vector};
iterateNonNullIndices<true>(
ranges, offsetsStream_.mutableNonNulls(), iterableVector, proc);
Expand All @@ -1589,7 +1609,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
rawOffsets = arrayVector->rawOffsets();
rawLengths = arrayVector->rawSizes();

offsetsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size);
offsetsStream_.ensureNullsCapacity(
decoded.mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get());
Decoded iterableVector{decoded};
iterateNonNullIndices<true>(
ranges, offsetsStream_.mutableNonNulls(), iterableVector, proc);
Expand Down
9 changes: 7 additions & 2 deletions dwio/nimble/velox/StreamData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@

namespace facebook::nimble {

void NullsStreamData::ensureNullsCapacity(bool mayHaveNulls, uint32_t size) {
void NullsStreamData::ensureNullsCapacity(
bool mayHaveNulls,
uint32_t size,
InputBufferGrowthPolicy* growthPolicy) {
if (mayHaveNulls || hasNulls_) {
auto newSize = bufferedCount_ + size;
nonNulls_.reserve(newSize);
auto newCapacity = growthPolicy->getExtendedCapacity(
bufferedCount_ + size, nonNulls_.capacity());
nonNulls_.reserve(newCapacity);
if (!hasNulls_) {
hasNulls_ = true;
std::fill(nonNulls_.data(), nonNulls_.data() + bufferedCount_, true);
Expand Down
6 changes: 5 additions & 1 deletion dwio/nimble/velox/StreamData.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <string_view>

#include "dwio/nimble/common/Vector.h"
#include "dwio/nimble/velox/BufferGrowthPolicy.h"
#include "dwio/nimble/velox/SchemaBuilder.h"
#include "velox/common/memory/Memory.h"

Expand Down Expand Up @@ -160,7 +161,10 @@ class NullsStreamData : public StreamData {
}
}

void ensureNullsCapacity(bool mayHaveNulls, uint32_t size);
void ensureNullsCapacity(
bool mayHaveNulls,
uint32_t size,
InputBufferGrowthPolicy* growthPolicy);

protected:
Vector<bool> nonNulls_;
Expand Down
Loading