Skip to content

Commit 1dba259

Browse files
macvincentfacebook-github-bot
authored andcommitted
New Flush Policy Implementation With Chunking (#240)
Summary: This is an implementation of the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1). It works in two phases: **Phase 1 - Memory Pressure Management:** The policy monitors total in-memory data size: * When memory usage exceeds the maximum threshold, initiates chunking to reduce memory footprint while continuing data ingestion * When previous chunking attempts succeeded and memory remains above the minimum threshold, continues chunking to further reduce memory usage * When chunking fails to reduce memory usage effectively and memory stays above the minimum threshold, forces a full stripe flush to guarantee memory relief **Phase 2 - Storage Size Optimization:** Only executed when no memory pressure exists. Implements compression-aware stripe size prediction: * Uses a configurable compression factor as baseline, enhanced with historical compression ratios from previously encoded data when available * Calculates the anticipated final compressed stripe size by applying the estimated compression ratio to unencoded data * Triggers stripe flush when the predicted compressed size reaches the target stripe size threshold Differential Revision: D81516697
1 parent d20b923 commit 1dba259

File tree

3 files changed

+467
-5
lines changed

3 files changed

+467
-5
lines changed

dwio/nimble/velox/FlushPolicy.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,62 @@ FlushDecision RawStripeSizeFlushPolicy::shouldFlush(
2323
: FlushDecision::None;
2424
}
2525

26+
FlushDecision ChunkFlushPolicy::shouldFlush(
27+
const StripeProgress& stripeProgress) {
28+
const auto relieveMemoryPressure = [&]() {
29+
const uint64_t inMemoryByteSize =
30+
stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize;
31+
// Determine when we need to start relieving memory pressure with chunking
32+
if (lastFlushDecision_ == FlushDecision::None &&
33+
inMemoryByteSize > config_.writerMaxMemoryBytes) {
34+
return FlushDecision::Chunk;
35+
}
36+
37+
// Try chunking when possible to relieve memory pressure
38+
else if (
39+
lastFlushDecision_ == FlushDecision::Chunk &&
40+
stripeProgress.successfullyChunked &&
41+
inMemoryByteSize > config_.writerMinMemoryBytes) {
42+
return FlushDecision::Chunk;
43+
}
44+
45+
// When chunking is unable to relieve memory pressure, we flush
46+
else if (
47+
lastFlushDecision_ == FlushDecision::Chunk &&
48+
!stripeProgress.successfullyChunked &&
49+
inMemoryByteSize > config_.writerMinMemoryBytes) {
50+
return FlushDecision::Stripe;
51+
}
52+
53+
return FlushDecision::None;
54+
};
55+
56+
lastFlushDecision_ = relieveMemoryPressure();
57+
if (lastFlushDecision_ != FlushDecision::None) {
58+
return lastFlushDecision_;
59+
}
60+
61+
// When no writer memory pressure, optimize for storage stripe size
62+
const auto optimizeStorageSize = [&]() {
63+
double compressionRatio = config_.compressionRatioFactor;
64+
// Use historical compression ratio as a heuristic when available.
65+
if (stripeProgress.stripeEncodedSize > 0) {
66+
compressionRatio *= stripeProgress.stripeEncodedRawSize /
67+
stripeProgress.stripeEncodedSize;
68+
}
69+
double expectedEncodedStripeSize = stripeProgress.stripeEncodedSize +
70+
compressionRatio * stripeProgress.stripeRawSize;
71+
return expectedEncodedStripeSize >= config_.targetStripeSizeBytes
72+
? FlushDecision::Stripe
73+
: FlushDecision::None;
74+
};
75+
lastFlushDecision_ = optimizeStorageSize();
76+
77+
return lastFlushDecision_;
78+
}
79+
80+
void ChunkFlushPolicy::reset() {
81+
lastFlushDecision_ = FlushDecision::None;
82+
}
83+
2684
} // namespace facebook::nimble

dwio/nimble/velox/FlushPolicy.h

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,16 @@
2020

2121
namespace facebook::nimble {
2222

23-
struct StripeProgress {
24-
// Size of the stripe data when it's fully decompressed and decoded
25-
const uint64_t stripeRawSize;
26-
// Size of the stripe after buffered data is encoded and optionally compressed
27-
const uint64_t stripeEncodedSize;
23+
// TODO: Set default values for these parameters.
24+
struct ChunkFlushPolicyConfig {
25+
// Threshold to trigger chunking to relieve memory pressure
26+
const uint64_t writerMaxMemoryBytes;
27+
// Threshold below which chunking stops and stripe size optimization resumes
28+
const uint64_t writerMinMemoryBytes;
29+
// Target size for encoded stripes
30+
const uint64_t targetStripeSizeBytes;
31+
// Controls effect of current compression ratio on final stripe size
32+
const double compressionRatioFactor{0.0};
2833
};
2934

3035
enum class FlushDecision : uint8_t {
@@ -33,10 +38,22 @@ enum class FlushDecision : uint8_t {
3338
Chunk = 2,
3439
};
3540

41+
struct StripeProgress {
42+
// Size of the stripe data when it's fully decompressed and decoded
43+
const uint64_t stripeRawSize;
44+
// Size of the stripe after buffered data is encoded and optionally compressed
45+
const uint64_t stripeEncodedSize;
46+
// Previous raw size of the now encoded stripe data
47+
const uint64_t stripeEncodedRawSize;
48+
// True if the last suggested chunk operation was successful
49+
const bool successfullyChunked;
50+
};
51+
3652
class FlushPolicy {
3753
public:
3854
virtual ~FlushPolicy() = default;
3955
virtual FlushDecision shouldFlush(const StripeProgress& stripeProgress) = 0;
56+
virtual void reset() { /* no-op */ }
4057
};
4158

4259
class RawStripeSizeFlushPolicy final : public FlushPolicy {
@@ -64,4 +81,18 @@ class LambdaFlushPolicy : public FlushPolicy {
6481
std::function<FlushDecision(const StripeProgress&)> lambda_;
6582
};
6683

84+
class ChunkFlushPolicy : public FlushPolicy {
85+
public:
86+
explicit ChunkFlushPolicy(const ChunkFlushPolicyConfig& config)
87+
: config_{config} {}
88+
89+
FlushDecision shouldFlush(const StripeProgress& stripeProgress) override;
90+
91+
void reset() override;
92+
93+
private:
94+
ChunkFlushPolicyConfig config_;
95+
FlushDecision lastFlushDecision_{FlushDecision::None};
96+
};
97+
6798
} // namespace facebook::nimble

0 commit comments

Comments
 (0)