Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions dwio/nimble/velox/FlushPolicy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,35 @@
#include "dwio/nimble/velox/FlushPolicy.h"

namespace facebook::nimble {

FlushDecision RawStripeSizeFlushPolicy::shouldFlush(
const StripeProgress& stripeProgress) {
return stripeProgress.rawStripeSize >= rawStripeSize_ ? FlushDecision::Stripe
: FlushDecision::None;
// Relieve memory pressure with chunking.
//
// Compares the bytes currently held for the stripe (raw + encoded) against a
// threshold that depends on the previous answer: the high threshold is used
// while chunking is off, and the low threshold once chunking has started.
// The answer is remembered for the next call and returned.
bool ChunkFlushPolicy::shouldChunk(const StripeProgress& stripeProgress) {
  const uint64_t bufferedBytes =
      stripeProgress.stripeEncodedSize + stripeProgress.stripeRawSize;

  // Start from the high threshold; drop to the low threshold only when the
  // previous call already asked for a chunk.
  uint64_t threshold = config_.writerMemoryHighThresholdBytes;
  if (lastChunkDecision_) {
    threshold = config_.writerMemoryLowThresholdBytes;
  }

  lastChunkDecision_ = bufferedBytes > threshold;
  return lastChunkDecision_;
}

void RawStripeSizeFlushPolicy::onClose() {
// No-op
// Optimize for expected storage stripe size.
//
// Returns true when either:
//  - raw + encoded bytes exceed the high memory threshold, or
//  - the projected encoded stripe size reaches the configured target.
// The projection adds the already-encoded bytes to the raw bytes divided by a
// compression factor (never less than 1.0).
bool ChunkFlushPolicy::shouldFlush(const StripeProgress& stripeProgress) {
  const auto rawBytes = stripeProgress.stripeRawSize;
  const auto encodedBytes = stripeProgress.stripeEncodedSize;

  // When chunking is unable to relieve memory pressure, we flush stripe.
  if (rawBytes + encodedBytes > config_.writerMemoryHighThresholdBytes) {
    return true;
  }

  // Use the observed logical/encoded ratio once something has been encoded;
  // otherwise fall back to the configured estimate.
  const double compressionFactor = encodedBytes > 0
      ? static_cast<double>(stripeProgress.stripeEncodedLogicalSize) /
          encodedBytes
      : config_.estimatedCompressionFactor;

  const double projectedEncodedSize =
      encodedBytes + rawBytes / std::max(compressionFactor, 1.0);
  return projectedEncodedSize >= config_.targetStripeSizeBytes;
}

} // namespace facebook::nimble
84 changes: 57 additions & 27 deletions dwio/nimble/velox/FlushPolicy.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,59 +20,89 @@

namespace facebook::nimble {

// TODO: Set default values for these parameters based on DISCO experiments.
// Use arbitrary values for now.
// Tunables consumed by ChunkFlushPolicy. Sizes are expressed in bytes.
struct ChunkFlushPolicyConfig {
  // Threshold to trigger chunking to relieve memory pressure (200 MiB).
  const uint64_t writerMemoryHighThresholdBytes = 200ULL << 20;
  // Threshold below which chunking stops and stripe size optimization
  // resumes (100 MiB).
  const uint64_t writerMemoryLowThresholdBytes = 100ULL << 20;
  // Target size for encoded stripes (100 MiB).
  const uint64_t targetStripeSizeBytes = 100ULL << 20;
  // Expected ratio of raw to encoded data.
  const double estimatedCompressionFactor = 1.3;
};

struct StripeProgress {
// Size of the stripe data when it's fully decompressed and decoded
const uint64_t rawStripeSize;
const uint64_t stripeRawSize;
// Size of the stripe after buffered data is encoded and optionally compressed
const uint64_t stripeSize;
// Size of the allocated buffer in the writer
const uint64_t bufferSize;
};

enum class FlushDecision : uint8_t {
None = 0,
Stripe = 1,
Chunk = 2,
const uint64_t stripeEncodedSize;
// Logical size of the now encoded stripe data
const uint64_t stripeEncodedLogicalSize;
};

class FlushPolicy {
public:
virtual ~FlushPolicy() = default;
virtual FlushDecision shouldFlush(const StripeProgress& stripeProgress) = 0;
// Required for memory pressure coordination for now. Will remove in the
// future.
virtual void onClose() = 0;
virtual bool shouldFlush(const StripeProgress& stripeProgress) = 0;
virtual bool shouldChunk(const StripeProgress& stripeProgress) = 0;
};

class RawStripeSizeFlushPolicy final : public FlushPolicy {
class StripeRawSizeFlushPolicy final : public FlushPolicy {
public:
explicit RawStripeSizeFlushPolicy(uint64_t rawStripeSize)
: rawStripeSize_{rawStripeSize} {}
explicit StripeRawSizeFlushPolicy(uint64_t stripeRawSize)
: stripeRawSize_{stripeRawSize} {}

FlushDecision shouldFlush(const StripeProgress& stripeProgress) override;
bool shouldFlush(const StripeProgress& stripeProgress) override {
return stripeProgress.stripeRawSize >= stripeRawSize_;
}

void onClose() override;
bool shouldChunk(const StripeProgress&) override {
return false;
}

private:
const uint64_t rawStripeSize_;
const uint64_t stripeRawSize_;
};

class LambdaFlushPolicy : public FlushPolicy {
public:
explicit LambdaFlushPolicy(
std::function<FlushDecision(const StripeProgress&)> lambda)
: lambda_{lambda} {}
std::function<bool(const StripeProgress&)> flushLambda =
[](const StripeProgress&) { return false; },
std::function<bool(const StripeProgress&)> chunkLambda =
[](const StripeProgress&) { return false; })
: flushLambda_{std::move(flushLambda)},
chunkLambda_{std::move(chunkLambda)} {}

FlushDecision shouldFlush(const StripeProgress& stripeProgress) override {
return lambda_(stripeProgress);
bool shouldFlush(const StripeProgress& stripeProgress) override {
return flushLambda_(stripeProgress);
}

void onClose() override {
// No-op
bool shouldChunk(const StripeProgress& stripeProgress) override {
return chunkLambda_(stripeProgress);
}

private:
std::function<FlushDecision(const StripeProgress&)> lambda_;
std::function<bool(const StripeProgress&)> flushLambda_;
std::function<bool(const StripeProgress&)> chunkLambda_;
};

// Flush policy that answers two separate questions from a StripeProgress
// snapshot: whether to write a chunk and whether to flush the stripe. All
// thresholds come from the ChunkFlushPolicyConfig supplied at construction.
class ChunkFlushPolicy : public FlushPolicy {
 public:
  explicit ChunkFlushPolicy(ChunkFlushPolicyConfig config)
      : config_{std::move(config)} {}

  // Optimize for expected storage stripe size.
  bool shouldFlush(const StripeProgress& stripeProgress) override;

  // Relieve memory pressure with chunking.
  bool shouldChunk(const StripeProgress& stripeProgress) override;

 private:
  const ChunkFlushPolicyConfig config_;
  // Answer returned by the most recent shouldChunk() call; starts as false.
  bool lastChunkDecision_{false};
};

} // namespace facebook::nimble
24 changes: 9 additions & 15 deletions dwio/nimble/velox/VeloxWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include "dwio/nimble/velox/SchemaSerialization.h"
#include "dwio/nimble/velox/SchemaTypes.h"
#include "dwio/nimble/velox/StatsGenerated.h"
#include "folly/ScopeGuard.h"
#include "velox/common/time/CpuWallTimer.h"
#include "velox/dwio/common/ExecutorBarrier.h"
#include "velox/type/Type.h"
Expand Down Expand Up @@ -551,8 +550,6 @@ void VeloxWriter::close() {

if (file_) {
try {
auto exitGuard =
folly::makeGuard([this]() { context_->flushPolicy->onClose(); });
flush();
root_->close();

Expand Down Expand Up @@ -845,26 +842,23 @@ bool VeloxWriter::tryWriteStripe(bool force) {

auto shouldFlush = [&]() {
return context_->flushPolicy->shouldFlush(StripeProgress{
.rawStripeSize = context_->memoryUsed,
.stripeSize = context_->stripeSize,
.bufferSize =
static_cast<uint64_t>(context_->bufferMemoryPool->usedBytes()),
});
.stripeRawSize = context_->memoryUsed,
.stripeEncodedSize = context_->stripeSize});
};

auto decision = force ? FlushDecision::Stripe : shouldFlush();
if (decision == FlushDecision::None) {
return false;
}
auto shouldChunk = [&]() {
return context_->flushPolicy->shouldChunk(StripeProgress{
.stripeRawSize = context_->memoryUsed,
.stripeEncodedSize = context_->stripeSize});
};

try {
// TODO: we can improve merge the last chunk write with stripe
if (decision == FlushDecision::Chunk && context_->options.enableChunking) {
if (context_->options.enableChunking && shouldChunk()) {
writeChunk(false);
decision = shouldFlush();
}

if (decision != FlushDecision::Stripe) {
if (!(force || shouldFlush())) {
return false;
}

Expand Down
2 changes: 1 addition & 1 deletion dwio/nimble/velox/VeloxWriterOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ struct VeloxWriterOptions {
// Provides policy that controls stripe sizes and memory footprint.
std::function<std::unique_ptr<FlushPolicy>()> flushPolicyFactory = []() {
// Buffering 256MB data before encoding stripes.
return std::make_unique<RawStripeSizeFlushPolicy>(256 << 20);
return std::make_unique<StripeRawSizeFlushPolicy>(256 << 20);
};

// When the writer needs to buffer data, and internal buffers don't have
Expand Down
11 changes: 4 additions & 7 deletions dwio/nimble/velox/selective/tests/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,11 @@ class E2EFilterTest : public dwio::common::E2EFilterTestBase {
writeSchema_ = rowType_;
VeloxWriterOptions options;
options.enableChunking = true;
options.flushPolicyFactory = [] {
auto i = 0;
options.flushPolicyFactory = [&] {
return std::make_unique<LambdaFlushPolicy>(
[i = 0](const StripeProgress&) mutable {
if (i++ % 3 == 2) {
return FlushDecision::Stripe;
}
return FlushDecision::Chunk;
});
[&](const StripeProgress&) { return (i++ % 3 == 2) ? true : false; },
[&](const StripeProgress&) { return (i++ % 3 == 2) ? true : false; });
};
if (!flatMapColumns_.empty()) {
setUpFlatMapColumns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,8 @@ TEST_F(SelectiveNimbleReaderTest, multiChunkNulls) {
options.minStreamChunkRawSize = 0;
options.flushPolicyFactory = [] {
return std::make_unique<LambdaFlushPolicy>(
[](const StripeProgress&) { return FlushDecision::Chunk; });
/*flushLambda=*/[](const StripeProgress&) { return false; },
/*chunkLambda=*/[](const StripeProgress&) { return true; });
};
auto file = test::createNimbleFile(
*rootPool(), {chunk1, chunk2, chunk3}, options, false);
Expand Down Expand Up @@ -612,7 +613,8 @@ TEST_F(SelectiveNimbleReaderTest, multiChunkInt16RowSetOverBoundary) {
options.minStreamChunkRawSize = 0;
options.flushPolicyFactory = [] {
return std::make_unique<LambdaFlushPolicy>(
[](const StripeProgress&) { return FlushDecision::Chunk; });
/*flushLambda=*/[](const StripeProgress&) { return false; },
/*chunkLambda=*/[](const StripeProgress&) { return true; });
};
auto file =
test::createNimbleFile(*rootPool(), {chunk1, chunk2}, options, false);
Expand Down
Loading
Loading