Skip to content

Commit 6d8dfd3

Browse files
macvincent, meta-codesync[bot]
authored and committed
feat(Nimble): New Flush Policy Implementation With Chunking (facebookincubator#240)
Summary:
X-link: facebookincubator/velox#14846

Pull Request resolved: facebookincubator#240

This is an implementation of the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1). It has two phases:

**Phase 1 - Memory Pressure Management (shouldChunk)**

The policy monitors total in-memory data size:
* When memory usage exceeds the maximum threshold, initiates chunking to reduce memory footprint while continuing data ingestion
* When previous chunking attempts succeeded and memory remains above the minimum threshold, continues chunking to further reduce memory usage

**Phase 2 - Storage Size Optimization (shouldFlush)**

Implements compression-aware stripe size prediction:
* When chunking fails to reduce memory usage effectively and memory stays above the minimum threshold, forces a full stripe flush to guarantee memory relief
* Calculates the anticipated final compressed stripe size by applying the estimated compression ratio to unencoded data
* Triggers stripe flush when the predicted compressed size reaches the target stripe size threshold

`shouldChunk` is also now a separate method required by all flush policies. We updated all previous tests and code references.

NOTE: The Velox repo change here is just a test copied into an experimental directory that references the flush policy.

Differential Revision: D81516697
1 parent b6c1c2a commit 6d8dfd3

File tree

8 files changed

+403
-68
lines changed

8 files changed

+403
-68
lines changed

dwio/nimble/velox/FlushPolicy.cpp

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,54 @@ FlushDecision StripeRawSizeFlushPolicy::shouldFlush(
2323
: FlushDecision::None;
2424
}
2525

26+
// Relieve memory pressure with chunking. Tracks state between calls.
27+
ChunkDecision ChunkFlushPolicy::shouldChunk(
28+
const StripeProgress& stripeProgress) {
29+
const auto relieveMemoryPressure = [&]() {
30+
uint64_t inMemoryBytes =
31+
stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize;
32+
if (lastChunkDecision_ == ChunkDecision::None) {
33+
return inMemoryBytes > config_.writerMemoryHighThresholdBytes
34+
? ChunkDecision::Chunk
35+
: ChunkDecision::None;
36+
}
37+
38+
// Continue chunking while memory pressure remains high but stripe raw
39+
// size is decreasing, indicating progress in memory reduction.
40+
if (lastStripeRawSize_ > stripeProgress.stripeRawSize &&
41+
inMemoryBytes > config_.writerMemoryLowThresholdBytes) {
42+
return ChunkDecision::Chunk;
43+
}
44+
return ChunkDecision::None;
45+
};
46+
47+
lastChunkDecision_ = relieveMemoryPressure();
48+
lastStripeRawSize_ = stripeProgress.stripeRawSize;
49+
return lastChunkDecision_;
50+
}
51+
52+
// Optimize for expected storage stripe size.
53+
// Does not track state between calls.
54+
FlushDecision ChunkFlushPolicy::shouldFlush(
55+
const StripeProgress& stripeProgress) {
56+
// When chunking is unable to relieve memory pressure, we flush stripe.
57+
if (stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize >
58+
config_.writerMemoryHighThresholdBytes) {
59+
return FlushDecision::Stripe;
60+
}
61+
62+
double compressionFactor = config_.estimatedCompressionFactor;
63+
// Use historical compression ratio as a heuristic when available.
64+
if (stripeProgress.stripeEncodedSize > 0) {
65+
compressionFactor =
66+
static_cast<double>(stripeProgress.stripeEncodedLogicalSize) /
67+
stripeProgress.stripeEncodedSize;
68+
}
69+
double expectedEncodedStripeSize = stripeProgress.stripeEncodedSize +
70+
stripeProgress.stripeRawSize / std::max(compressionFactor, 1.0);
71+
return (expectedEncodedStripeSize >= config_.targetStripeSizeBytes)
72+
? FlushDecision::Stripe
73+
: FlushDecision::None;
74+
}
75+
2676
} // namespace facebook::nimble

dwio/nimble/velox/FlushPolicy.h

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,45 @@
2020

