Skip to content

Commit 0371580

Browse files
macvincentfacebook-github-bot
authored andcommitted
New Flush Policy Implementation With Chunking (#240)
Summary: Pull Request resolved: #240 Differential Revision: D81516697
1 parent 6c6ab18 commit 0371580

File tree

3 files changed

+433
-5
lines changed

3 files changed

+433
-5
lines changed

dwio/nimble/velox/FlushPolicy.cpp

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,56 @@ FlushDecision RawStripeSizeFlushPolicy::shouldFlush(
2323
: FlushDecision::None;
2424
}
2525

26+
FlushDecision ChunkFlushPolicy::shouldFlush(
27+
const StripeProgress& stripeProgress) {
28+
const auto relieveMemoryPressure = [&]() {
29+
const uint64_t inMemoryByteSize =
30+
stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize;
31+
// Determine when we need to start relieving memory pressure with chunking
32+
if (lastFlushDecision_ == FlushDecision::None &&
33+
inMemoryByteSize > config_.writerMaxMemoryBytes) {
34+
return FlushDecision::Chunk;
35+
}
36+
37+
// Try chunking when possible to relieve memory pressure
38+
else if (
39+
lastFlushDecision_ == FlushDecision::Chunk &&
40+
stripeProgress.successfullyChunked &&
41+
inMemoryByteSize > config_.writerMinMemoryBytes) {
42+
return FlushDecision::Chunk;
43+
}
44+
45+
// When chunking is unable to relieve memory pressure, we flush
46+
else if (
47+
lastFlushDecision_ == FlushDecision::Chunk &&
48+
!stripeProgress.successfullyChunked &&
49+
inMemoryByteSize > config_.writerMinMemoryBytes) {
50+
return FlushDecision::Stripe;
51+
}
52+
53+
return FlushDecision::None;
54+
};
55+
56+
lastFlushDecision_ = relieveMemoryPressure();
57+
if (lastFlushDecision_ != FlushDecision::None ||
58+
stripeProgress.stripeEncodedSize == 0) {
59+
return lastFlushDecision_;
60+
}
61+
62+
// When no writer memory pressure, optimize for storage stripe size
63+
const auto optimizeStorageSize = [&]() {
64+
double compressionRatio = config_.compressionRatioFactor *
65+
(stripeProgress.stripeEncodedRawSize /
66+
stripeProgress.stripeEncodedSize);
67+
double expectedEncodedStripeSize = stripeProgress.stripeEncodedSize +
68+
compressionRatio * stripeProgress.stripeRawSize;
69+
return expectedEncodedStripeSize >= config_.targetStripeSizeBytes
70+
? FlushDecision::Stripe
71+
: FlushDecision::None;
72+
};
73+
lastFlushDecision_ = optimizeStorageSize();
74+
75+
return lastFlushDecision_;
76+
}
77+
2678
} // namespace facebook::nimble

dwio/nimble/velox/FlushPolicy.h

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,15 @@
2020

2121
namespace facebook::nimble {
2222

23-
struct StripeProgress {
24-
// Size of the stripe data when it's fully decompressed and decoded
25-
const uint64_t stripeRawSize;
26-
// Size of the stripe after buffered data is encoded and optionally compressed
27-
const uint64_t stripeEncodedSize;
23+
struct FlushPolicyConfig {
24+
// Threshold to trigger chunking to relieve memory pressure
25+
const uint64_t writerMaxMemoryBytes;
26+
// Threshold below which chunking stops and stripe size optimization resumes
27+
const uint64_t writerMinMemoryBytes;
28+
// Target size for encoded stripes
29+
const uint64_t targetStripeSizeBytes;
30+
// Controls effect of current compression ratio on final stripe size
31+
const double compressionRatioFactor{0.0};
2832
};
2933

3034
enum class FlushDecision : uint8_t {
@@ -33,6 +37,17 @@ enum class FlushDecision : uint8_t {
3337
Chunk = 2,
3438
};
3539

40+
struct StripeProgress {
41+
// Size of the stripe data when it's fully decompressed and decoded
42+
const uint64_t stripeRawSize;
43+
// Size of the stripe after buffered data is encoded and optionally compressed
44+
const uint64_t stripeEncodedSize;
45+
// Previous raw size of the now encoded stripe data
46+
const uint64_t stripeEncodedRawSize;
47+
// True if the last suggested chunk operation was successful
48+
const bool successfullyChunked;
49+
};
50+
3651
class FlushPolicy {
3752
public:
3853
virtual ~FlushPolicy() = default;
@@ -64,4 +79,16 @@ class LambdaFlushPolicy : public FlushPolicy {
6479
std::function<FlushDecision(const StripeProgress&)> lambda_;
6580
};
6681

82+
class ChunkFlushPolicy : public FlushPolicy {
83+
public:
84+
explicit ChunkFlushPolicy(const FlushPolicyConfig& config)
85+
: config_{config} {}
86+
87+
FlushDecision shouldFlush(const StripeProgress& stripeProgress) override;
88+
89+
private:
90+
FlushPolicyConfig config_;
91+
FlushDecision lastFlushDecision_{FlushDecision::None};
92+
};
93+
6794
} // namespace facebook::nimble

0 commit comments

Comments
 (0)