From ba3abfa39d9ae62b009aaa7004cfda084ebed954 Mon Sep 17 00:00:00 2001 From: Serge Druzkin Date: Fri, 12 Sep 2025 14:30:16 -0700 Subject: [PATCH] Fix writer regression caused by memory reallocation in ensureNullsCapacity (#247) Summary: Fix an up to 32x writer regression caused by ensureNullsCapacity reallocating memory to the exact requested size without growing, by providing a growth policy. This regression was identified by DISCO runs on the biggest tables and then narrowed down by CPU profiling. Differential Revision: D82172070 --- dwio/nimble/velox/CMakeLists.txt | 11 ++++--- dwio/nimble/velox/FieldWriter.cpp | 51 ++++++++++++++++++++++--------- dwio/nimble/velox/StreamData.cpp | 9 ++++-- dwio/nimble/velox/StreamData.h | 6 +++- 4 files changed, 55 insertions(+), 22 deletions(-) diff --git a/dwio/nimble/velox/CMakeLists.txt b/dwio/nimble/velox/CMakeLists.txt index c99bbfe9..eae10bd9 100644 --- a/dwio/nimble/velox/CMakeLists.txt +++ b/dwio/nimble/velox/CMakeLists.txt @@ -27,7 +27,7 @@ target_link_libraries(nimble_velox_schema_builder nimble_velox_schema_reader add_library(nimble_velox_stream_data StreamData.cpp) target_link_libraries(nimble_velox_stream_data nimble_velox_schema_builder - nimble_common) + nimble_common nimble_velox_buffer_growth_policy) add_library(nimble_velox_field_reader FieldReader.cpp) target_link_libraries( @@ -37,11 +37,13 @@ target_link_libraries( add_library(nimble_velox_layout_planner LayoutPlanner.cpp) target_link_libraries(nimble_velox_layout_planner nimble_velox_schema_reader) -add_library(nimble_velox_field_writer BufferGrowthPolicy.cpp - DeduplicationUtils.cpp FieldWriter.cpp) +add_library(nimble_velox_buffer_growth_policy BufferGrowthPolicy.cpp) +target_link_libraries(nimble_velox_buffer_growth_policy nimble_common) + +add_library(nimble_velox_field_writer DeduplicationUtils.cpp FieldWriter.cpp) target_link_libraries( nimble_velox_field_writer nimble_velox_schema nimble_velox_stream_data - nimble_velox_schema_builder Folly::folly) + 
nimble_velox_schema_builder nimble_velox_buffer_growth_policy Folly::folly) build_flatbuffers( "${CMAKE_CURRENT_SOURCE_DIR}/Schema.fbs" @@ -108,6 +110,7 @@ add_library( ChunkedStreamWriter.cpp VeloxWriterDefaultMetadataOSS.cpp) target_link_libraries( nimble_velox_writer + nimble_velox_buffer_growth_policy nimble_encodings nimble_common nimble_column_stats_utils diff --git a/dwio/nimble/velox/FieldWriter.cpp b/dwio/nimble/velox/FieldWriter.cpp index 9a9db37b..9377272f 100644 --- a/dwio/nimble/velox/FieldWriter.cpp +++ b/dwio/nimble/velox/FieldWriter.cpp @@ -15,7 +15,6 @@ */ #include "dwio/nimble/velox/FieldWriter.h" #include "dwio/nimble/common/Exceptions.h" -#include "dwio/nimble/common/Types.h" #include "dwio/nimble/velox/DeduplicationUtils.h" #include "dwio/nimble/velox/SchemaBuilder.h" #include "dwio/nimble/velox/SchemaTypes.h" @@ -296,7 +295,8 @@ class SimpleFieldWriter : public FieldWriter { uint64_t nullCount = 0; if (auto flat = vector->asFlatVector()) { - valuesStream_.ensureNullsCapacity(flat->mayHaveNulls(), size); + valuesStream_.ensureNullsCapacity( + flat->mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get()); bool rangeCopied = false; if (!flat->mayHaveNulls()) { if constexpr ( @@ -342,7 +342,8 @@ class SimpleFieldWriter : public FieldWriter { } else { auto decodingContext = context_.getDecodingContext(); auto& decoded = decodingContext.decode(vector, ranges); - valuesStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size); + valuesStream_.ensureNullsCapacity( + decoded.mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get()); auto nonNullCount = iterateNonNullValues( ranges, valuesStream_.mutableNonNulls(), @@ -408,7 +409,8 @@ class RowFieldWriter : public FieldWriter { "Schema mismatch: expected {} fields, but got {} fields", fields_.size(), row->childrenSize())); - nullsStream_.ensureNullsCapacity(vector->mayHaveNulls(), size); + nullsStream_.ensureNullsCapacity( + vector->mayHaveNulls(), size, 
context_.inputBufferGrowthPolicy.get()); if (row->mayHaveNulls()) { childRangesPtr = &childRanges; auto nonNullCount = iterateNonNullIndices( @@ -432,7 +434,8 @@ class RowFieldWriter : public FieldWriter { fields_.size(), row->childrenSize())); childRangesPtr = &childRanges; - nullsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size); + nullsStream_.ensureNullsCapacity( + decoded.mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get()); auto nonNullCount = iterateNonNullIndices( ranges, nullsStream_.mutableNonNulls(), @@ -521,7 +524,8 @@ class MultiValueFieldWriter : public FieldWriter { offsets = casted->rawOffsets(); lengths = casted->rawSizes(); - lengthsStream_.ensureNullsCapacity(casted->mayHaveNulls(), size); + lengthsStream_.ensureNullsCapacity( + casted->mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get()); auto nonNullCount = iterateNonNullIndices( ranges, lengthsStream_.mutableNonNulls(), Flat{vector}, proc); nullCount = size - nonNullCount; @@ -533,7 +537,8 @@ class MultiValueFieldWriter : public FieldWriter { offsets = casted->rawOffsets(); lengths = casted->rawSizes(); - lengthsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size); + lengthsStream_.ensureNullsCapacity( + decoded.mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get()); auto nonNullCount = iterateNonNullIndices( ranges, lengthsStream_.mutableNonNulls(), Decoded{decoded}, proc); nullCount = size - nonNullCount; @@ -747,7 +752,10 @@ class SlidingWindowMapFieldWriter : public FieldWriter { if (mapVector) { rawOffsets = mapVector->rawOffsets(); rawLengths = mapVector->rawSizes(); - offsetsStream_.ensureNullsCapacity(mapVector->mayHaveNulls(), size); + offsetsStream_.ensureNullsCapacity( + mapVector->mayHaveNulls(), + size, + context_.inputBufferGrowthPolicy.get()); Flat iterableVector{vector}; auto nonNullCount = iterateNonNullIndices( ranges, @@ -762,7 +770,8 @@ class SlidingWindowMapFieldWriter : public FieldWriter { NIMBLE_ASSERT(mapVector, "Unexpected vector 
type"); rawOffsets = mapVector->rawOffsets(); rawLengths = mapVector->rawSizes(); - offsetsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size); + offsetsStream_.ensureNullsCapacity( + decoded.mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get()); Decoded iterableVector{decoded}; auto nonNullCount = iterateNonNullIndices( ranges, @@ -974,7 +983,8 @@ class FlatMapFieldWriter : public FieldWriter { flatMap, "Unexpected vector type. Vector must be a decoded ROW vector."); const auto size = ranges.size(); - nullsStream_.ensureNullsCapacity(flatMap->mayHaveNulls(), size); + nullsStream_.ensureNullsCapacity( + flatMap->mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get()); const auto& keys = flatMap->type()->asRow().names(); const auto& values = flatMap->children(); @@ -1080,7 +1090,8 @@ class FlatMapFieldWriter : public FieldWriter { offsets = map->rawOffsets(); lengths = map->rawSizes(); - nullsStream_.ensureNullsCapacity(map->mayHaveNulls(), size); + nullsStream_.ensureNullsCapacity( + map->mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get()); processVector(map, Flat{vector}); } else { // Map is encoded. Decode. 
@@ -1091,7 +1102,10 @@ class FlatMapFieldWriter : public FieldWriter { offsets = map->rawOffsets(); lengths = map->rawSizes(); - nullsStream_.ensureNullsCapacity(decodedMap.mayHaveNulls(), size); + nullsStream_.ensureNullsCapacity( + decodedMap.mayHaveNulls(), + size, + context_.inputBufferGrowthPolicy.get()); processVector(map, Decoded{decodedMap}); } @@ -1357,7 +1371,10 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter { const OrderedRanges& ranges, OrderedRanges& filteredRanges) { auto size = ranges.size(); - offsetsStream_.ensureNullsCapacity(dictionaryVector.mayHaveNulls(), size); + offsetsStream_.ensureNullsCapacity( + dictionaryVector.mayHaveNulls(), + size, + context_.inputBufferGrowthPolicy.get()); auto& offsetsData = offsetsStream_.mutableData(); auto& lengthsData = lengthsStream_.mutableData(); @@ -1575,7 +1592,10 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter { rawOffsets = arrayVector->rawOffsets(); rawLengths = arrayVector->rawSizes(); - offsetsStream_.ensureNullsCapacity(arrayVector->mayHaveNulls(), size); + offsetsStream_.ensureNullsCapacity( + arrayVector->mayHaveNulls(), + size, + context_.inputBufferGrowthPolicy.get()); Flat iterableVector{vector}; iterateNonNullIndices( ranges, offsetsStream_.mutableNonNulls(), iterableVector, proc); @@ -1589,7 +1609,8 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter { rawOffsets = arrayVector->rawOffsets(); rawLengths = arrayVector->rawSizes(); - offsetsStream_.ensureNullsCapacity(decoded.mayHaveNulls(), size); + offsetsStream_.ensureNullsCapacity( + decoded.mayHaveNulls(), size, context_.inputBufferGrowthPolicy.get()); Decoded iterableVector{decoded}; iterateNonNullIndices( ranges, offsetsStream_.mutableNonNulls(), iterableVector, proc); diff --git a/dwio/nimble/velox/StreamData.cpp b/dwio/nimble/velox/StreamData.cpp index 451312c6..01e6556a 100644 --- a/dwio/nimble/velox/StreamData.cpp +++ b/dwio/nimble/velox/StreamData.cpp @@ -18,10 +18,15 @@ namespace facebook::nimble { -void 
NullsStreamData::ensureNullsCapacity(bool mayHaveNulls, uint32_t size) { +void NullsStreamData::ensureNullsCapacity( + bool mayHaveNulls, + uint32_t size, + InputBufferGrowthPolicy* growthPolicy) { if (mayHaveNulls || hasNulls_) { auto newSize = bufferedCount_ + size; - nonNulls_.reserve(newSize); + auto newCapacity = growthPolicy->getExtendedCapacity( + bufferedCount_ + size, nonNulls_.capacity()); + nonNulls_.reserve(newCapacity); if (!hasNulls_) { hasNulls_ = true; std::fill(nonNulls_.data(), nonNulls_.data() + bufferedCount_, true); diff --git a/dwio/nimble/velox/StreamData.h b/dwio/nimble/velox/StreamData.h index fb15469e..5fb39afe 100644 --- a/dwio/nimble/velox/StreamData.h +++ b/dwio/nimble/velox/StreamData.h @@ -20,6 +20,7 @@ #include #include "dwio/nimble/common/Vector.h" +#include "dwio/nimble/velox/BufferGrowthPolicy.h" #include "dwio/nimble/velox/SchemaBuilder.h" #include "velox/common/memory/Memory.h" @@ -160,7 +161,10 @@ class NullsStreamData : public StreamData { } } - void ensureNullsCapacity(bool mayHaveNulls, uint32_t size); + void ensureNullsCapacity( + bool mayHaveNulls, + uint32_t size, + InputBufferGrowthPolicy* growthPolicy); protected: Vector nonNulls_;