Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

repos:
- repo: https://github.com/pre-commit/mirrors-clang-format
rev: v18.1.3
rev: v21.1.0
hooks:
- id: clang-format
- repo: https://github.com/BlankSpruce/gersemi
Expand Down
8 changes: 2 additions & 6 deletions dwio/nimble/velox/FlushPolicy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@

namespace facebook::nimble {

FlushDecision RawStripeSizeFlushPolicy::shouldFlush(
FlushDecision StripeRawSizeFlushPolicy::shouldFlush(
const StripeProgress& stripeProgress) {
return stripeProgress.rawStripeSize >= rawStripeSize_ ? FlushDecision::Stripe
return stripeProgress.stripeRawSize >= stripeRawSize_ ? FlushDecision::Stripe
: FlushDecision::None;
}

void RawStripeSizeFlushPolicy::onClose() {
// No-op
}

} // namespace facebook::nimble
25 changes: 7 additions & 18 deletions dwio/nimble/velox/FlushPolicy.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ namespace facebook::nimble {

struct StripeProgress {
// Size of the stripe data when it's fully decompressed and decoded
const uint64_t rawStripeSize;
const uint64_t stripeRawSize;
// Size of the stripe after buffered data is encoded and optionally compressed
const uint64_t stripeSize;
// Size of the allocated buffer in the writer
const uint64_t bufferSize;
const uint64_t stripeEncodedSize;
};

enum class FlushDecision : uint8_t {
Expand All @@ -39,38 +37,29 @@ class FlushPolicy {
public:
virtual ~FlushPolicy() = default;
virtual FlushDecision shouldFlush(const StripeProgress& stripeProgress) = 0;
// Required for memory pressure coordination for now. Will remove in the
// future.
virtual void onClose() = 0;
};

class RawStripeSizeFlushPolicy final : public FlushPolicy {
class StripeRawSizeFlushPolicy final : public FlushPolicy {
public:
explicit RawStripeSizeFlushPolicy(uint64_t rawStripeSize)
: rawStripeSize_{rawStripeSize} {}
explicit StripeRawSizeFlushPolicy(uint64_t stripeRawSize)
: stripeRawSize_{stripeRawSize} {}

FlushDecision shouldFlush(const StripeProgress& stripeProgress) override;

void onClose() override;

private:
const uint64_t rawStripeSize_;
const uint64_t stripeRawSize_;
};

class LambdaFlushPolicy : public FlushPolicy {
public:
explicit LambdaFlushPolicy(
std::function<FlushDecision(const StripeProgress&)> lambda)
: lambda_{lambda} {}
: lambda_{std::move(lambda)} {}

FlushDecision shouldFlush(const StripeProgress& stripeProgress) override {
return lambda_(stripeProgress);
}

void onClose() override {
// No-op
}

private:
std::function<FlushDecision(const StripeProgress&)> lambda_;
};
Expand Down
10 changes: 2 additions & 8 deletions dwio/nimble/velox/VeloxWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include "dwio/nimble/velox/SchemaSerialization.h"
#include "dwio/nimble/velox/SchemaTypes.h"
#include "dwio/nimble/velox/StatsGenerated.h"
#include "folly/ScopeGuard.h"
#include "velox/common/time/CpuWallTimer.h"
#include "velox/dwio/common/ExecutorBarrier.h"
#include "velox/type/Type.h"
Expand Down Expand Up @@ -552,8 +551,6 @@ void VeloxWriter::close() {

if (file_) {
try {
auto exitGuard =
folly::makeGuard([this]() { context_->flushPolicy->onClose(); });
flush();
root_->close();

Expand Down Expand Up @@ -847,11 +844,8 @@ bool VeloxWriter::tryWriteStripe(bool force) {
auto shouldFlush = [&]() {
return context_->flushPolicy->shouldFlush(
StripeProgress{
.rawStripeSize = context_->memoryUsed,
.stripeSize = context_->stripeSize,
.bufferSize =
static_cast<uint64_t>(context_->bufferMemoryPool->usedBytes()),
});
.stripeRawSize = context_->memoryUsed,
.stripeEncodedSize = context_->stripeSize});
};

auto decision = force ? FlushDecision::Stripe : shouldFlush();
Expand Down
2 changes: 1 addition & 1 deletion dwio/nimble/velox/VeloxWriterOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ struct VeloxWriterOptions {
// Provides policy that controls stripe sizes and memory footprint.
std::function<std::unique_ptr<FlushPolicy>()> flushPolicyFactory = []() {
// Buffering 256MB data before encoding stripes.
return std::make_unique<RawStripeSizeFlushPolicy>(256 << 20);
return std::make_unique<StripeRawSizeFlushPolicy>(256 << 20);
};

// When the writer needs to buffer data, and internal buffers don't have
Expand Down
26 changes: 13 additions & 13 deletions dwio/nimble/velox/tests/VeloxWriterTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,21 +297,21 @@ std::vector<velox::RowVectorPtr> generateBatches(
}
} // namespace

struct RawStripeSizeFlushPolicyTestCase {
struct StripeRawSizeFlushPolicyTestCase {
const size_t batchCount;
const uint32_t rawStripeSize;
const uint32_t stripeCount;
};

class RawStripeSizeFlushPolicyTest
class StripeRawSizeFlushPolicyTest
: public VeloxWriterTests,
public ::testing::WithParamInterface<RawStripeSizeFlushPolicyTestCase> {};
public ::testing::WithParamInterface<StripeRawSizeFlushPolicyTestCase> {};

TEST_P(RawStripeSizeFlushPolicyTest, RawStripeSizeFlushPolicy) {
TEST_P(StripeRawSizeFlushPolicyTest, StripeRawSizeFlushPolicy) {
auto type = velox::ROW({{"simple", velox::INTEGER()}});
nimble::VeloxWriterOptions writerOptions{.flushPolicyFactory = []() {
// Buffering 256MB data before encoding stripes.
return std::make_unique<nimble::RawStripeSizeFlushPolicy>(
return std::make_unique<nimble::StripeRawSizeFlushPolicy>(
GetParam().rawStripeSize);
}};

Expand Down Expand Up @@ -385,7 +385,7 @@ TEST_F(VeloxWriterTests, MemoryReclaimPath) {

TEST_F(VeloxWriterTests, FlushHugeStrings) {
nimble::VeloxWriterOptions writerOptions{.flushPolicyFactory = []() {
return std::make_unique<nimble::RawStripeSizeFlushPolicy>(1 * 1024 * 1024);
return std::make_unique<nimble::StripeRawSizeFlushPolicy>(1 * 1024 * 1024);
}};

velox::test::VectorMaker vectorMaker{leafPool_.get()};
Expand Down Expand Up @@ -1953,26 +1953,26 @@ TEST_F(VeloxWriterTests, RawSizeWritten) {
}

INSTANTIATE_TEST_CASE_P(
RawStripeSizeFlushPolicyTestSuite,
RawStripeSizeFlushPolicyTest,
StripeRawSizeFlushPolicyTestSuite,
StripeRawSizeFlushPolicyTest,
::testing::Values(
RawStripeSizeFlushPolicyTestCase{
StripeRawSizeFlushPolicyTestCase{
.batchCount = 50,
.rawStripeSize = 256 << 10,
.stripeCount = 4},
RawStripeSizeFlushPolicyTestCase{
StripeRawSizeFlushPolicyTestCase{
.batchCount = 100,
.rawStripeSize = 256 << 10,
.stripeCount = 7},
RawStripeSizeFlushPolicyTestCase{
StripeRawSizeFlushPolicyTestCase{
.batchCount = 100,
.rawStripeSize = 256 << 11,
.stripeCount = 4},
RawStripeSizeFlushPolicyTestCase{
StripeRawSizeFlushPolicyTestCase{
.batchCount = 100,
.rawStripeSize = 256 << 12,
.stripeCount = 2},
RawStripeSizeFlushPolicyTestCase{
StripeRawSizeFlushPolicyTestCase{
.batchCount = 100,
.rawStripeSize = 256 << 20,
.stripeCount = 1}));
Expand Down
Loading