From ae805f3bacf12d38475065f8445cd497f89f32d0 Mon Sep 17 00:00:00 2001 From: MacVincent Agha-Oko Date: Tue, 4 Nov 2025 10:29:15 -0800 Subject: [PATCH] Support Per Stream Chunking to Relieve Memory Pressure (#243) Summary: This is an implementation of a detail in the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1). Rather than chunking all eligible streams, we chunk individual streams in the order of their raw size until memory pressure is relieved. For our unit tests, the maximum number of chunks produced is identical to the previous implementation. But there may be differences for large file sizes. This requires more experimentation and tuning to determine the right threshold value that takes advantage of this. Reviewed By: helfman Differential Revision: D81715655 --- dwio/nimble/velox/VeloxWriter.cpp | 45 ++++++-- dwio/nimble/velox/VeloxWriter.h | 4 +- dwio/nimble/velox/VeloxWriterOptions.h | 4 + dwio/nimble/velox/tests/VeloxWriterTests.cpp | 108 ++++++++++++++++++- 4 files changed, 151 insertions(+), 10 deletions(-) diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp index f720551..3a24173 100644 --- a/dwio/nimble/velox/VeloxWriter.cpp +++ b/dwio/nimble/velox/VeloxWriter.cpp @@ -806,7 +806,9 @@ void VeloxWriter::writeChunk(bool lastChunk) { << ", chunk bytes: " << chunkSize; } -bool VeloxWriter::writeChunks(bool lastChunk) { +bool VeloxWriter::writeChunks( + std::span streamIndices, + bool lastChunk) { uint64_t previousFlushWallTime = context_->stripeFlushTiming.wallNanos; std::atomic chunkSize = 0; std::atomic logicalSizeBeforeEncoding = 0; @@ -873,15 +875,18 @@ bool VeloxWriter::writeChunks(bool lastChunk) { } }; + const auto& streams = context_->streams(); if (context_->options.encodingExecutor) { velox::dwio::common::ExecutorBarrier barrier{ context_->options.encodingExecutor}; - for (auto& streamData : context_->streams()) { + for (auto streamIndex : streamIndices) { + auto& streamData = streams[streamIndex]; barrier.add([&] { processStream(*streamData); }); } barrier.waitAll(); } else { - for (auto& streamData : context_->streams()) { + for (auto streamIndex : streamIndices) { + auto& streamData = streams[streamIndex]; processStream(*streamData); } } @@ -910,8 +915,10 @@ bool VeloxWriter::writeStripe() { } if (context_->options.enableChunking) { - writeChunks(true); - + // Chunk all streams. + std::vector streamIndices(context_->streams().size()); + std::iota(streamIndices.begin(), streamIndices.end(), 0); + writeChunks(streamIndices, true); } else { writeChunk(true); } @@ -989,8 +996,32 @@ bool VeloxWriter::evalauateFlushPolicy() { }); }; - if (context_->options.enableChunking) { - while (shouldChunk() && writeChunks(false)) { + if (context_->options.enableChunking && shouldChunk()) { + const auto& streams = context_->streams(); + const size_t streamCount = streams.size(); + // Sort streams for chunking based on raw memory usage. + // TODO(T240072104): Improve performance by bucketing the streams by size + // (most significant bit) instead of sorting. + std::vector streamIndices(streamCount); + std::iota(streamIndices.begin(), streamIndices.end(), 0); + std::sort( + streamIndices.begin(), + streamIndices.end(), + [&](const uint32_t& a, const uint32_t& b) { + return streams[a]->memoryUsed() > streams[b]->memoryUsed(); + }); + + // Chunk streams in batches. + const auto batchSize = context_->options.chunkedStreamBatchSize; + for (size_t index = 0; index < streamCount; index += batchSize) { + const size_t currentBatchSize = std::min(batchSize, streamCount - index); + std::span batchIndices( + streamIndices.begin() + index, currentBatchSize); + // Stop attempting chunking once streams are too small to chunk or + // memory pressure is relieved. + if (!(writeChunks(batchIndices, false) && shouldChunk())) { + break; + } } } diff --git a/dwio/nimble/velox/VeloxWriter.h b/dwio/nimble/velox/VeloxWriter.h index 6129d83..aa49f52 100644 --- a/dwio/nimble/velox/VeloxWriter.h +++ b/dwio/nimble/velox/VeloxWriter.h @@ -90,7 +90,9 @@ class VeloxWriter { bool writeStripe(); void writeChunk(bool lastChunk = true); // Returns 'true' if chunks were written. - bool writeChunks(bool lastChunk = true); + bool writeChunks( + std::span streamIndices, + bool lastChunk = true); }; } // namespace facebook::nimble diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h index 28e49a1..f4fbbc0 100644 --- a/dwio/nimble/velox/VeloxWriterOptions.h +++ b/dwio/nimble/velox/VeloxWriterOptions.h @@ -96,6 +96,10 @@ struct VeloxWriterOptions { // Note: this threshold is ignored when it is time to flush a stripe. uint64_t minStreamChunkRawSize = 1024; + // Number of streams to try chunking between memory pressure evaluations. + // Note: this is ignored when it is time to flush a stripe. + size_t chunkedStreamBatchSize = 1024; + // The factory function that produces the root encoding selection policy. // Encoding selection policy is the way to balance the tradeoffs of // different performance factors (at both read and write times). Heuristics diff --git a/dwio/nimble/velox/tests/VeloxWriterTests.cpp b/dwio/nimble/velox/tests/VeloxWriterTests.cpp index 1bed20d..9f6024a 100644 --- a/dwio/nimble/velox/tests/VeloxWriterTests.cpp +++ b/dwio/nimble/velox/tests/VeloxWriterTests.cpp @@ -2020,6 +2020,7 @@ struct ChunkFlushPolicyTestCase { const uint32_t expectedStripeCount{0}; const uint32_t expectedMaxChunkCount{0}; const uint32_t expectedMinChunkCount{0}; + const uint32_t chunkedStreamBatchSize{2}; }; class ChunkFlushPolicyTest @@ -2031,6 +2032,7 @@ TEST_P(ChunkFlushPolicyTest, ChunkFlushPolicyIntegration) { {{"BIGINT", velox::BIGINT()}, {"SMALLINT", velox::SMALLINT()}}); nimble::VeloxWriterOptions writerOptions{ .minStreamChunkRawSize = GetParam().minStreamChunkRawSize, + .chunkedStreamBatchSize = GetParam().chunkedStreamBatchSize, .flushPolicyFactory = GetParam().enableChunking ? []() -> std::unique_ptr { return std::make_unique( @@ -2168,6 +2170,87 @@ TEST_F(VeloxWriterTests, FuzzComplex) { } } +TEST_F(VeloxWriterTests, BatchedChunkingRelievesMemoryPressure) { + // Verify we stop chunking early when chunking relieves memory pressure. + const uint32_t seed = FLAGS_writer_tests_seed > 0 ? FLAGS_writer_tests_seed + : folly::Random::rand32(); + LOG(INFO) << "seed: " << seed; + std::mt19937 rng{seed}; + const uint32_t rowCount = + std::uniform_int_distribution(1, 4096)(rng); + + velox::VectorFuzzer fuzzer({.vectorSize = rowCount}, leafPool_.get(), seed); + const auto stringColumn = fuzzer.fuzzFlat(velox::VARCHAR()); + const auto intColumn = fuzzer.fuzzFlat(velox::INTEGER()); + + nimble::RawSizeContext context; + nimble::OrderedRanges ranges; + ranges.add(0, rowCount); + const uint64_t stringColumnRawSize = + nimble::getRawSizeFromVector(stringColumn, ranges, context) + + sizeof(std::string_view) * rowCount; + const uint64_t intColumnRawSize = + nimble::getRawSizeFromVector(intColumn, ranges, context); + + constexpr size_t kColumnCount = 20; + constexpr size_t kBatchSize = 4; + std::vector children(kColumnCount); + std::vector columnNames(kColumnCount); + uint64_t totalRawSize = 0; + for (size_t i = 0; i < kColumnCount; i += 2) { + columnNames[i] = fmt::format("string_column_{}", i); + columnNames[i + 1] = fmt::format("int_column_{}", i); + children[i] = stringColumn; + children[i + 1] = intColumn; + totalRawSize += intColumnRawSize + stringColumnRawSize; + } + + velox::test::VectorMaker vectorMaker{leafPool_.get()}; + const auto rowVector = vectorMaker.rowVector(columnNames, children); + + // We will return true twice and false once + const std::vector expectedChunkingDecisions{true, true, false}; + std::vector actualChunkingDecisions; + + // We will be chunking the large streams in the first two batches. 8 string + // streams in total. We set the expected rawSize after chunking these two + // batches as our memory threshold. + const uint64_t memoryPressureThreshold = + totalRawSize - (2 * kBatchSize * stringColumnRawSize); + + nimble::VeloxWriterOptions writerOptions; + writerOptions.chunkedStreamBatchSize = kBatchSize; + writerOptions.enableChunking = true; + writerOptions.minStreamChunkRawSize = intColumnRawSize / 2; + writerOptions.flushPolicyFactory = + [&]() -> std::unique_ptr { + return std::make_unique( + /* shouldFlush */ [](const auto&) { return true; }, + /* shouldChunk */ + [&](const nimble::StripeProgress& stripeProgress) { + const bool shouldChunk = + stripeProgress.stripeRawSize > memoryPressureThreshold; + actualChunkingDecisions.push_back(shouldChunk); + return shouldChunk; + }); + }; + + std::string file; + auto writeFile = std::make_unique(&file); + nimble::VeloxWriter writer( + *rootPool_, rowVector->type(), std::move(writeFile), writerOptions); + writer.write(rowVector); + writer.close(); + + EXPECT_THAT( + actualChunkingDecisions, + ::testing::ElementsAreArray(expectedChunkingDecisions)); + + velox::InMemoryReadFile readFile(file); + nimble::VeloxReader reader(*leafPool_, &readFile); + validateChunkSize(reader, writerOptions.minStreamChunkRawSize); +} + INSTANTIATE_TEST_CASE_P( StripeRawSizeFlushPolicyTestSuite, StripeRawSizeFlushPolicyTest, @@ -2209,6 +2292,7 @@ INSTANTIATE_TEST_CASE_P( .expectedStripeCount = 4, .expectedMaxChunkCount = 1, .expectedMinChunkCount = 1, + .chunkedStreamBatchSize = 2, }, // Baseline with default settings (has chunking) ChunkFlushPolicyTestCase{ @@ -2222,6 +2306,7 @@ INSTANTIATE_TEST_CASE_P( .expectedStripeCount = 7, .expectedMaxChunkCount = 2, .expectedMinChunkCount = 1, + .chunkedStreamBatchSize = 2, }, // High memory regression threshold and no compression // Produces file identical to RawStripeSizeFlushPolicy @@ -2238,6 +2323,7 @@ INSTANTIATE_TEST_CASE_P( .expectedStripeCount = 4, .expectedMaxChunkCount = 1, .expectedMinChunkCount = 1, + .chunkedStreamBatchSize = 2, }, // Low memory regression threshold // Produces file with more min chunks per stripe @@ -2253,7 +2339,8 @@ INSTANTIATE_TEST_CASE_P( .minStreamChunkRawSize = 100, .expectedStripeCount = 10, .expectedMaxChunkCount = 2, - .expectedMinChunkCount = 2, + .expectedMinChunkCount = 2, // +1 chunk + .chunkedStreamBatchSize = 2, }, // High target stripe size bytes (with disabled memory pressure // optimization) produces fewer stripes. Single chunks. @@ -2271,6 +2358,8 @@ INSTANTIATE_TEST_CASE_P( .expectedStripeCount = 1, .expectedMaxChunkCount = 1, .expectedMinChunkCount = 1, + .chunkedStreamBatchSize = 2, + }, // Low target stripe size bytes (with disabled memory pressure // optimization) produces more stripes. Single chunks. @@ -2288,5 +2377,20 @@ INSTANTIATE_TEST_CASE_P( .expectedStripeCount = 7, .expectedMaxChunkCount = 1, .expectedMinChunkCount = 1, - })); + .chunkedStreamBatchSize = 2, + + }, + // Higher chunked stream batch size (no change in policy) + ChunkFlushPolicyTestCase{ + .batchCount = 20, + .enableChunking = true, + .targetStripeSizeBytes = 250 << 10, // 250KB + .writerMemoryHighThresholdBytes = 80 << 10, + .writerMemoryLowThresholdBytes = 75 << 10, + .estimatedCompressionFactor = 1.0, + .minStreamChunkRawSize = 100, + .expectedStripeCount = 7, + .expectedMaxChunkCount = 2, + .expectedMinChunkCount = 1, + .chunkedStreamBatchSize = 10})); } // namespace facebook