Skip to content

Commit ca44263

Browse files
macvincent
authored and meta-codesync[bot] committed
Support Per Stream Chunking to Relieve Memory Pressure (facebookincubator#243)
Summary: Pull Request resolved: facebookincubator#243 This is an implementation of a detail in the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1). Rather than chunking all eligible streams, we chunk individual streams in the order of their raw size until memory pressure is relieved. For our unit tests, the maximum number of chunks produced is identical to the previous implementation. But there may be differences for large file sizes. This requires more experimentation and tuning to determine the right threshold value that takes advantage of this. Differential Revision: D81715655
1 parent ee8c934 commit ca44263

File tree

4 files changed

+69
-9
lines changed

4 files changed

+69
-9
lines changed

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -799,7 +799,9 @@ void VeloxWriter::writeChunk(bool lastChunk) {
799799
<< ", chunk bytes: " << chunkSize;
800800
}
801801

802-
bool VeloxWriter::writeChunks(bool lastChunk) {
802+
bool VeloxWriter::writeChunks(
803+
std::span<const uint32_t> streamIndices,
804+
bool lastChunk) {
803805
uint64_t previousFlushWallTime = context_->stripeFlushTiming.wallNanos;
804806
std::atomic<uint64_t> chunkSize = 0;
805807
std::atomic<uint64_t> logicalSizeBeforeEncoding = 0;
@@ -866,15 +868,18 @@ bool VeloxWriter::writeChunks(bool lastChunk) {
866868
}
867869
};
868870

871+
const auto& streams = context_->streams();
869872
if (context_->options.encodingExecutor) {
870873
velox::dwio::common::ExecutorBarrier barrier{
871874
context_->options.encodingExecutor};
872-
for (auto& streamData : context_->streams()) {
875+
for (auto streamIndex : streamIndices) {
876+
auto& streamData = streams[streamIndex];
873877
barrier.add([&] { processStream(*streamData); });
874878
}
875879
barrier.waitAll();
876880
} else {
877-
for (auto& streamData : context_->streams()) {
881+
for (auto streamIndex : streamIndices) {
882+
auto& streamData = streams[streamIndex];
878883
processStream(*streamData);
879884
}
880885
}
@@ -911,8 +916,10 @@ bool VeloxWriter::writeStripe() {
911916
}
912917

913918
if (context_->options.enableChunking) {
914-
writeChunks(true);
915-
919+
// Chunk all streams.
920+
std::vector<uint32_t> streamIndices(context_->streams().size());
921+
std::iota(streamIndices.begin(), streamIndices.end(), 0);
922+
writeChunks(streamIndices, true);
916923
} else {
917924
writeChunk(true);
918925
}
@@ -998,8 +1005,32 @@ bool VeloxWriter::evalauateFlushPolicy() {
9981005
});
9991006
};
10001007

1001-
if (context_->options.enableChunking) {
1002-
while (shouldChunk() && writeChunks(false)) {
1008+
if (context_->options.enableChunking && shouldChunk()) {
1009+
const auto& streams = context_->streams();
1010+
const size_t streamCount = streams.size();
1011+
// Sort streams for chunking based on raw memory usage.
1012+
// TODO(T240072104): Improve performance by bucketing the streams by size
1013+
// (most significant bit) instead of sorting.
1014+
std::vector<uint32_t> streamIndices(streamCount);
1015+
std::iota(streamIndices.begin(), streamIndices.end(), 0);
1016+
std::sort(
1017+
streamIndices.begin(),
1018+
streamIndices.end(),
1019+
[&](const uint32_t& a, const uint32_t& b) {
1020+
return streams[a]->memoryUsed() > streams[b]->memoryUsed();
1021+
});
1022+
1023+
// Chunk streams in batches.
1024+
const auto batchSize = context_->options.chunkedStreamBatchSize;
1025+
for (size_t index = 0; index < streamCount; index += batchSize) {
1026+
const size_t currentBatchSize = std::min(batchSize, streamCount - index);
1027+
std::span<const uint32_t> batchIndices(
1028+
streamIndices.begin() + index, currentBatchSize);
1029+
// Stop attempting chunking once streams are too small to chunk or
1030+
// memory pressure is relieved.
1031+
if (!(writeChunks(batchIndices, false) && shouldChunk())) {
1032+
break;
1033+
}
10031034
}
10041035
}
10051036

dwio/nimble/velox/VeloxWriter.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ class VeloxWriter {
9090
bool writeStripe();
9191
void writeChunk(bool lastChunk = true);
9292
// Returns 'true' if chunks were written.
93-
bool writeChunks(bool lastChunk = true);
93+
bool writeChunks(
94+
std::span<const uint32_t> streamIndices,
95+
bool lastChunk = true);
9496
};
9597

9698
} // namespace facebook::nimble

dwio/nimble/velox/VeloxWriterOptions.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ struct VeloxWriterOptions {
9696
// Note: this threshold is ignored when it is time to flush a stripe.
9797
uint64_t minStreamChunkRawSize = 1024;
9898

99+
// Number of streams to try chunking between memory pressure evaluations.
100+
// Note: this is ignored when it is time to flush a stripe.
101+
size_t chunkedStreamBatchSize = 1024;
102+
99103
// The factory function that produces the root encoding selection policy.
100104
// Encoding selection policy is the way to balance the tradeoffs of
101105
// different performance factors (at both read and write times). Heuristics

dwio/nimble/velox/tests/VeloxWriterTests.cpp

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1947,6 +1947,7 @@ struct ChunkFlushPolicyTestCase {
19471947
const uint32_t expectedStripeCount{0};
19481948
const uint32_t expectedMaxChunkCount{0};
19491949
const uint32_t expectedMinChunkCount{0};
1950+
const uint32_t chunkedStreamBatchSize{2};
19501951
};
19511952

19521953
class ChunkFlushPolicyTest
@@ -1958,6 +1959,7 @@ TEST_P(ChunkFlushPolicyTest, ChunkFlushPolicyIntegration) {
19581959
{{"BIGINT", velox::BIGINT()}, {"SMALLINT", velox::SMALLINT()}});
19591960
nimble::VeloxWriterOptions writerOptions{
19601961
.minStreamChunkRawSize = GetParam().minStreamChunkRawSize,
1962+
.chunkedStreamBatchSize = GetParam().chunkedStreamBatchSize,
19611963
.flushPolicyFactory = GetParam().enableChunking
19621964
? []() -> std::unique_ptr<nimble::FlushPolicy> {
19631965
return std::make_unique<nimble::ChunkFlushPolicy>(
@@ -2075,6 +2077,7 @@ INSTANTIATE_TEST_CASE_P(
20752077
.expectedStripeCount = 4,
20762078
.expectedMaxChunkCount = 1,
20772079
.expectedMinChunkCount = 1,
2080+
.chunkedStreamBatchSize = 2,
20782081
},
20792082
// Baseline with default settings (has chunking)
20802083
ChunkFlushPolicyTestCase{
@@ -2088,6 +2091,7 @@ INSTANTIATE_TEST_CASE_P(
20882091
.expectedStripeCount = 7,
20892092
.expectedMaxChunkCount = 2,
20902093
.expectedMinChunkCount = 1,
2094+
.chunkedStreamBatchSize = 2,
20912095
},
20922096
// High memory regression threshold and no compression
20932097
// Produces file identical to RawStripeSizeFlushPolicy
@@ -2102,6 +2106,7 @@ INSTANTIATE_TEST_CASE_P(
21022106
.expectedStripeCount = 4,
21032107
.expectedMaxChunkCount = 1,
21042108
.expectedMinChunkCount = 1,
2109+
.chunkedStreamBatchSize = 2,
21052110
},
21062111
// Low memory regression threshold
21072112
// Produces file with more min chunks per stripe
@@ -2116,6 +2121,7 @@ INSTANTIATE_TEST_CASE_P(
21162121
.expectedStripeCount = 10,
21172122
.expectedMaxChunkCount = 2,
21182123
.expectedMinChunkCount = 2, // +1 chunk
2124+
.chunkedStreamBatchSize = 2,
21192125
},
21202126
// High target stripe size bytes (with disabled memory pressure
21212127
// optimization) produces fewer stripes. Single chunks.
@@ -2130,6 +2136,8 @@ INSTANTIATE_TEST_CASE_P(
21302136
.expectedStripeCount = 1, // -3 stripes
21312137
.expectedMaxChunkCount = 1,
21322138
.expectedMinChunkCount = 1,
2139+
.chunkedStreamBatchSize = 2,
2140+
21332141
},
21342142
// Low target stripe size bytes (with disabled memory pressure
21352143
// optimization) produces more stripes. Single chunks.
@@ -2144,5 +2152,20 @@ INSTANTIATE_TEST_CASE_P(
21442152
.expectedStripeCount = 7, // +6 stripes
21452153
.expectedMaxChunkCount = 1,
21462154
.expectedMinChunkCount = 1,
2147-
}));
2155+
.chunkedStreamBatchSize = 2,
2156+
2157+
},
2158+
// Higher chunked stream batch size (no change in policy)
2159+
ChunkFlushPolicyTestCase{
2160+
.batchCount = 20,
2161+
.enableChunking = true,
2162+
.targetStripeSizeBytes = 250 << 10, // 250KB
2163+
.writerMemoryHighThresholdBytes = 80 << 10,
2164+
.writerMemoryLowThresholdBytes = 75 << 10,
2165+
.estimatedCompressionFactor = 1.0,
2166+
.minStreamChunkRawSize = 100,
2167+
.expectedStripeCount = 7,
2168+
.expectedMaxChunkCount = 2,
2169+
.expectedMinChunkCount = 1,
2170+
.chunkedStreamBatchSize = 10}));
21482171
} // namespace facebook

0 commit comments

Comments
 (0)