Commit 91dbed7

macvincent authored and facebook-github-bot committed
Support Per Stream Chunking to Relieve Memory Pressure (facebookincubator#243)
Summary: This implements a detail of the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1). Rather than chunking all eligible streams at once, we chunk individual streams in descending order of their raw size until memory pressure is relieved. In our unit tests, the maximum number of chunks produced is identical to the previous implementation, but results may differ for large files; more experimentation and tuning are needed to find a threshold value that takes full advantage of this. Differential Revision: D81715655
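In outline, the new policy sorts streams by buffered raw size and chunks them greedily, a batch at a time, until the flush policy reports that memory pressure is relieved. The sketch below is an illustration only: Stream, memoryPressureRelieved, and chunkStreams are hypothetical stand-ins for Nimble's actual stream data, flush-policy check, and chunk writer, not APIs from this commit.

#include <algorithm>
#include <cstdint>
#include <functional>
#include <numeric>
#include <vector>

struct Stream {
  uint64_t rawSize; // bytes currently buffered for this stream (illustrative)
};

// Chunk the largest streams first, one batch at a time, stopping as soon as
// memory pressure is relieved or the remaining streams are too small to chunk.
void chunkUntilRelieved(
    const std::vector<Stream>& streams,
    uint32_t batchSize,
    const std::function<bool()>& memoryPressureRelieved,
    const std::function<bool(const std::vector<uint32_t>&)>& chunkStreams) {
  // Order stream indices by descending raw size.
  std::vector<uint32_t> order(streams.size());
  std::iota(order.begin(), order.end(), 0);
  std::sort(order.begin(), order.end(), [&](uint32_t a, uint32_t b) {
    return streams[a].rawSize > streams[b].rawSize;
  });
  for (size_t i = 0; i < order.size() && !memoryPressureRelieved();
       i += batchSize) {
    size_t end = std::min(order.size(), i + batchSize);
    std::vector<uint32_t> batch(order.begin() + i, order.begin() + end);
    if (!chunkStreams(batch)) {
      break; // nothing left that is large enough to chunk
    }
  }
}

The actual implementation in the diff below works the same way, using shouldChunk() as the pressure check and writeChunk(false, streamIndicesToChunk) as the chunk writer.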
1 parent 660170e commit 91dbed7

File tree

4 files changed (+78, -11 lines)

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 46 additions & 9 deletions
@@ -641,7 +641,9 @@ void VeloxWriter::flush() {
   }
 }
 
-bool VeloxWriter::writeChunk(bool lastChunk) {
+bool VeloxWriter::writeChunk(
+    bool lastChunk,
+    const std::unordered_set<uint32_t>& streamIndices) {
   uint64_t previousFlushWallTime = context_->stripeFlushTiming.wallNanos;
   std::atomic<uint64_t> chunkSize = 0;
   std::atomic<uint64_t> logicalSizeBeforeEncoding = 0;
@@ -746,9 +748,11 @@ bool VeloxWriter::writeChunk(bool lastChunk) {
     velox::dwio::common::ExecutorBarrier barrier{
         context_->options.encodingExecutor};
     for (auto& streamData : context_->streams()) {
-      auto& streamSize =
-          context_->columnStats[streamData->descriptor().offset()]
-              .physicalSize;
+      auto offset = streamData->descriptor().offset();
+      auto& streamSize = context_->columnStats[offset].physicalSize;
+      if (!streamIndices.empty() && !streamIndices.contains(offset)) {
+        continue;
+      }
       processStream(
           *streamData, [&](StreamData& innerStreamData, bool isNullStream) {
             barrier.add(
@@ -765,9 +769,11 @@ bool VeloxWriter::writeChunk(bool lastChunk) {
     barrier.waitAll();
   } else {
     for (auto& streamData : context_->streams()) {
-      auto& streamSize =
-          context_->columnStats[streamData->descriptor().offset()]
-              .physicalSize;
+      auto offset = streamData->descriptor().offset();
+      auto& streamSize = context_->columnStats[offset].physicalSize;
+      if (!streamIndices.empty() && !streamIndices.contains(offset)) {
+        continue;
+      }
       processStream(
           *streamData,
           [&encode, &streamSize](
@@ -871,8 +877,39 @@ bool VeloxWriter::tryWriteStripe(bool force) {
 
   try {
     // TODO: we can improve merge the last chunk write with stripe
-    if (context_->options.enableChunking) {
-      while (shouldChunk() == ChunkDecision::Chunk && writeChunk(false)) {
+    if (context_->options.enableChunking &&
+        shouldChunk() == ChunkDecision::Chunk) {
+      const auto& streams = context_->streams();
+      // Sort streams for chunking based on raw memory usage.
+      std::vector<uint32_t> streamIndices(streams.size());
+      std::iota(streamIndices.begin(), streamIndices.end(), 0);
+      std::sort(
+          streamIndices.begin(),
+          streamIndices.end(),
+          [&](const uint32_t& a, const uint32_t& b) {
+            return streams[a]->memoryUsed() > streams[b]->memoryUsed();
+          });
+
+      // Chunk streams in batches.
+      uint32_t currentIndex = 0;
+      ChunkDecision decision = ChunkDecision::Chunk;
+      NIMBLE_DASSERT(
+          context_->options.chunkedStreamBatchSize > 0,
+          "chunkedStreamBatchSize must be greater than 0");
+      while (currentIndex < streams.size() &&
+             decision == ChunkDecision::Chunk) {
+        uint32_t endStreamIndex = std::min(
+            static_cast<uint32_t>(streams.size()),
+            currentIndex + context_->options.chunkedStreamBatchSize);
+        std::unordered_set<uint32_t> streamIndicesToChunk(
+            streamIndices.begin() + currentIndex,
+            streamIndices.begin() + endStreamIndex);
+        currentIndex = endStreamIndex;
+        // Stop attempting chunking once streams are too small to chunk.
+        if (!writeChunk(false, streamIndicesToChunk)) {
+          break;
+        }
+        decision = shouldChunk();
       }
     }
 

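For a concrete feel of the batch windows the loop above produces, here is a small standalone trace. The stream count of 10 and batch size of 4 are arbitrary example values, not values from this commit.

#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
  const uint32_t streamCount = 10; // hypothetical number of streams
  const uint32_t batchSize = 4;    // stands in for chunkedStreamBatchSize
  uint32_t currentIndex = 0;
  while (currentIndex < streamCount) {
    uint32_t endStreamIndex = std::min(streamCount, currentIndex + batchSize);
    std::cout << "chunk streams [" << currentIndex << ", " << endStreamIndex
              << ")\n";
    currentIndex = endStreamIndex;
  }
  // Prints [0, 4), [4, 8), [8, 10): the last window is clipped, exactly like
  // the std::min(streams.size(), currentIndex + batchSize) bound above.
  return 0;
}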
dwio/nimble/velox/VeloxWriter.h

Lines changed: 3 additions & 1 deletion
@@ -87,7 +87,9 @@ class VeloxWriter {
   // Returning 'true' if stripe was written.
   bool tryWriteStripe(bool force = false);
   // Returns 'true' if chunk was written.
-  bool writeChunk(bool lastChunk = true);
+  bool writeChunk(
+      bool lastChunk = true,
+      const std::unordered_set<uint32_t>& streamIndices = {});
   uint32_t writeStripe();
 };
 

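The defaulted streamIndices parameter keeps the old call sites working: an empty set means "chunk every stream", per the guard added in writeChunk. A minimal standalone check of that predicate follows; the selected helper is illustrative only, and the snippet needs C++20 for unordered_set::contains.

#include <cstdint>
#include <iostream>
#include <unordered_set>

// Mirrors the guard added in writeChunk: an empty set selects all streams.
bool selected(
    const std::unordered_set<uint32_t>& streamIndices, uint32_t offset) {
  return streamIndices.empty() || streamIndices.contains(offset);
}

int main() {
  std::unordered_set<uint32_t> none; // default-argument case
  std::unordered_set<uint32_t> some{3, 7};
  std::cout << selected(none, 5) << '\n'; // 1: all streams pass
  std::cout << selected(some, 5) << '\n'; // 0: stream 5 is skipped
  std::cout << selected(some, 7) << '\n'; // 1: stream 7 is chunked
  return 0;
}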
dwio/nimble/velox/VeloxWriterOptions.h

Lines changed: 4 additions & 0 deletions
@@ -96,6 +96,10 @@ struct VeloxWriterOptions {
   // Note: this threshold is ignored when it is time to flush a stripe.
   uint64_t minStreamChunkRawSize = 1024;
 
+  // Number of streams to process in parallel during chunked encoding.
+  // Note: this is ignored when it is time to flush a stripe.
+  uint32_t chunkedStreamBatchSize = 1024;
+
   // The factory function that produces the root encoding selection policy.
   // Encoding selection policy is the way to balance the tradeoffs of
   // different performance factors (at both read and write times). Heuristics

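As a usage sketch, the new option slots in next to minStreamChunkRawSize. The values below are arbitrary examples, and the include path and namespace are assumptions based on the file paths in this diff rather than part of the change itself.

#include "dwio/nimble/velox/VeloxWriterOptions.h" // assumed include path

using facebook::nimble::VeloxWriterOptions;

VeloxWriterOptions makeChunkingOptions() {
  return VeloxWriterOptions{
      .minStreamChunkRawSize = 1024, // skip streams below 1KB of raw data
      .chunkedStreamBatchSize = 512, // attempt chunking 512 streams at a time
  };
}

Because designated initializers must follow declaration order, chunkedStreamBatchSize is set after minStreamChunkRawSize, matching the header above.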
dwio/nimble/velox/tests/VeloxWriterTests.cpp

Lines changed: 25 additions & 1 deletion
@@ -1965,6 +1965,7 @@ struct ChunkFlushPolicyTestCase {
   const uint32_t expectedStripeCount{0};
   const uint32_t expectedMaxChunkCount{0};
   const uint32_t expectedMinChunkCount{0};
+  const uint32_t chunkedStreamBatchSize{2};
 };
 
 class ChunkFlushPolicyTest
@@ -1976,6 +1977,7 @@ TEST_P(ChunkFlushPolicyTest, ChunkFlushPolicyIntegration) {
       {{"BIGINT", velox::BIGINT()}, {"SMALLINT", velox::SMALLINT()}});
   nimble::VeloxWriterOptions writerOptions{
       .minStreamChunkRawSize = GetParam().minStreamChunkRawSize,
+      .chunkedStreamBatchSize = GetParam().chunkedStreamBatchSize,
       .flushPolicyFactory = GetParam().enableChunking
           ? []() -> std::unique_ptr<nimble::FlushPolicy> {
               return std::make_unique<nimble::ChunkFlushPolicy>(
@@ -2098,6 +2100,7 @@ INSTANTIATE_TEST_CASE_P(
         .expectedStripeCount = 4,
         .expectedMaxChunkCount = 1,
         .expectedMinChunkCount = 1,
+        .chunkedStreamBatchSize = 10,
     },
     // Base case with default settings (has chunking)
     ChunkFlushPolicyTestCase{
@@ -2111,6 +2114,7 @@ INSTANTIATE_TEST_CASE_P(
         .expectedStripeCount = 3,
         .expectedMaxChunkCount = 7,
         .expectedMinChunkCount = 3,
+        .chunkedStreamBatchSize = 10,
     },
     // High memory regression threshold
     // Produces file identical to RawStripeSizeFlushPolicy
@@ -2125,6 +2129,7 @@ INSTANTIATE_TEST_CASE_P(
         .expectedStripeCount = 4,
         .expectedMaxChunkCount = 1,
         .expectedMinChunkCount = 1,
+        .chunkedStreamBatchSize = 10,
     },
     // Low memory regression threshold
     // Produces file with more chunks per stripe
@@ -2139,6 +2144,7 @@ INSTANTIATE_TEST_CASE_P(
         .expectedStripeCount = 3,
         .expectedMaxChunkCount = 8,
         .expectedMinChunkCount = 4,
+        .chunkedStreamBatchSize = 10,
     },
     // High target stripe size bytes (with disabled memory pressure
     // optimization) produces fewer stripes. Single chunks.
@@ -2153,6 +2159,8 @@ INSTANTIATE_TEST_CASE_P(
         .expectedStripeCount = 1, // -2 stripes
         .expectedMaxChunkCount = 1,
         .expectedMinChunkCount = 1,
+        .chunkedStreamBatchSize = 10,
+
     },
     // Low target stripe size bytes (with disabled memory pressure
     // optimization) produces more stripes. Single chunks.
@@ -2167,5 +2175,21 @@ INSTANTIATE_TEST_CASE_P(
         .expectedStripeCount = 7, // +6 stripes
         .expectedMaxChunkCount = 1,
         .expectedMinChunkCount = 1,
-    }));
+        .chunkedStreamBatchSize = 10,
+
+    },
+    // Higher chunked stream batch size (no change in policy)
+    ChunkFlushPolicyTestCase{
+        .batchCount = 20,
+        .enableChunking = true,
+        .targetStripeSizeBytes = 250 << 10, // 250KB
+        .writerMemoryHighThreshold = 80 << 10,
+        .writerMemoryLowThreshold = 75 << 10,
+        .compressionRatioFactor = 1.0,
+        .minStreamChunkRawSize = 100,
+        .expectedStripeCount = 3,
+        .expectedMaxChunkCount = 7,
+        .expectedMinChunkCount = 3,
+        .chunkedStreamBatchSize = 3} // +1
+    ));
 } // namespace facebook
