Skip to content

Commit 660170e

Browse files
macvincentfacebook-github-bot
authored andcommitted
Refactor Velox Writer to Use New Flush Policy Contract (facebookincubator#242)
Summary: This should be a no-op, since no chunking flush policy is currently being used in prod, but we make three changes in this diff: 1. `writeChunk` now returns a boolean to indicate whether any stream was successfully chunked. 2. The logical raw (pre-encoding) size of the stripe data that has already been encoded is now stored in the Writer context. 3. We update and pass down the memory stats needed by the new flush policy contract. TODO: We will be introducing two more VeloxWriter changes in the next diffs in this stack to: 1. Support per-stream chunking instead of always chunking all eligible streams. 2. Support breaking down large streams into multiple smaller chunks. Differential Revision: D81545433
1 parent 358d997 commit 660170e

File tree

3 files changed

+223
-14
lines changed

3 files changed

+223
-14
lines changed

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ namespace detail {
4444
class WriterContext : public FieldWriterContext {
4545
public:
4646
const VeloxWriterOptions options;
47-
std::unique_ptr<FlushPolicy> flushPolicy;
47+
std::function<std::unique_ptr<FlushPolicy>()> flushPolicyFactory;
4848
velox::CpuWallTiming totalFlushTiming;
4949
velox::CpuWallTiming stripeFlushTiming;
5050
velox::CpuWallTiming encodingSelectionTiming;
@@ -57,6 +57,8 @@ class WriterContext : public FieldWriterContext {
5757
uint64_t rowsInFile{0};
5858
uint64_t rowsInStripe{0};
5959
uint64_t stripeSize{0};
60+
// Logical raw size of the encoded stripe data
61+
uint64_t stripeEncodedLogicalSize{0};
6062
uint64_t rawSize{0};
6163
std::vector<uint64_t> rowsPerStripe;
6264

@@ -65,8 +67,8 @@ class WriterContext : public FieldWriterContext {
6567
VeloxWriterOptions options)
6668
: FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
6769
options{std::move(options)},
70+
flushPolicyFactory{this->options.flushPolicyFactory},
6871
logger{this->options.metricsLogger} {
69-
flushPolicy = this->options.flushPolicyFactory();
7072
inputBufferGrowthPolicy = this->options.lowMemoryMode
7173
? std::make_unique<ExactGrowthPolicy>()
7274
: this->options.inputGrowthPolicyFactory();
@@ -82,6 +84,7 @@ class WriterContext : public FieldWriterContext {
8284
memoryUsed = 0;
8385
rowsInStripe = 0;
8486
stripeSize = 0;
87+
stripeEncodedLogicalSize = 0;
8588
++stripeIndex_;
8689
}
8790

@@ -638,9 +641,11 @@ void VeloxWriter::flush() {
638641
}
639642
}
640643

641-
void VeloxWriter::writeChunk(bool lastChunk) {
644+
bool VeloxWriter::writeChunk(bool lastChunk) {
642645
uint64_t previousFlushWallTime = context_->stripeFlushTiming.wallNanos;
643646
std::atomic<uint64_t> chunkSize = 0;
647+
std::atomic<uint64_t> logicalSizeBeforeEncoding = 0;
648+
std::atomic<bool> wroteChunk = false;
644649
{
645650
LoggingScope scope{*context_->logger};
646651
velox::CpuWallTimer veloxTimer{context_->stripeFlushTiming};
@@ -702,6 +707,8 @@ void VeloxWriter::writeChunk(bool lastChunk) {
702707
stream.content.push_back(std::move(buffer));
703708
}
704709
}
710+
wroteChunk = true;
711+
logicalSizeBeforeEncoding += streamData.memoryUsed();
705712
streamData.reset();
706713
};
707714

@@ -711,6 +718,7 @@ void VeloxWriter::writeChunk(bool lastChunk) {
711718
const auto* context =
712719
streamData.descriptor().context<WriterStreamContext>();
713720

721+
// TODO: Breakdown large streams above a threshold into smaller chunks.
714722
const auto minStreamSize =
715723
lastChunk ? 0 : context_->options.minStreamChunkRawSize;
716724

@@ -779,6 +787,8 @@ void VeloxWriter::writeChunk(bool lastChunk) {
779787
}
780788

781789
context_->stripeSize += chunkSize;
790+
context_->stripeEncodedLogicalSize += logicalSizeBeforeEncoding;
791+
context_->memoryUsed -= logicalSizeBeforeEncoding;
782792
}
783793

784794
// Consider getting this from flush timing.
@@ -787,6 +797,7 @@ void VeloxWriter::writeChunk(bool lastChunk) {
787797
1'000'000;
788798
VLOG(1) << "writeChunk milliseconds: " << flushWallTimeMs
789799
<< ", chunk bytes: " << chunkSize;
800+
return wroteChunk.load();
790801
}
791802

792803
uint32_t VeloxWriter::writeStripe() {
@@ -840,37 +851,43 @@ bool VeloxWriter::tryWriteStripe(bool force) {
840851
return false;
841852
}
842853

854+
auto flushPolicy = context_->flushPolicyFactory();
855+
NIMBLE_DASSERT(flushPolicy != nullptr, "Flush policy must not be null");
856+
843857
auto shouldFlush = [&]() {
844-
return context_->flushPolicy->shouldFlush(StripeProgress{
858+
return flushPolicy->shouldFlush(StripeProgress{
845859
.stripeRawSize = context_->memoryUsed,
846-
.stripeEncodedSize = context_->stripeSize});
860+
.stripeEncodedSize = context_->stripeSize,
861+
.stripeEncodedLogicalSize = context_->stripeEncodedLogicalSize});
847862
};
848863

849864
auto shouldChunk = [&]() {
850-
return context_->flushPolicy->shouldChunk(StripeProgress{
865+
return flushPolicy->shouldChunk(StripeProgress{
851866
.stripeRawSize = context_->memoryUsed,
852-
.stripeEncodedSize = context_->stripeSize});
867+
.stripeEncodedSize = context_->stripeSize,
868+
.stripeEncodedLogicalSize = context_->stripeEncodedLogicalSize,
869+
});
853870
};
854871

855872
try {
856873
// TODO: we can improve merge the last chunk write with stripe
857-
if (context_->options.enableChunking &&
858-
shouldChunk() == ChunkDecision::Chunk) {
859-
writeChunk(false);
874+
if (context_->options.enableChunking) {
875+
while (shouldChunk() == ChunkDecision::Chunk && writeChunk(false)) {
876+
}
860877
}
861878

862879
auto decision = force ? FlushDecision::Stripe : shouldFlush();
863880
if (decision != FlushDecision::Stripe) {
864881
return false;
865882
}
866883

884+
uint32_t stripeSize = writeStripe();
867885
StripeFlushMetrics metrics{
868886
.inputSize = context_->stripeSize,
869887
.rowCount = context_->rowsInStripe,
888+
.stripeSize = stripeSize,
870889
.trackedMemory = context_->memoryUsed,
871890
};
872-
873-
metrics.stripeSize = writeStripe();
874891
context_->logger->logStripeFlush(metrics);
875892

876893
context_->nextStripe();

dwio/nimble/velox/VeloxWriter.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ class VeloxWriter {
8686

8787
// Returning 'true' if stripe was written.
8888
bool tryWriteStripe(bool force = false);
89-
void writeChunk(bool lastChunk = true);
89+
// Returns 'true' if chunk was written.
90+
bool writeChunk(bool lastChunk = true);
9091
uint32_t writeStripe();
9192
};
9293

dwio/nimble/velox/tests/VeloxWriterTests.cpp

Lines changed: 192 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "dwio/nimble/tablet/Constants.h"
2323
#include "dwio/nimble/velox/ChunkedStream.h"
2424
#include "dwio/nimble/velox/EncodingLayoutTree.h"
25+
#include "dwio/nimble/velox/FlushPolicy.h"
2526
#include "dwio/nimble/velox/SchemaSerialization.h"
2627
#include "dwio/nimble/velox/StatsGenerated.h"
2728
#include "dwio/nimble/velox/VeloxReader.h"
@@ -289,7 +290,7 @@ std::vector<velox::RowVectorPtr> generateBatches(
289290
velox::VectorFuzzer fuzzer(
290291
{.vectorSize = size, .nullRatio = 0.1}, &pool, seed);
291292
std::vector<velox::RowVectorPtr> batches;
292-
293+
batches.reserve(batchCount);
293294
for (size_t i = 0; i < batchCount; ++i) {
294295
batches.push_back(fuzzer.fuzzInputFlatRow(type));
295296
}
@@ -1953,6 +1954,109 @@ TEST_F(VeloxWriterTests, RawSizeWritten) {
19531954
ASSERT_EQ(expectedRawSize, rawSize);
19541955
}
19551956

1957+
struct ChunkFlushPolicyTestCase {
1958+
const size_t batchCount{20};
1959+
const bool enableChunking{true};
1960+
const uint64_t targetStripeSizeBytes{256 << 10};
1961+
const uint64_t writerMemoryHighThreshold{80 << 10};
1962+
const uint64_t writerMemoryLowThreshold{75 << 10};
1963+
const double compressionRatioFactor{1.0};
1964+
const uint32_t minStreamChunkRawSize{100};
1965+
const uint32_t expectedStripeCount{0};
1966+
const uint32_t expectedMaxChunkCount{0};
1967+
const uint32_t expectedMinChunkCount{0};
1968+
};
1969+
1970+
class ChunkFlushPolicyTest
1971+
: public VeloxWriterTests,
1972+
public ::testing::WithParamInterface<ChunkFlushPolicyTestCase> {};
1973+
1974+
TEST_P(ChunkFlushPolicyTest, ChunkFlushPolicyIntegration) {
1975+
auto type = velox::ROW(
1976+
{{"BIGINT", velox::BIGINT()}, {"SMALLINT", velox::SMALLINT()}});
1977+
nimble::VeloxWriterOptions writerOptions{
1978+
.minStreamChunkRawSize = GetParam().minStreamChunkRawSize,
1979+
.flushPolicyFactory = GetParam().enableChunking
1980+
? []() -> std::unique_ptr<nimble::FlushPolicy> {
1981+
return std::make_unique<nimble::ChunkFlushPolicy>(
1982+
std::make_shared<const nimble::ChunkFlushPolicyConfig>(
1983+
nimble::ChunkFlushPolicyConfig{
1984+
.writerMemoryHighThreshold = GetParam().writerMemoryHighThreshold,
1985+
.writerMemoryLowThreshold = GetParam().writerMemoryLowThreshold,
1986+
.targetStripeSizeBytes = GetParam().targetStripeSizeBytes,
1987+
.compressionRatioFactor =
1988+
GetParam().compressionRatioFactor,
1989+
}));
1990+
}
1991+
: []() -> std::unique_ptr<nimble::FlushPolicy> {
1992+
return std::make_unique<nimble::StripeRawSizeFlushPolicy>(
1993+
GetParam().targetStripeSizeBytes);
1994+
},
1995+
.enableChunking = GetParam().enableChunking,
1996+
};
1997+
1998+
std::string file;
1999+
auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file);
2000+
2001+
nimble::VeloxWriter writer(
2002+
*rootPool_, type, std::move(writeFile), std::move(writerOptions));
2003+
auto batches = generateBatches(
2004+
type,
2005+
GetParam().batchCount,
2006+
/*size=*/4000,
2007+
/*seed=*/20221110,
2008+
*leafPool_);
2009+
2010+
for (const auto& batch : batches) {
2011+
writer.write(batch);
2012+
}
2013+
writer.close();
2014+
2015+
velox::InMemoryReadFile readFile(file);
2016+
auto selector = std::make_shared<velox::dwio::common::ColumnSelector>(type);
2017+
nimble::VeloxReader reader(*leafPool_, &readFile, std::move(selector));
2018+
2019+
// Verify stripe count
2020+
auto expectedStripeCount = GetParam().expectedStripeCount;
2021+
auto actualStripeCount = reader.tabletReader().stripeCount();
2022+
EXPECT_EQ(expectedStripeCount, actualStripeCount);
2023+
2024+
// Verify chunk count
2025+
auto chunkCountPair = [&]() {
2026+
nimble::TabletReader tablet{*leafPool_, &readFile};
2027+
uint32_t maxChunkCount = 0;
2028+
uint32_t minChunkCount = std::numeric_limits<uint32_t>::max();
2029+
2030+
for (uint32_t index = 0; index < actualStripeCount; ++index) {
2031+
auto stripeIdentifier = tablet.getStripeIdentifier(index);
2032+
auto streamCount = tablet.streamCount(stripeIdentifier);
2033+
2034+
std::vector<uint32_t> streamIds(streamCount);
2035+
std::iota(streamIds.begin(), streamIds.end(), 0);
2036+
auto streamLoaders = tablet.load(stripeIdentifier, streamIds);
2037+
2038+
for (auto& streamLoader : streamLoaders) {
2039+
if (!streamLoader) {
2040+
continue;
2041+
}
2042+
nimble::InMemoryChunkedStream chunked{
2043+
*leafPool_, std::move(streamLoader)};
2044+
uint32_t chunkCount = 0;
2045+
while (chunked.hasNext()) {
2046+
chunked.nextChunk();
2047+
chunkCount++;
2048+
}
2049+
maxChunkCount = std::max(maxChunkCount, chunkCount);
2050+
minChunkCount = std::min(minChunkCount, chunkCount);
2051+
}
2052+
}
2053+
return std::make_pair(maxChunkCount, minChunkCount);
2054+
};
2055+
auto [maxChunkCount, minChunkCount] = chunkCountPair();
2056+
EXPECT_EQ(GetParam().expectedMaxChunkCount, maxChunkCount);
2057+
EXPECT_EQ(GetParam().expectedMinChunkCount, minChunkCount);
2058+
}
2059+
19562060
INSTANTIATE_TEST_CASE_P(
19572061
StripeRawSizeFlushPolicyTestSuite,
19582062
StripeRawSizeFlushPolicyTest,
@@ -1977,4 +2081,91 @@ INSTANTIATE_TEST_CASE_P(
19772081
.batchCount = 100,
19782082
.rawStripeSize = 256 << 20,
19792083
.stripeCount = 1}));
2084+
2085+
INSTANTIATE_TEST_CASE_P(
2086+
ChunkFlushPolicyTestSuite,
2087+
ChunkFlushPolicyTest,
2088+
::testing::Values(
2089+
// Base case (no chunking, RawStripeSizeFlushPolicy)
2090+
ChunkFlushPolicyTestCase{
2091+
.batchCount = 20,
2092+
.enableChunking = false,
2093+
.targetStripeSizeBytes = 250 << 10, // 250KB
2094+
.writerMemoryHighThreshold = 80 << 10,
2095+
.writerMemoryLowThreshold = 75 << 10,
2096+
.compressionRatioFactor = 1.0,
2097+
.minStreamChunkRawSize = 100,
2098+
.expectedStripeCount = 4,
2099+
.expectedMaxChunkCount = 1,
2100+
.expectedMinChunkCount = 1,
2101+
},
2102+
// Base case with default settings (has chunking)
2103+
ChunkFlushPolicyTestCase{
2104+
.batchCount = 20,
2105+
.enableChunking = true,
2106+
.targetStripeSizeBytes = 250 << 10, // 250KB
2107+
.writerMemoryHighThreshold = 80 << 10,
2108+
.writerMemoryLowThreshold = 75 << 10,
2109+
.compressionRatioFactor = 1.0,
2110+
.minStreamChunkRawSize = 100,
2111+
.expectedStripeCount = 3,
2112+
.expectedMaxChunkCount = 7,
2113+
.expectedMinChunkCount = 3,
2114+
},
2115+
// High memory regression threshold
2116+
// Produces file identical to RawStripeSizeFlushPolicy
2117+
ChunkFlushPolicyTestCase{
2118+
.batchCount = 20,
2119+
.enableChunking = true,
2120+
.targetStripeSizeBytes = 256 << 10,
2121+
.writerMemoryHighThreshold = 500 << 10, // +420KB
2122+
.writerMemoryLowThreshold = 495 << 10, // +420KB
2123+
.compressionRatioFactor = 1.0,
2124+
.minStreamChunkRawSize = 100,
2125+
.expectedStripeCount = 4,
2126+
.expectedMaxChunkCount = 1,
2127+
.expectedMinChunkCount = 1,
2128+
},
2129+
// Low memory regression threshold
2130+
// Produces file with more chunks per stripe
2131+
ChunkFlushPolicyTestCase{
2132+
.batchCount = 20,
2133+
.enableChunking = true,
2134+
.targetStripeSizeBytes = 256 << 10,
2135+
.writerMemoryHighThreshold = 40 << 10, // -40KB
2136+
.writerMemoryLowThreshold = 35 << 10, // -40KB
2137+
.compressionRatioFactor = 1.0,
2138+
.minStreamChunkRawSize = 100,
2139+
.expectedStripeCount = 3,
2140+
.expectedMaxChunkCount = 8,
2141+
.expectedMinChunkCount = 4,
2142+
},
2143+
// High target stripe size bytes (with disabled memory pressure
2144+
// optimization) produces fewer stripes. Single chunks.
2145+
ChunkFlushPolicyTestCase{
2146+
.batchCount = 20,
2147+
.enableChunking = true,
2148+
.targetStripeSizeBytes = 900 << 10, // +900KB
2149+
.writerMemoryHighThreshold = 2 << 20, // +2MB
2150+
.writerMemoryLowThreshold = 1 << 20, // +1MB
2151+
.compressionRatioFactor = 1.0,
2152+
.minStreamChunkRawSize = 100,
2153+
.expectedStripeCount = 1, // -2 stripes
2154+
.expectedMaxChunkCount = 1,
2155+
.expectedMinChunkCount = 1,
2156+
},
2157+
// Low target stripe size bytes (with disabled memory pressure
2158+
// optimization) produces more stripes. Single chunks.
2159+
ChunkFlushPolicyTestCase{
2160+
.batchCount = 20,
2161+
.enableChunking = true,
2162+
.targetStripeSizeBytes = 90 << 10, // -160KB
2163+
.writerMemoryHighThreshold = 2 << 20, // +2MB
2164+
.writerMemoryLowThreshold = 1 << 20, // +1MB
2165+
.compressionRatioFactor = 1.0,
2166+
.minStreamChunkRawSize = 100,
2167+
.expectedStripeCount = 7, // +6 stripes
2168+
.expectedMaxChunkCount = 1,
2169+
.expectedMinChunkCount = 1,
2170+
}));
19802171
} // namespace facebook

0 commit comments

Comments
 (0)