
Commit 6926070

macvincent authored and meta-codesync[bot] committed
Integrate Max Stream Size Chunking in Velox Writer (#249)
Summary: Pull Request resolved: #249

This is the last feature of the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1). Here, we break down large streams into multiple chunks of size up to `maxStreamChunkRawSize`. This protects the reader from attempting to materialize huge chunks. We added StreamData support for this in the previous diff; in this diff, we integrate it with the VeloxWriter.

With this change, when memory pressure is detected, we:

1. Chunk large streams above `maxStreamChunkRawSize`, retaining stream data below the limit.
2. If there is still memory pressure after the first step, chunk streams with size above `minStreamChunkRawSize`.

During stripe flush, we chunk all remaining data, breaking down streams above `maxStreamChunkRawSize` into smaller chunks.

---

The general chunking policy has two phases:

## **Phase 1 - Memory Pressure Management (shouldChunk)**

The policy monitors total in-memory data size:

- When memory usage exceeds the maximum threshold, it initiates chunking to reduce the memory footprint while continuing data ingestion.
- When previous chunking attempts succeeded and memory remains above the minimum threshold, it continues chunking to further reduce memory usage.
- When chunking fails to reduce memory usage effectively and memory stays above the minimum threshold, it forces a full stripe flush to guarantee memory relief.

## **Phase 2 - Storage Size Optimization (shouldFlush)**

Implements compression-aware stripe size prediction:

- Calculates the anticipated final compressed stripe size by applying the estimated compression ratio to the unencoded data.
- Triggers a stripe flush when the predicted compressed size reaches the target stripe size threshold.

Differential Revision: D82175496
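Illustrative note (not part of the commit): to make the two phases concrete, here is a minimal standalone sketch. All names (`Decision`, `PolicyConfig`, `WriterState`, and the free functions `shouldChunk`/`shouldFlush`) are hypothetical stand-ins rather than the Nimble API; only the threshold semantics follow the description above.

```cpp
// Minimal sketch of the two-phase chunk/flush policy. Illustrative only.
#include <cstdint>

enum class Decision { kNone, kChunk, kFlushStripe };

struct PolicyConfig {
  uint64_t memoryHighThresholdBytes; // start chunking above this
  uint64_t memoryLowThresholdBytes;  // keep chunking while above this
  uint64_t targetStripeSizeBytes;    // flush when predicted size reaches this
  double estimatedCompressionFactor; // e.g. 1.3 means ~23% size reduction
};

struct WriterState {
  uint64_t rawMemoryBytes;   // total in-memory stream data
  uint64_t rawStripeBytes;   // unencoded bytes accumulated for the stripe
  bool lastChunkFreedMemory; // did the previous chunking pass help?
};

// Phase 1 - Memory Pressure Management (shouldChunk).
Decision shouldChunk(const WriterState& s, const PolicyConfig& c) {
  if (s.rawMemoryBytes > c.memoryHighThresholdBytes) {
    return Decision::kChunk; // reduce footprint, keep ingesting
  }
  if (s.rawMemoryBytes > c.memoryLowThresholdBytes) {
    // Keep chunking while it helps; otherwise force a full stripe flush
    // to guarantee memory relief.
    return s.lastChunkFreedMemory ? Decision::kChunk : Decision::kFlushStripe;
  }
  return Decision::kNone;
}

// Phase 2 - Storage Size Optimization (shouldFlush).
bool shouldFlush(const WriterState& s, const PolicyConfig& c) {
  // Predict the final compressed stripe size from unencoded data.
  const double predicted = s.rawStripeBytes / c.estimatedCompressionFactor;
  return predicted >= c.targetStripeSizeBytes;
}

int main() {
  const PolicyConfig config{80 << 10, 75 << 10, 250 << 10, 1.3};
  const WriterState state{90 << 10, 200 << 10, true};
  // 90KB > 80KB high threshold, so chunk; 200KB / 1.3 is about 154KB,
  // below the 250KB target, so no stripe flush yet.
  return (shouldChunk(state, config) == Decision::kChunk &&
          !shouldFlush(state, config))
      ? 0
      : 1;
}
```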
1 parent df2c18e commit 6926070

File tree

5 files changed: +106 -68 lines changed

- dwio/nimble/velox/CMakeLists.txt
- dwio/nimble/velox/VeloxWriter.cpp
- dwio/nimble/velox/VeloxWriter.h
- dwio/nimble/velox/VeloxWriterOptions.h
- dwio/nimble/velox/tests/VeloxWriterTests.cpp

dwio/nimble/velox/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
@@ -153,6 +153,7 @@ add_library(
   VeloxWriter.cpp
   ChunkedStreamWriter.cpp
   VeloxWriterDefaultMetadataOSS.cpp
+  StreamChunker.cpp
 )
 target_link_libraries(
   nimble_velox_writer

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 69 additions & 62 deletions
@@ -33,6 +33,7 @@
 #include "dwio/nimble/velox/SchemaSerialization.h"
 #include "dwio/nimble/velox/SchemaTypes.h"
 #include "dwio/nimble/velox/StatsGenerated.h"
+#include "dwio/nimble/velox/StreamChunker.h"
 #include "velox/common/time/CpuWallTimer.h"
 #include "velox/dwio/common/ExecutorBarrier.h"
 #include "velox/type/Type.h"
@@ -806,6 +807,7 @@ void VeloxWriter::writeChunk(bool lastChunk) {
 
 bool VeloxWriter::writeChunks(
     std::span<const uint32_t> streamIndices,
+    bool ensureFullChunks,
     bool lastChunk) {
   uint64_t previousFlushWallTime = context_->stripeFlushTiming.wallNanos;
   std::atomic<uint64_t> chunkSize = 0;
@@ -821,56 +823,43 @@
   streams_.resize(context_->schemaBuilder.nodeCount());
 
   auto processStream = [&](StreamData& streamData) {
-    // TODO: Breakdown large streams above a threshold into smaller chunks.
-    const auto minStreamSize =
-        lastChunk ? 0 : context_->options.minStreamChunkRawSize;
     const auto* context =
         streamData.descriptor().context<WriterStreamContext>();
-    bool isNullStream = context && context->isNullStream;
-    bool shouldChunkStream = false;
-    if (isNullStream) {
-      // We apply the same null logic, where if all values
-      // are non-nulls, we omit the entire stream.
-      shouldChunkStream = streamData.hasNulls() &&
-          streamData.nonNulls().size() > minStreamSize;
-    } else {
-      shouldChunkStream = streamData.data().size() > minStreamSize;
-    }
-
-    // If we have previous written chunks for this stream, during final
-    // chunk, always write any remaining data.
-    const auto offset = streamData.descriptor().offset();
-    NIMBLE_DASSERT(offset < streams_.size(), "Stream offset out of range.");
-    auto& stream = streams_[offset];
-    if (lastChunk && !shouldChunkStream && !stream.content.empty()) {
-      shouldChunkStream =
-          !streamData.empty() || !streamData.nonNulls().empty();
-    }
-
-    if (shouldChunkStream) {
-      std::string_view encoded;
-      if (isNullStream) {
-        // For null streams we promote the null values to be written as
-        // boolean data.
-        encoded = encodeStream(
-            *context_, *encodingBuffer_, NullsAsDataStreamData(streamData));
-      } else {
-        encoded = encodeStream(*context_, *encodingBuffer_, streamData);
-      }
+    const bool isNullStream = context && context->isNullStream;
+    const auto& offset = streamData.descriptor().offset();
+    auto& streamSize = context_->columnStats[offset].physicalSize;
+    logicalSizeBeforeEncoding += streamData.memoryUsed();
+    auto& streamContent = streams_[offset].content;
+    auto chunker = getStreamChunker(
+        streamData,
+        context_->options.maxStreamChunkRawSize,
+        context_->options.minStreamChunkRawSize,
+        ensureFullChunks,
+        streamContent.empty(),
+        isNullStream,
+        lastChunk);
+    while (auto streamDataView = chunker->next()) {
+      // Null stream values are converted to boolean data for encoding.
+      std::string_view encoded = isNullStream
+          ? encodeStream(
+                *context_,
+                *encodingBuffer_,
+                NullsAsDataStreamData(*streamDataView))
+          : encodeStream(*context_, *encodingBuffer_, *streamDataView);
 
       if (!encoded.empty()) {
-        auto& streamSize = context_->columnStats[offset].physicalSize;
         ChunkedStreamWriter chunkWriter{*encodingBuffer_};
         for (auto& buffer : chunkWriter.encode(encoded)) {
           streamSize += buffer.size();
           chunkSize += buffer.size();
-          stream.content.push_back(std::move(buffer));
+          streamContent.push_back(std::move(buffer));
         }
       }
       wroteChunk = true;
-      logicalSizeBeforeEncoding += streamData.memoryUsed();
-      streamData.reset();
     }
+    // Reset erases processed stream data to reclaim memory.
+    chunker.reset();
+    logicalSizeBeforeEncoding -= streamData.memoryUsed();
   };
 
   const auto& streams = context_->streams();
@@ -924,7 +913,7 @@ bool VeloxWriter::writeStripe() {
     // Chunk all streams.
     std::vector<uint32_t> streamIndices(context_->streams().size());
     std::iota(streamIndices.begin(), streamIndices.end(), 0);
-    writeChunks(streamIndices, true);
+    writeChunks(streamIndices, /*ensureFullChunks=*/false, /*lastChunk=*/true);
   } else {
     writeChunk(true);
   }
@@ -1011,32 +1000,50 @@ bool VeloxWriter::evalauateFlushPolicy() {
   };
 
   if (context_->options.enableChunking && shouldChunk()) {
+    auto batchChunkStreams = [&](const std::vector<uint32_t>& indices,
+                                 bool ensureFullChunks) {
+      const size_t indicesCount = indices.size();
+      const auto batchSize = context_->options.chunkedStreamBatchSize;
+      for (size_t index = 0; index < indicesCount; index += batchSize) {
+        size_t currentBatchSize = std::min(batchSize, indicesCount - index);
+        std::span<const uint32_t> batchIndices(
+            indices.begin() + index, currentBatchSize);
+        // Stop attempting chunking once streams are too small to chunk or
+        // memory pressure is relieved.
+        if (!(writeChunks(batchIndices, ensureFullChunks) && shouldChunk())) {
+          return false;
+        }
+      }
+      return true;
+    };
+
+    // Relieve memory pressure by chunking streams above max size.
     const auto& streams = context_->streams();
-    const size_t streamCount = streams.size();
-    // Sort streams for chunking based on raw memory usage.
-    // TODO(T240072104): Improve performance by bucketing the streams by size
-    // (most significant bit) instead of sorting.
-    std::vector<uint32_t> streamIndices(streamCount);
-    std::iota(streamIndices.begin(), streamIndices.end(), 0);
-    std::sort(
-        streamIndices.begin(),
-        streamIndices.end(),
-        [&](const uint32_t& a, const uint32_t& b) {
-          return streams[a]->memoryUsed() > streams[b]->memoryUsed();
-        });
-
-    // Chunk streams in batches.
-    const auto batchSize = context_->options.chunkedStreamBatchSize;
-    for (size_t index = 0; index < streamCount; index += batchSize) {
-      const size_t currentBatchSize = std::min(batchSize, streamCount - index);
-      std::span<const uint32_t> batchIndices(
-          streamIndices.begin() + index, currentBatchSize);
-      // Stop attempting chunking once streams are too small to chunk or
-      // memory pressure is relieved.
-      if (!(writeChunks(batchIndices, false) && shouldChunk())) {
-        break;
+    std::vector<uint32_t> streamIndices;
+    streamIndices.reserve(streams.size());
+    for (auto streamIndex = 0; streamIndex < streams.size(); ++streamIndex) {
+      if (streams[streamIndex]->memoryUsed() >=
+          context_->options.maxStreamChunkRawSize) {
+        streamIndices.push_back(streamIndex);
       }
     }
+    const bool continueChunking =
+        batchChunkStreams(streamIndices, /*ensureFullChunks=*/true);
+    if (continueChunking) {
+      // Relieve memory pressure by chunking small streams.
+      // Sort streams for chunking based on raw memory usage.
+      // TODO(T240072104): Improve performance by bucketing the streams
+      // by size (by most significant bit) instead of sorting them.
+      streamIndices.resize(streams.size());
+      std::iota(streamIndices.begin(), streamIndices.end(), 0);
+      std::sort(
          streamIndices.begin(),
          streamIndices.end(),
          [&](const uint32_t& a, const uint32_t& b) {
            return streams[a]->memoryUsed() > streams[b]->memoryUsed();
          });
+      batchChunkStreams(streamIndices, /*ensureFullChunks=*/false);
+    }
   }
 
   if (shouldFlush()) {
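Illustrative note (not part of the commit): the sketch below isolates the batch-and-early-exit pattern that `batchChunkStreams` applies in both passes above. `writeChunks` and `shouldChunk` are hypothetical free-function stand-ins for the writer's members, and the batch size is passed explicitly instead of being read from options.

```cpp
// Standalone sketch of batched chunking with early exit. Illustrative only.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <span>
#include <vector>

// Hypothetical stand-ins for VeloxWriter members.
bool writeChunks(std::span<const uint32_t> batch, bool ensureFullChunks) {
  std::printf(
      "chunked %zu streams (ensureFullChunks=%d)\n",
      batch.size(),
      static_cast<int>(ensureFullChunks));
  return !batch.empty(); // pretend chunks were written
}
bool shouldChunk() { return true; } // pretend memory pressure persists

// Walk the index list in fixed-size spans; stop once a batch writes no
// chunks or memory pressure is relieved.
bool batchChunkStreams(
    const std::vector<uint32_t>& indices,
    size_t batchSize,
    bool ensureFullChunks) {
  for (size_t i = 0; i < indices.size(); i += batchSize) {
    const size_t n = std::min(batchSize, indices.size() - i);
    std::span<const uint32_t> batch(indices.data() + i, n);
    if (!(writeChunks(batch, ensureFullChunks) && shouldChunk())) {
      return false;
    }
  }
  return true;
}

int main() {
  std::vector<uint32_t> indices{0, 1, 2, 3, 4};
  // Processes spans of size 2, 2, and 1.
  batchChunkStreams(indices, /*batchSize=*/2, /*ensureFullChunks=*/true);
  return 0;
}
```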

dwio/nimble/velox/VeloxWriter.h

Lines changed: 2 additions & 1 deletion
@@ -92,7 +92,8 @@ class VeloxWriter {
   // Returns 'true' if chunks were written.
   bool writeChunks(
       std::span<const uint32_t> streamIndices,
-      bool lastChunk = true);
+      bool ensureFullChunks = false,
+      bool lastChunk = false);
 };
 
 } // namespace facebook::nimble

dwio/nimble/velox/VeloxWriterOptions.h

Lines changed: 5 additions & 0 deletions
@@ -100,6 +100,11 @@ struct VeloxWriterOptions {
   // Note: this is ignored when it is time to flush a stripe.
   size_t chunkedStreamBatchSize = 1024;
 
+  // When flushing data streams into chunks, streams with raw data size larger
+  // than this threshold will be broken down into multiple smaller chunks. Each
+  // chunk will be at most this size.
+  uint64_t maxStreamChunkRawSize = 4 << 20;
+
   // The factory function that produces the root encoding selection policy.
   // Encoding selection policy is the way to balance the tradeoffs of
   // different performance factors (at both read and write times). Heuristics
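Illustrative note (not part of the commit): assuming a hypothetical 10 MiB stream, the chunk count implied by this cap is a simple ceiling division, as sketched below.

```cpp
// Back-of-the-envelope chunk count for the 4 MiB default. Illustrative only.
#include <cstdint>
#include <cstdio>

int main() {
  const uint64_t maxStreamChunkRawSize = 4ull << 20; // 4 MiB default
  const uint64_t streamRawSize = 10ull << 20;        // assume a 10 MiB stream
  // Ceiling division: number of chunks needed to cover the stream.
  const uint64_t chunkCount =
      (streamRawSize + maxStreamChunkRawSize - 1) / maxStreamChunkRawSize;
  std::printf("%llu chunks\n", static_cast<unsigned long long>(chunkCount));
  // Prints "3 chunks": two 4 MiB chunks and one 2 MiB chunk.
  return 0;
}
```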

dwio/nimble/velox/tests/VeloxWriterTests.cpp

Lines changed: 29 additions & 5 deletions
@@ -1944,6 +1944,7 @@ struct ChunkFlushPolicyTestCase {
   const uint64_t writerMemoryLowThresholdBytes{75 << 10};
   const double estimatedCompressionFactor{1.3};
   const uint32_t minStreamChunkRawSize{100};
+  const uint32_t maxStreamChunkRawSize{128 << 10};
   const uint32_t expectedStripeCount{0};
   const uint32_t expectedMaxChunkCount{0};
   const uint32_t expectedMinChunkCount{0};
@@ -1959,6 +1960,7 @@ TEST_P(ChunkFlushPolicyTest, ChunkFlushPolicyIntegration) {
       {{"BIGINT", velox::BIGINT()}, {"SMALLINT", velox::SMALLINT()}});
   nimble::VeloxWriterOptions writerOptions{
       .minStreamChunkRawSize = GetParam().minStreamChunkRawSize,
+      .maxStreamChunkRawSize = GetParam().maxStreamChunkRawSize,
       .chunkedStreamBatchSize = GetParam().chunkedStreamBatchSize,
       .flushPolicyFactory = GetParam().enableChunking
           ? []() -> std::unique_ptr<nimble::FlushPolicy> {
@@ -2074,6 +2076,7 @@ INSTANTIATE_TEST_CASE_P(
         .writerMemoryLowThresholdBytes = 75 << 10,
         .estimatedCompressionFactor = 1.3,
         .minStreamChunkRawSize = 100,
+        .maxStreamChunkRawSize = 128 << 10,
         .expectedStripeCount = 4,
         .expectedMaxChunkCount = 1,
         .expectedMinChunkCount = 1,
@@ -2088,13 +2091,29 @@
         .writerMemoryLowThresholdBytes = 75 << 10,
         .estimatedCompressionFactor = 1.3,
         .minStreamChunkRawSize = 100,
+        .maxStreamChunkRawSize = 128 << 10,
         .expectedStripeCount = 7,
         .expectedMaxChunkCount = 2,
         .expectedMinChunkCount = 1,
         .chunkedStreamBatchSize = 2,
     },
+    // Reducing maxStreamChunkRawSize produces more chunks
+    ChunkFlushPolicyTestCase{
+        .batchCount = 20,
+        .enableChunking = true,
+        .targetStripeSizeBytes = 250 << 10, // 250KB
+        .writerMemoryHighThresholdBytes = 80 << 10,
+        .writerMemoryLowThresholdBytes = 75 << 10,
+        .estimatedCompressionFactor = 1.0,
+        .minStreamChunkRawSize = 100,
+        .maxStreamChunkRawSize = 12 << 10, // -126KB
+        .expectedStripeCount = 8,
+        .expectedMaxChunkCount = 9, // +7
+        .expectedMinChunkCount = 2, // +1
+        .chunkedStreamBatchSize = 10,
+    },
     // High memory regression threshold and no compression
-    // Produces file identical to RawStripeSizeFlushPolicy
+    // Stripe count identical to RawStripeSizeFlushPolicy
     ChunkFlushPolicyTestCase{
         .batchCount = 20,
         .enableChunking = true,
@@ -2103,8 +2122,9 @@
         .writerMemoryLowThresholdBytes = 75 << 10,
         .estimatedCompressionFactor = 1.0,
         .minStreamChunkRawSize = 100,
+        .maxStreamChunkRawSize = 128 << 10,
         .expectedStripeCount = 4,
-        .expectedMaxChunkCount = 1,
+        .expectedMaxChunkCount = 2,
         .expectedMinChunkCount = 1,
         .chunkedStreamBatchSize = 2,
     },
@@ -2118,13 +2138,14 @@
         .writerMemoryLowThresholdBytes = 35 << 10, // -40KB
         .estimatedCompressionFactor = 1.3,
         .minStreamChunkRawSize = 100,
+        .maxStreamChunkRawSize = 128 << 10,
         .expectedStripeCount = 10,
         .expectedMaxChunkCount = 2,
         .expectedMinChunkCount = 2, // +1 chunk
         .chunkedStreamBatchSize = 2,
     },
     // High target stripe size bytes (with disabled memory pressure
-    // optimization) produces fewer stripes. Single chunks.
+    // optimization) produces fewer stripes.
     ChunkFlushPolicyTestCase{
         .batchCount = 20,
         .enableChunking = true,
@@ -2133,9 +2154,10 @@
         .writerMemoryLowThresholdBytes = 1 << 20, // +1MB
         .estimatedCompressionFactor = 1.3,
         .minStreamChunkRawSize = 100,
+        .maxStreamChunkRawSize = 128 << 10,
         .expectedStripeCount = 1, // -3 stripes
-        .expectedMaxChunkCount = 1,
-        .expectedMinChunkCount = 1,
+        .expectedMaxChunkCount = 5,
+        .expectedMinChunkCount = 2,
         .chunkedStreamBatchSize = 2,
 
     },
@@ -2149,6 +2171,7 @@
         .writerMemoryLowThresholdBytes = 1 << 20, // +1MB
         .estimatedCompressionFactor = 1.3,
         .minStreamChunkRawSize = 100,
+        .maxStreamChunkRawSize = 128 << 10,
         .expectedStripeCount = 7, // +6 stripes
         .expectedMaxChunkCount = 1,
         .expectedMinChunkCount = 1,
@@ -2164,6 +2187,7 @@
         .writerMemoryLowThresholdBytes = 75 << 10,
         .estimatedCompressionFactor = 1.0,
         .minStreamChunkRawSize = 100,
+        .maxStreamChunkRawSize = 128 << 10,
         .expectedStripeCount = 7,
         .expectedMaxChunkCount = 2,
         .expectedMinChunkCount = 1,
