Skip to content

Commit 5629d57

Browse files
macvincentfacebook-github-bot
authored andcommitted
feat(Nimble): New Flush Policy Implementation With Chunking (#240)
Summary: X-link: facebookexternal/presto-facebook#3412 X-link: facebookincubator/velox#14846 This is an implementation of the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1). It has two phases: **Phase 1 - Memory Pressure Management (shouldChunk)** The policy monitors total in-memory data size: * When memory usage exceeds the maximum threshold, initiates chunking to reduce memory footprint while continuing data ingestion * While memory remains above the minimum threshold, continues chunking to further reduce memory usage **Phase 2 - Storage Size Optimization (shouldFlush)** Implements compression-aware stripe size prediction: * When chunking fails to reduce memory usage effectively and memory stays above the maximum threshold, forces a full stripe flush to guarantee memory relief * Calculates the anticipated final compressed stripe size by applying the estimated compression ratio to unencoded data * Triggers stripe flush when the predicted compressed size reaches the target stripe size threshold `shouldChunk` is also now a separate method required by all flush policies. We updated all previous tests and code references. Reviewed By: helfman Differential Revision: D81516697
1 parent fbf6ba7 commit 5629d57

File tree

8 files changed

+365
-80
lines changed

8 files changed

+365
-80
lines changed

dwio/nimble/velox/FlushPolicy.cpp

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,35 @@
1616
#include "dwio/nimble/velox/FlushPolicy.h"
1717

1818
namespace facebook::nimble {
19+
// Relieve memory pressure with chunking.
20+
bool ChunkFlushPolicy::shouldChunk(const StripeProgress& stripeProgress) {
21+
const uint64_t inMemoryBytes =
22+
stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize;
23+
const auto writerMemoryThreshold = (lastChunkDecision_ == false)
24+
? config_.writerMemoryHighThresholdBytes
25+
: config_.writerMemoryLowThresholdBytes;
26+
lastChunkDecision_ = inMemoryBytes > writerMemoryThreshold;
27+
return lastChunkDecision_;
28+
}
29+
30+
// Optimize for expected storage stripe size.
31+
bool ChunkFlushPolicy::shouldFlush(const StripeProgress& stripeProgress) {
32+
// When chunking is unable to relieve memory pressure, we flush stripe.
33+
if (stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize >
34+
config_.writerMemoryHighThresholdBytes) {
35+
return true;
36+
}
1937

20-
FlushDecision StripeRawSizeFlushPolicy::shouldFlush(
21-
const StripeProgress& stripeProgress) {
22-
return stripeProgress.stripeRawSize >= stripeRawSize_ ? FlushDecision::Stripe
23-
: FlushDecision::None;
38+
double compressionFactor = config_.estimatedCompressionFactor;
39+
// Use historical compression ratio as a heuristic when available.
40+
if (stripeProgress.stripeEncodedSize > 0) {
41+
compressionFactor =
42+
static_cast<double>(stripeProgress.stripeEncodedLogicalSize) /
43+
stripeProgress.stripeEncodedSize;
44+
}
45+
double expectedEncodedStripeSize = stripeProgress.stripeEncodedSize +
46+
stripeProgress.stripeRawSize / std::max(compressionFactor, 1.0);
47+
return (expectedEncodedStripeSize >= config_.targetStripeSizeBytes);
2448
}
2549

2650
} // namespace facebook::nimble

dwio/nimble/velox/FlushPolicy.h

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,31 +20,47 @@
2020

2121
namespace facebook::nimble {
2222

23+
// TODO: Set default values for these parameters based on DISCO experiments.
24+
// Use abitrary values for now.
25+
struct ChunkFlushPolicyConfig {
26+
// Threshold to trigger chunking to relieve memory pressure
27+
const uint64_t writerMemoryHighThresholdBytes{200 * 1024L * 1024L};
28+
// Threshold below which chunking stops and stripe size optimization resumes
29+
const uint64_t writerMemoryLowThresholdBytes{100 * 1024L * 1024L};
30+
// Target size for encoded stripes
31+
const uint64_t targetStripeSizeBytes{100 * 1024L * 1024L};
32+
// Expected ratio of raw to encoded data
33+
const double estimatedCompressionFactor{1.3};
34+
};
35+
2336
struct StripeProgress {
2437
// Size of the stripe data when it's fully decompressed and decoded
2538
const uint64_t stripeRawSize;
2639
// Size of the stripe after buffered data is encoded and optionally compressed
2740
const uint64_t stripeEncodedSize;
28-
};
29-
30-
enum class FlushDecision : uint8_t {
31-
None = 0,
32-
Stripe = 1,
33-
Chunk = 2,
41+
// Logical size of the now encoded stripe data
42+
const uint64_t stripeEncodedLogicalSize;
3443
};
3544

3645
class FlushPolicy {
3746
public:
3847
virtual ~FlushPolicy() = default;
39-
virtual FlushDecision shouldFlush(const StripeProgress& stripeProgress) = 0;
48+
virtual bool shouldFlush(const StripeProgress& stripeProgress) = 0;
49+
virtual bool shouldChunk(const StripeProgress& stripeProgress) = 0;
4050
};
4151

4252
class StripeRawSizeFlushPolicy final : public FlushPolicy {
4353
public:
4454
explicit StripeRawSizeFlushPolicy(uint64_t stripeRawSize)
4555
: stripeRawSize_{stripeRawSize} {}
4656

47-
FlushDecision shouldFlush(const StripeProgress& stripeProgress) override;
57+
bool shouldFlush(const StripeProgress& stripeProgress) override {
58+
return stripeProgress.stripeRawSize >= stripeRawSize_;
59+
}
60+
61+
bool shouldChunk(const StripeProgress&) override {
62+
return false;
63+
}
4864

4965
private:
5066
const uint64_t stripeRawSize_;
@@ -53,15 +69,40 @@ class StripeRawSizeFlushPolicy final : public FlushPolicy {
5369
class LambdaFlushPolicy : public FlushPolicy {
5470
public:
5571
explicit LambdaFlushPolicy(
56-
std::function<FlushDecision(const StripeProgress&)> lambda)
57-
: lambda_{std::move(lambda)} {}
72+
std::function<bool(const StripeProgress&)> flushLambda =
73+
[](const StripeProgress&) { return false; },
74+
std::function<bool(const StripeProgress&)> chunkLambda =
75+
[](const StripeProgress&) { return false; })
76+
: flushLambda_{std::move(flushLambda)},
77+
chunkLambda_{std::move(chunkLambda)} {}
78+
79+
bool shouldFlush(const StripeProgress& stripeProgress) override {
80+
return flushLambda_(stripeProgress);
81+
}
5882

59-
FlushDecision shouldFlush(const StripeProgress& stripeProgress) override {
60-
return lambda_(stripeProgress);
83+
bool shouldChunk(const StripeProgress& stripeProgress) override {
84+
return chunkLambda_(stripeProgress);
6185
}
6286

6387
private:
64-
std::function<FlushDecision(const StripeProgress&)> lambda_;
88+
std::function<bool(const StripeProgress&)> flushLambda_;
89+
std::function<bool(const StripeProgress&)> chunkLambda_;
90+
};
91+
92+
class ChunkFlushPolicy : public FlushPolicy {
93+
public:
94+
explicit ChunkFlushPolicy(ChunkFlushPolicyConfig config)
95+
: config_{std::move(config)}, lastChunkDecision_{false} {}
96+
97+
// Optimize for expected storage stripe size.
98+
bool shouldFlush(const StripeProgress& stripeProgress) override;
99+
100+
// Relieve memory pressure with chunking.
101+
bool shouldChunk(const StripeProgress& stripeProgress) override;
102+
103+
private:
104+
const ChunkFlushPolicyConfig config_;
105+
bool lastChunkDecision_;
65106
};
66107

67108
} // namespace facebook::nimble

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -848,19 +848,20 @@ bool VeloxWriter::tryWriteStripe(bool force) {
848848
.stripeEncodedSize = context_->stripeSize});
849849
};
850850

851-
auto decision = force ? FlushDecision::Stripe : shouldFlush();
852-
if (decision == FlushDecision::None) {
853-
return false;
854-
}
851+
auto shouldChunk = [&]() {
852+
return context_->flushPolicy->shouldChunk(
853+
StripeProgress{
854+
.stripeRawSize = context_->memoryUsed,
855+
.stripeEncodedSize = context_->stripeSize});
856+
};
855857

856858
try {
857859
// TODO: we can improve merge the last chunk write with stripe
858-
if (decision == FlushDecision::Chunk && context_->options.enableChunking) {
860+
if (context_->options.enableChunking && shouldChunk()) {
859861
writeChunk(false);
860-
decision = shouldFlush();
861862
}
862863

863-
if (decision != FlushDecision::Stripe) {
864+
if (!(force || shouldFlush())) {
864865
return false;
865866
}
866867

dwio/nimble/velox/selective/tests/E2EFilterTest.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,12 @@ class E2EFilterTest : public dwio::common::E2EFilterTestBase {
128128
writeSchema_ = rowType_;
129129
VeloxWriterOptions options;
130130
options.enableChunking = true;
131-
options.flushPolicyFactory = [] {
131+
auto i = 0;
132+
options.flushPolicyFactory = [&] {
132133
return std::make_unique<LambdaFlushPolicy>(
133-
[i = 0](const StripeProgress&) mutable {
134-
if (i++ % 3 == 2) {
135-
return FlushDecision::Stripe;
136-
}
137-
return FlushDecision::Chunk;
138-
});
134+
/*flushLambda=*/[&](const StripeProgress&) { return (i++ % 3 == 2); },
135+
/*chunkLambda=*/
136+
[&](const StripeProgress&) { return (i++ % 3 == 2); });
139137
};
140138
if (!flatMapColumns_.empty()) {
141139
setUpFlatMapColumns();

dwio/nimble/velox/selective/tests/SelectiveNimbleReaderTest.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,8 @@ TEST_F(SelectiveNimbleReaderTest, multiChunkNulls) {
566566
options.minStreamChunkRawSize = 0;
567567
options.flushPolicyFactory = [] {
568568
return std::make_unique<LambdaFlushPolicy>(
569-
[](const StripeProgress&) { return FlushDecision::Chunk; });
569+
/*flushLambda=*/[](const StripeProgress&) { return false; },
570+
/*chunkLambda=*/[](const StripeProgress&) { return true; });
570571
};
571572
auto file = test::createNimbleFile(
572573
*rootPool(), {chunk1, chunk2, chunk3}, options, false);
@@ -612,7 +613,8 @@ TEST_F(SelectiveNimbleReaderTest, multiChunkInt16RowSetOverBoundary) {
612613
options.minStreamChunkRawSize = 0;
613614
options.flushPolicyFactory = [] {
614615
return std::make_unique<LambdaFlushPolicy>(
615-
[](const StripeProgress&) { return FlushDecision::Chunk; });
616+
/*flushLambda=*/[](const StripeProgress&) { return false; },
617+
/*chunkLambda=*/[](const StripeProgress&) { return true; });
616618
};
617619
auto file =
618620
test::createNimbleFile(*rootPool(), {chunk1, chunk2}, options, false);

0 commit comments

Comments
 (0)