@@ -2016,6 +2016,7 @@ struct ChunkFlushPolicyTestCase {
20162016 const uint32_t expectedStripeCount{0 };
20172017 const uint32_t expectedMaxChunkCount{0 };
20182018 const uint32_t expectedMinChunkCount{0 };
2019+ const uint32_t chunkedStreamBatchSize{2 };
20192020};
20202021
20212022class ChunkFlushPolicyTest
@@ -2027,6 +2028,7 @@ TEST_P(ChunkFlushPolicyTest, ChunkFlushPolicyIntegration) {
20272028 {{" BIGINT" , velox::BIGINT ()}, {" SMALLINT" , velox::SMALLINT ()}});
20282029 nimble::VeloxWriterOptions writerOptions{
20292030 .minStreamChunkRawSize = GetParam ().minStreamChunkRawSize ,
2031+ .chunkedStreamBatchSize = GetParam ().chunkedStreamBatchSize ,
20302032 .flushPolicyFactory = GetParam ().enableChunking
20312033 ? []() -> std::unique_ptr<nimble::FlushPolicy> {
20322034 return std::make_unique<nimble::ChunkFlushPolicy>(
@@ -2164,6 +2166,85 @@ TEST_F(VeloxWriterTests, FuzzComplex) {
21642166 }
21652167}
21662168
2169+ TEST_F (VeloxWriterTests, BatchedChunkingRelievesMemoryPressure) {
2170+ // Verify we stop chunking early when chunking relieves memory pressure.
2171+ const uint32_t seed = FLAGS_writer_tests_seed > 0 ? FLAGS_writer_tests_seed
2172+ : folly::Random::rand32 ();
2173+ LOG (INFO) << " seed: " << seed;
2174+ std::mt19937 rng{seed};
2175+ const uint32_t rowCount =
2176+ std::uniform_int_distribution<uint32_t >(1000 , 4096 )(rng);
2177+
2178+ velox::VectorFuzzer fuzzer ({.vectorSize = rowCount}, leafPool_.get (), seed);
2179+ const auto stringColumn = fuzzer.fuzzFlat (velox::VARCHAR ());
2180+ const auto intColumn = fuzzer.fuzzFlat (velox::INTEGER ());
2181+
2182+ nimble::RawSizeContext context;
2183+ nimble::OrderedRanges ranges;
2184+ ranges.add (0 , rowCount);
2185+ const uint64_t stringColumnRawSize =
2186+ nimble::getRawSizeFromVector (stringColumn, ranges, context) +
2187+ sizeof (std::string_view) * rowCount;
2188+ const uint64_t intColumnRawSize =
2189+ nimble::getRawSizeFromVector (intColumn, ranges, context);
2190+
2191+ constexpr size_t kColumnCount = 20 ;
2192+ constexpr size_t kBatchSize = 4 ;
2193+ std::vector<velox::VectorPtr> children (kColumnCount );
2194+ std::vector<std::string> columnNames (kColumnCount );
2195+ uint64_t totalRawSize = 0 ;
2196+ for (size_t i = 0 ; i < kColumnCount ; i += 2 ) {
2197+ columnNames[i] = fmt::format (" string_column_{}" , i);
2198+ columnNames[i + 1 ] = fmt::format (" int_column_{}" , i);
2199+ children[i] = stringColumn;
2200+ children[i + 1 ] = intColumn;
2201+ totalRawSize += intColumnRawSize + stringColumnRawSize;
2202+ }
2203+
2204+ velox::test::VectorMaker vectorMaker{leafPool_.get ()};
2205+ const auto rowVector = vectorMaker.rowVector (columnNames, children);
2206+
2207+ // We will return true twice and false once
2208+ const std::vector<bool > expectedChunkingDecisions{true , true , false };
2209+ std::vector<bool > actualChunkingDecisions;
2210+
2211+ // We will be chunking the large streams in the first two batches. 8 string
2212+ // streams in total. We set the expected rawSize after chunking these two
2213+ // batches as our memory threshold.
2214+ const uint64_t memoryPressureThreshold =
2215+ totalRawSize - (2 * kBatchSize * stringColumnRawSize);
2216+
2217+ nimble::VeloxWriterOptions writerOptions;
2218+ writerOptions.chunkedStreamBatchSize = kBatchSize ;
2219+ writerOptions.enableChunking = true ;
2220+ writerOptions.flushPolicyFactory =
2221+ [&]() -> std::unique_ptr<nimble::FlushPolicy> {
2222+ return std::make_unique<nimble::LambdaFlushPolicy>(
2223+ [](const auto &) { return true ; },
2224+ [&](const nimble::StripeProgress& stripeProgress) {
2225+ const bool shouldChunk =
2226+ stripeProgress.stripeRawSize > memoryPressureThreshold;
2227+ actualChunkingDecisions.push_back (shouldChunk);
2228+ return shouldChunk;
2229+ });
2230+ };
2231+
2232+ std::string file;
2233+ auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file);
2234+ nimble::VeloxWriter writer (
2235+ *rootPool_, rowVector->type (), std::move (writeFile), writerOptions);
2236+ writer.write (rowVector);
2237+ writer.close ();
2238+
2239+ EXPECT_THAT (
2240+ expectedChunkingDecisions,
2241+ ::testing::ElementsAreArray (actualChunkingDecisions));
2242+
2243+ velox::InMemoryReadFile readFile (file);
2244+ nimble::VeloxReader reader (*leafPool_, &readFile);
2245+ validateChunkSize (reader, writerOptions.minStreamChunkRawSize );
2246+ }
2247+
21672248INSTANTIATE_TEST_CASE_P (
21682249 StripeRawSizeFlushPolicyTestSuite,
21692250 StripeRawSizeFlushPolicyTest,
@@ -2205,6 +2286,7 @@ INSTANTIATE_TEST_CASE_P(
22052286 .expectedStripeCount = 4 ,
22062287 .expectedMaxChunkCount = 1 ,
22072288 .expectedMinChunkCount = 1 ,
2289+ .chunkedStreamBatchSize = 2 ,
22082290 },
22092291 // Baseline with default settings (has chunking)
22102292 ChunkFlushPolicyTestCase{
@@ -2218,6 +2300,7 @@ INSTANTIATE_TEST_CASE_P(
22182300 .expectedStripeCount = 7 ,
22192301 .expectedMaxChunkCount = 2 ,
22202302 .expectedMinChunkCount = 1 ,
2303+ .chunkedStreamBatchSize = 2 ,
22212304 },
22222305 // High memory regression threshold and no compression
22232306 // Produces file identical to RawStripeSizeFlushPolicy
@@ -2234,6 +2317,7 @@ INSTANTIATE_TEST_CASE_P(
22342317 .expectedStripeCount = 4 ,
22352318 .expectedMaxChunkCount = 1 ,
22362319 .expectedMinChunkCount = 1 ,
2320+ .chunkedStreamBatchSize = 2 ,
22372321 },
22382322 // Low memory regression threshold
22392323 // Produces file with more min chunks per stripe
@@ -2249,7 +2333,8 @@ INSTANTIATE_TEST_CASE_P(
22492333 .minStreamChunkRawSize = 100 ,
22502334 .expectedStripeCount = 10 ,
22512335 .expectedMaxChunkCount = 2 ,
2252- .expectedMinChunkCount = 2 ,
2336+ .expectedMinChunkCount = 2 , // +1 chunk
2337+ .chunkedStreamBatchSize = 2 ,
22532338 },
22542339 // High target stripe size bytes (with disabled memory pressure
22552340 // optimization) produces fewer stripes. Single chunks.
@@ -2267,6 +2352,8 @@ INSTANTIATE_TEST_CASE_P(
22672352 .expectedStripeCount = 1 ,
22682353 .expectedMaxChunkCount = 1 ,
22692354 .expectedMinChunkCount = 1 ,
2355+ .chunkedStreamBatchSize = 2 ,
2356+
22702357 },
22712358 // Low target stripe size bytes (with disabled memory pressure
22722359 // optimization) produces more stripes. Single chunks.
@@ -2284,5 +2371,20 @@ INSTANTIATE_TEST_CASE_P(
22842371 .expectedStripeCount = 7 ,
22852372 .expectedMaxChunkCount = 1 ,
22862373 .expectedMinChunkCount = 1 ,
2287- }));
2374+ .chunkedStreamBatchSize = 2 ,
2375+
2376+ },
2377+ // Higher chunked stream batch size (no change in policy)
2378+ ChunkFlushPolicyTestCase{
2379+ .batchCount = 20 ,
2380+ .enableChunking = true ,
2381+ .targetStripeSizeBytes = 250 << 10 , // 250KB
2382+ .writerMemoryHighThresholdBytes = 80 << 10 ,
2383+ .writerMemoryLowThresholdBytes = 75 << 10 ,
2384+ .estimatedCompressionFactor = 1.0 ,
2385+ .minStreamChunkRawSize = 100 ,
2386+ .expectedStripeCount = 7 ,
2387+ .expectedMaxChunkCount = 2 ,
2388+ .expectedMinChunkCount = 1 ,
2389+ .chunkedStreamBatchSize = 10 }));
22882390} // namespace facebook
0 commit comments