2121
namespace facebook::nimble {
2222

23+
// TODO: Set default values for these parameters based on DISCO experiments.
24+
// Use arbitrary values for now.
25+
// Tunables read by ChunkFlushPolicy. All members are const and carry default
// member initializers, so a default-constructed config is usable as is.
struct ChunkFlushPolicyConfig {
  // Threshold to trigger chunking to relieve memory pressure
  // (default: 200 MiB).
  const uint64_t writerMemoryHighThresholdBytes{200 * 1024L * 1024L};
  // Threshold below which chunking stops and stripe size optimization resumes
  // (default: 100 MiB).
  const uint64_t writerMemoryLowThresholdBytes{100 * 1024L * 1024L};
  // Target size for encoded stripes (default: 100 MiB).
  const uint64_t targetStripeSizeBytes{100 * 1024L * 1024L};
  // Expected ratio of raw to encoded data
  const double estimatedCompressionFactor{1.3};
};
35+
36+
// Result of FlushPolicy::shouldFlush.
enum class FlushDecision : bool {
  // Do not flush the stripe now.
  None = false,
  // Flush the current stripe.
  Stripe = true,
};
40+
41+
// Result of FlushPolicy::shouldChunk.
enum class ChunkDecision : bool {
  // Do not write a chunk now.
  None = false,
  // Write a chunk of the buffered data.
  Chunk = true,
};
45+
2346
struct StripeProgress {
2447
// Size of the stripe data when it's fully decompressed and decoded
2548
const uint64_t stripeRawSize;
2649
// Size of the stripe after buffered data is encoded and optionally compressed
2750
const uint64_t stripeEncodedSize;
28-
};
29-
30-
enum class FlushDecision : uint8_t {
31-
None = 0,
32-
Stripe = 1,
33-
Chunk = 2,
51+
// Logical size of the now encoded stripe data
52+
const uint64_t stripeEncodedLogicalSize;
3453
};
3554

3655
class FlushPolicy {
3756
public:
3857
virtual ~FlushPolicy() = default;
3958
virtual FlushDecision shouldFlush(const StripeProgress& stripeProgress) = 0;
59+
virtual ChunkDecision shouldChunk(const StripeProgress&) {
60+
return ChunkDecision::None;
61+
}
4062
};
4163

4264
class StripeRawSizeFlushPolicy final : public FlushPolicy {
@@ -53,15 +75,44 @@ class StripeRawSizeFlushPolicy final : public FlushPolicy {
5375
class LambdaFlushPolicy : public FlushPolicy {
5476
public:
5577
explicit LambdaFlushPolicy(
56-
std::function<FlushDecision(const StripeProgress&)> lambda)
57-
: lambda_{std::move(lambda)} {}
78+
std::function<FlushDecision(const StripeProgress&)> flushLambda =
79+
[](const StripeProgress&) { return FlushDecision::None; },
80+
std::function<ChunkDecision(const StripeProgress&)> chunkLambda =
81+
[](const StripeProgress&) { return ChunkDecision::None; })
82+
: flushLambda_{std::move(flushLambda)},
83+
chunkLambda_{std::move(chunkLambda)} {}
5884

5985
FlushDecision shouldFlush(const StripeProgress& stripeProgress) override {
60-
return lambda_(stripeProgress);
86+
return flushLambda_(stripeProgress);
6187
}
6288

89+
ChunkDecision shouldChunk(const StripeProgress& stripeProgress) override {
90+
return chunkLambda_(stripeProgress);
91+
}
92+
93+
private:
94+
std::function<FlushDecision(const StripeProgress&)> flushLambda_;
95+
std::function<ChunkDecision(const StripeProgress&)> chunkLambda_;
96+
};
97+
98+
class ChunkFlushPolicy : public FlushPolicy {
99+
public:
100+
explicit ChunkFlushPolicy(ChunkFlushPolicyConfig config)
101+
: config_{std::move(config)},
102+
lastChunkDecision_{ChunkDecision::None},
103+
lastStripeRawSize_{0} {}
104+
105+
// Optimize for expected storage stripe size.
106+
// Does not track state between calls.
107+
FlushDecision shouldFlush(const StripeProgress& stripeProgress) override;
108+
109+
// Relieve memory pressure with chunking. Tracks state between calls.
110+
ChunkDecision shouldChunk(const StripeProgress& stripeProgress) override;
111+
63112
private:
64-
std::function<FlushDecision(const StripeProgress&)> lambda_;
113+
const ChunkFlushPolicyConfig config_;
114+
ChunkDecision lastChunkDecision_;
115+
uint64_t lastStripeRawSize_;
65116
};
66117

67118
} // namespace facebook::nimble

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -846,18 +846,20 @@ bool VeloxWriter::tryWriteStripe(bool force) {
846846
.stripeEncodedSize = context_->stripeSize});
847847
};
848848

