Skip to content

Commit 2fbe79a

Browse files
macvincent authored and facebook-github-bot committed
Refactor Velox Writer to Use New Flush Policy Contract (#242)
Summary: This should be a no-op since no chunking flush policy is currently being used in prod, but we make three changes in this diff: 1. `writeChunk` now returns a boolean to indicate whether any stream was successfully chunked. 2. The raw (pre-encoding) size of the stripe data that has already been encoded is now stored in the writer context. 3. We update and pass down the memory stats needed by the new flush policy contract. TODO: We will be introducing two more VeloxWriter changes in the next diffs in this stack to: 1. Support per-stream chunking instead of always chunking all eligible streams. 2. Support breaking down a large stream into multiple smaller chunks. Differential Revision: D81545433
1 parent 6a5ea5e commit 2fbe79a

File tree

3 files changed

+236
-12
lines changed

3 files changed

+236
-12
lines changed

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ namespace detail {
4444
class WriterContext : public FieldWriterContext {
4545
public:
4646
const VeloxWriterOptions options;
47-
std::unique_ptr<FlushPolicy> flushPolicy;
47+
std::function<std::unique_ptr<FlushPolicy>()> flushPolicyFactory;
4848
velox::CpuWallTiming totalFlushTiming;
4949
velox::CpuWallTiming stripeFlushTiming;
5050
velox::CpuWallTiming encodingSelectionTiming;
@@ -57,6 +57,8 @@ class WriterContext : public FieldWriterContext {
5757
uint64_t rowsInFile{0};
5858
uint64_t rowsInStripe{0};
5959
uint64_t stripeSize{0};
60+
// Raw (pre-encoding) size of the stripe data that has already been encoded
61+
uint64_t stripeEncodedRawSize{0};
6062
uint64_t rawSize{0};
6163
std::vector<uint64_t> rowsPerStripe;
6264

@@ -65,8 +67,8 @@ class WriterContext : public FieldWriterContext {
6567
VeloxWriterOptions options)
6668
: FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
6769
options{std::move(options)},
70+
flushPolicyFactory{this->options.flushPolicyFactory},
6871
logger{this->options.metricsLogger} {
69-
flushPolicy = this->options.flushPolicyFactory();
7072
inputBufferGrowthPolicy = this->options.lowMemoryMode
7173
? std::make_unique<ExactGrowthPolicy>()
7274
: this->options.inputGrowthPolicyFactory();
@@ -82,6 +84,7 @@ class WriterContext : public FieldWriterContext {
8284
memoryUsed = 0;
8385
rowsInStripe = 0;
8486
stripeSize = 0;
87+
stripeEncodedRawSize = 0;
8588
++stripeIndex_;
8689
}
8790

@@ -638,9 +641,11 @@ void VeloxWriter::flush() {
638641
}
639642
}
640643

641-
void VeloxWriter::writeChunk(bool lastChunk) {
644+
bool VeloxWriter::writeChunk(bool lastChunk) {
642645
uint64_t previousFlushWallTime = context_->stripeFlushTiming.wallNanos;
643646
std::atomic<uint64_t> chunkSize = 0;
647+
std::atomic<uint64_t> sizeBeforeEncoding = 0;
648+
std::atomic<bool> wroteChunk = false;
644649
{
645650
LoggingScope scope{*context_->logger};
646651
velox::CpuWallTimer veloxTimer{context_->stripeFlushTiming};
@@ -702,6 +707,8 @@ void VeloxWriter::writeChunk(bool lastChunk) {
702707
stream.content.push_back(std::move(buffer));
703708
}
704709
}
710+
wroteChunk = true;
711+
sizeBeforeEncoding += streamData.memoryUsed();
705712
streamData.reset();
706713
};
707714

@@ -711,6 +718,7 @@ void VeloxWriter::writeChunk(bool lastChunk) {
711718
const auto* context =
712719
streamData.descriptor().context<WriterStreamContext>();
713720

721+
// TODO: Break down large streams above a threshold into smaller chunks.
714722
const auto minStreamSize =
715723
lastChunk ? 0 : context_->options.minStreamChunkRawSize;
716724

@@ -779,6 +787,12 @@ void VeloxWriter::writeChunk(bool lastChunk) {
779787
}
780788

781789
context_->stripeSize += chunkSize;
790+
context_->stripeEncodedRawSize += sizeBeforeEncoding;
791+
uint64_t memoryUsed = 0;
792+
for (const auto& stream : context_->streams()) {
793+
memoryUsed += stream->memoryUsed();
794+
}
795+
context_->memoryUsed = memoryUsed;
782796
}
783797

784798
// Consider getting this from flush timing.
@@ -787,6 +801,7 @@ void VeloxWriter::writeChunk(bool lastChunk) {
787801
1'000'000;
788802
VLOG(1) << "writeChunk milliseconds: " << flushWallTimeMs
789803
<< ", chunk bytes: " << chunkSize;
804+
return wroteChunk.load();
790805
}
791806

792807
uint32_t VeloxWriter::writeStripe() {
@@ -840,35 +855,53 @@ bool VeloxWriter::tryWriteStripe(bool force) {
840855
return false;
841856
}
842857

858+
auto flushPolicy = context_->flushPolicyFactory();
859+
NIMBLE_DASSERT(flushPolicy != nullptr, "Flush policy must not be null");
860+
843861
auto shouldFlush = [&]() {
844-
return context_->flushPolicy->shouldFlush(StripeProgress{
862+
return flushPolicy->shouldFlush(StripeProgress{
845863
.stripeRawSize = context_->memoryUsed,
846-
.stripeEncodedSize = context_->stripeSize});
864+
.stripeEncodedSize = context_->stripeSize,
865+
.stripeEncodedRawSize = context_->stripeEncodedRawSize});
866+
};
867+
868+
auto shouldChunk = [&]() {
869+
return flushPolicy->shouldChunk(StripeProgress{
870+
.stripeRawSize = context_->memoryUsed,
871+
.stripeEncodedSize = context_->stripeSize,
872+
.stripeEncodedRawSize = context_->stripeEncodedRawSize,
873+
});
847874
};
848875

849876
auto decision = force ? FlushDecision::Stripe : shouldFlush();
877+
878+
if (context_->options.enableChunking && decision == FlushDecision::None) {
879+
decision = shouldChunk();
880+
}
881+
850882
if (decision == FlushDecision::None) {
851883
return false;
852884
}
853885

854886
try {
855887
// TODO: we can improve this by merging the last chunk write with the stripe write
856888
if (decision == FlushDecision::Chunk && context_->options.enableChunking) {
857-
writeChunk(false);
858-
decision = shouldFlush();
889+
while (decision == FlushDecision::Chunk && writeChunk(false)) {
890+
decision = shouldChunk();
891+
}
859892
}
860-
893+
decision = (decision != FlushDecision::Stripe) ? shouldFlush() : decision;
861894
if (decision != FlushDecision::Stripe) {
862895
return false;
863896
}
864897

898+
uint32_t stripeSize = writeStripe();
865899
StripeFlushMetrics metrics{
866900
.inputSize = context_->stripeSize,
867901
.rowCount = context_->rowsInStripe,
902+
.stripeSize = stripeSize,
868903
.trackedMemory = context_->memoryUsed,
869904
};
870-
871-
metrics.stripeSize = writeStripe();
872905
context_->logger->logStripeFlush(metrics);
873906

874907
context_->nextStripe();

dwio/nimble/velox/VeloxWriter.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ class VeloxWriter {
8686

8787
// Returns 'true' if a stripe was written.
8888
bool tryWriteStripe(bool force = false);
89-
void writeChunk(bool lastChunk = true);
89+
// Returns 'true' if chunk was written.
90+
bool writeChunk(bool lastChunk = true);
9091
uint32_t writeStripe();
9192
};
9293

dwio/nimble/velox/tests/VeloxWriterTests.cpp

Lines changed: 191 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "dwio/nimble/tablet/Constants.h"
2323
#include "dwio/nimble/velox/ChunkedStream.h"
2424
#include "dwio/nimble/velox/EncodingLayoutTree.h"
25+
#include "dwio/nimble/velox/FlushPolicy.h"
2526
#include "dwio/nimble/velox/SchemaSerialization.h"
2627
#include "dwio/nimble/velox/StatsGenerated.h"
2728
#include "dwio/nimble/velox/VeloxReader.h"
@@ -289,7 +290,7 @@ std::vector<velox::RowVectorPtr> generateBatches(
289290
velox::VectorFuzzer fuzzer(
290291
{.vectorSize = size, .nullRatio = 0.1}, &pool, seed);
291292
std::vector<velox::RowVectorPtr> batches;
292-
293+
batches.reserve(batchCount);
293294
for (size_t i = 0; i < batchCount; ++i) {
294295
batches.push_back(fuzzer.fuzzInputFlatRow(type));
295296
}
@@ -1951,6 +1952,108 @@ TEST_F(VeloxWriterTests, RawSizeWritten) {
19511952
ASSERT_EQ(expectedRawSize, rawSize);
19521953
}
19531954

1955+
struct ChunkFlushPolicyTestCase {
1956+
const size_t batchCount{20};
1957+
const bool enableChunking{true};
1958+
const uint64_t targetStripeSizeBytes{256 << 10};
1959+
const uint64_t writerMaxMemoryBytes{80 << 10};
1960+
const uint64_t writerMinMemoryBytes{75 << 10};
1961+
const double compressionRatio{1.0};
1962+
const uint32_t minStreamChunkRawSize{100};
1963+
const uint32_t expectedStripeCount{0};
1964+
const uint32_t expectedMaxChunkCount{0};
1965+
const uint32_t expectedMinChunkCount{0};
1966+
};
1967+
1968+
class ChunkFlushPolicyTest
1969+
: public VeloxWriterTests,
1970+
public ::testing::WithParamInterface<ChunkFlushPolicyTestCase> {};
1971+
1972+
TEST_P(ChunkFlushPolicyTest, ChunkFlushPolicyIntegration) {
1973+
auto type = velox::ROW(
1974+
{{"BIGINT", velox::BIGINT()}, {"SMALLINT", velox::SMALLINT()}});
1975+
nimble::VeloxWriterOptions writerOptions{
1976+
.minStreamChunkRawSize = GetParam().minStreamChunkRawSize,
1977+
.flushPolicyFactory = GetParam().enableChunking
1978+
? []() -> std::unique_ptr<nimble::FlushPolicy> {
1979+
return std::make_unique<nimble::ChunkFlushPolicy>(
1980+
nimble::ChunkFlushPolicyConfig{
1981+
.writerMaxMemoryBytes = GetParam().writerMaxMemoryBytes,
1982+
.writerMinMemoryBytes = GetParam().writerMinMemoryBytes,
1983+
.targetStripeSizeBytes = GetParam().targetStripeSizeBytes,
1984+
.compressionRatio =
1985+
GetParam().compressionRatio,
1986+
});
1987+
}
1988+
: []() -> std::unique_ptr<nimble::FlushPolicy> {
1989+
return std::make_unique<nimble::StripeRawSizeFlushPolicy>(
1990+
GetParam().targetStripeSizeBytes);
1991+
},
1992+
.enableChunking = GetParam().enableChunking,
1993+
};
1994+
1995+
std::string file;
1996+
auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file);
1997+
1998+
nimble::VeloxWriter writer(
1999+
*rootPool_, type, std::move(writeFile), std::move(writerOptions));
2000+
auto batches = generateBatches(
2001+
type,
2002+
GetParam().batchCount,
2003+
/*size=*/4000,
2004+
/*seed=*/20221110,
2005+
*leafPool_);
2006+
2007+
for (const auto& batch : batches) {
2008+
writer.write(batch);
2009+
}
2010+
writer.close();
2011+
2012+
velox::InMemoryReadFile readFile(file);
2013+
auto selector = std::make_shared<velox::dwio::common::ColumnSelector>(type);
2014+
nimble::VeloxReader reader(*leafPool_, &readFile, std::move(selector));
2015+
2016+
// Verify stripe count
2017+
auto expectedStripeCount = GetParam().expectedStripeCount;
2018+
auto actualStripeCount = reader.tabletReader().stripeCount();
2019+
EXPECT_EQ(expectedStripeCount, actualStripeCount);
2020+
2021+
// Verify chunk count
2022+
auto chunkCountPair = [&]() {
2023+
nimble::TabletReader tablet{*leafPool_, &readFile};
2024+
uint32_t maxChunkCount = 0;
2025+
uint32_t minChunkCount = std::numeric_limits<uint32_t>::max();
2026+
2027+
for (uint32_t index = 0; index < actualStripeCount; ++index) {
2028+
auto stripeIdentifier = tablet.getStripeIdentifier(index);
2029+
auto streamCount = tablet.streamCount(stripeIdentifier);
2030+
2031+
std::vector<uint32_t> streamIds(streamCount);
2032+
std::iota(streamIds.begin(), streamIds.end(), 0);
2033+
auto streamLoaders = tablet.load(stripeIdentifier, streamIds);
2034+
2035+
for (auto& streamLoader : streamLoaders) {
2036+
if (!streamLoader) {
2037+
continue;
2038+
}
2039+
nimble::InMemoryChunkedStream chunked{
2040+
*leafPool_, std::move(streamLoader)};
2041+
uint32_t chunkCount = 0;
2042+
while (chunked.hasNext()) {
2043+
chunked.nextChunk();
2044+
chunkCount++;
2045+
}
2046+
maxChunkCount = std::max(maxChunkCount, chunkCount);
2047+
minChunkCount = std::min(minChunkCount, chunkCount);
2048+
}
2049+
}
2050+
return std::make_pair(maxChunkCount, minChunkCount);
2051+
};
2052+
auto [maxChunkCount, minChunkCount] = chunkCountPair();
2053+
EXPECT_EQ(GetParam().expectedMaxChunkCount, maxChunkCount);
2054+
EXPECT_EQ(GetParam().expectedMinChunkCount, minChunkCount);
2055+
}
2056+
19542057
INSTANTIATE_TEST_CASE_P(
19552058
StripeRawSizeFlushPolicyTestSuite,
19562059
StripeRawSizeFlushPolicyTest,
@@ -1975,4 +2078,91 @@ INSTANTIATE_TEST_CASE_P(
19752078
.batchCount = 100,
19762079
.rawStripeSize = 256 << 20,
19772080
.stripeCount = 1}));
2081+
2082+
INSTANTIATE_TEST_CASE_P(
2083+
ChunkFlushPolicyTestSuite,
2084+
ChunkFlushPolicyTest,
2085+
::testing::Values(
2086+
// Base case (no chunking, StripeRawSizeFlushPolicy)
2087+
ChunkFlushPolicyTestCase{
2088+
.batchCount = 20,
2089+
.enableChunking = false,
2090+
.targetStripeSizeBytes = 250 << 10, // 250KB
2091+
.writerMaxMemoryBytes = 80 << 10,
2092+
.writerMinMemoryBytes = 75 << 10,
2093+
.compressionRatio = 1.0,
2094+
.minStreamChunkRawSize = 100,
2095+
.expectedStripeCount = 4,
2096+
.expectedMaxChunkCount = 1,
2097+
.expectedMinChunkCount = 1,
2098+
},
2099+
// Base case with default settings (has chunking)
2100+
ChunkFlushPolicyTestCase{
2101+
.batchCount = 20,
2102+
.enableChunking = true,
2103+
.targetStripeSizeBytes = 250 << 10, // 250KB
2104+
.writerMaxMemoryBytes = 80 << 10,
2105+
.writerMinMemoryBytes = 75 << 10,
2106+
.compressionRatio = 1.0,
2107+
.minStreamChunkRawSize = 100,
2108+
.expectedStripeCount = 3,
2109+
.expectedMaxChunkCount = 7,
2110+
.expectedMinChunkCount = 3,
2111+
},
2112+
// High memory regression threshold
2113+
// Produces a file identical to the one produced by StripeRawSizeFlushPolicy
2114+
ChunkFlushPolicyTestCase{
2115+
.batchCount = 20,
2116+
.enableChunking = true,
2117+
.targetStripeSizeBytes = 256 << 10,
2118+
.writerMaxMemoryBytes = 500 << 10, // +420KB
2119+
.writerMinMemoryBytes = 495 << 10, // +420KB
2120+
.compressionRatio = 1.0,
2121+
.minStreamChunkRawSize = 100,
2122+
.expectedStripeCount = 4,
2123+
.expectedMaxChunkCount = 1,
2124+
.expectedMinChunkCount = 1,
2125+
},
2126+
// Low memory regression threshold
2127+
// Produces file with more chunks per stripe
2128+
ChunkFlushPolicyTestCase{
2129+
.batchCount = 20,
2130+
.enableChunking = true,
2131+
.targetStripeSizeBytes = 256 << 10,
2132+
.writerMaxMemoryBytes = 40 << 10, // -40KB
2133+
.writerMinMemoryBytes = 35 << 10, // -40KB
2134+
.compressionRatio = 1.0,
2135+
.minStreamChunkRawSize = 100,
2136+
.expectedStripeCount = 3,
2137+
.expectedMaxChunkCount = 8,
2138+
.expectedMinChunkCount = 4,
2139+
},
2140+
// High target stripe size bytes (with disabled memory pressure
2141+
// optimization) produces fewer stripes. Single chunks.
2142+
ChunkFlushPolicyTestCase{
2143+
.batchCount = 20,
2144+
.enableChunking = true,
2145+
.targetStripeSizeBytes = 900 << 10, // +900KB
2146+
.writerMaxMemoryBytes = 2 << 20, // +2MB
2147+
.writerMinMemoryBytes = 1 << 20, // +1MB
2148+
.compressionRatio = 1.0,
2149+
.minStreamChunkRawSize = 100,
2150+
.expectedStripeCount = 1, // -2 stripes
2151+
.expectedMaxChunkCount = 1,
2152+
.expectedMinChunkCount = 1,
2153+
},
2154+
// Low target stripe size bytes (with disabled memory pressure
2155+
// optimization) produces more stripes. Single chunks.
2156+
ChunkFlushPolicyTestCase{
2157+
.batchCount = 20,
2158+
.enableChunking = true,
2159+
.targetStripeSizeBytes = 90 << 10, // -160KB
2160+
.writerMaxMemoryBytes = 2 << 20, // +2MB
2161+
.writerMinMemoryBytes = 1 << 20, // +1MB
2162+
.compressionRatio = 1.0,
2163+
.minStreamChunkRawSize = 100,
2164+
.expectedStripeCount = 7, // +6 stripes
2165+
.expectedMaxChunkCount = 1,
2166+
.expectedMinChunkCount = 1,
2167+
}));
19782168
} // namespace facebook

0 commit comments

Comments
 (0)