849-
auto decision = force ? FlushDecision::Stripe : shouldFlush();
850-
if (decision == FlushDecision::None) {
851-
return false;
852-
}
849+
auto shouldChunk = [&]() {
850+
return context_->flushPolicy->shouldChunk(StripeProgress{
851+
.stripeRawSize = context_->memoryUsed,
852+
.stripeEncodedSize = context_->stripeSize});
853+
};
853854

854855
try {
855856
// TODO: we can improve merge the last chunk write with stripe
856-
if (decision == FlushDecision::Chunk && context_->options.enableChunking) {
857+
if (context_->options.enableChunking &&
858+
shouldChunk() == ChunkDecision::Chunk) {
857859
writeChunk(false);
858-
decision = shouldFlush();
859860
}
860861

862+
auto decision = force ? FlushDecision::Stripe : shouldFlush();
861863
if (decision != FlushDecision::Stripe) {
862864
return false;
863865
}

dwio/nimble/velox/selective/tests/E2EFilterTest.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,14 @@ class E2EFilterTest : public dwio::common::E2EFilterTestBase {
128128
writeSchema_ = rowType_;
129129
VeloxWriterOptions options;
130130
options.enableChunking = true;
131-
options.flushPolicyFactory = [] {
131+
auto i = 0;
132+
options.flushPolicyFactory = [&] {
132133
return std::make_unique<LambdaFlushPolicy>(
133-
[i = 0](const StripeProgress&) mutable {
134-
if (i++ % 3 == 2) {
135-
return FlushDecision::Stripe;
136-
}
137-
return FlushDecision::Chunk;
134+
[&](const StripeProgress&) {
135+
return (i++ % 3 == 2) ? FlushDecision::Stripe : FlushDecision::None;
136+
},
137+
[&](const StripeProgress&) {
138+
return (i++ % 3 == 2) ? ChunkDecision::Chunk : ChunkDecision::None;
138139
});
139140
};
140141
if (!flatMapColumns_.empty()) {

dwio/nimble/velox/selective/tests/SelectiveNimbleReaderTest.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,8 @@ TEST_F(SelectiveNimbleReaderTest, multiChunkNulls) {
566566
options.minStreamChunkRawSize = 0;
567567
options.flushPolicyFactory = [] {
568568
return std::make_unique<LambdaFlushPolicy>(
569-
[](const StripeProgress&) { return FlushDecision::Chunk; });
569+
[](const StripeProgress&) { return FlushDecision::None; },
570+
[](const StripeProgress&) { return ChunkDecision::Chunk; });
570571
};
571572
auto file = test::createNimbleFile(
572573
*rootPool(), {chunk1, chunk2, chunk3}, options, false);
@@ -612,7 +613,8 @@ TEST_F(SelectiveNimbleReaderTest, multiChunkInt16RowSetOverBoundary) {
612613
options.minStreamChunkRawSize = 0;
613614
options.flushPolicyFactory = [] {
614615
return std::make_unique<LambdaFlushPolicy>(
615-
[](const StripeProgress&) { return FlushDecision::Chunk; });
616+
[](const StripeProgress&) { return FlushDecision::None; },
617+
[](const StripeProgress&) { return ChunkDecision::Chunk; });
616618
};
617619
auto file =
618620
test::createNimbleFile(*rootPool(), {chunk1, chunk2}, options, false);

0 commit comments

Comments
 (0)