@@ -2020,6 +2020,7 @@ struct ChunkFlushPolicyTestCase {
20202020 const uint32_t expectedStripeCount{0 };
20212021 const uint32_t expectedMaxChunkCount{0 };
20222022 const uint32_t expectedMinChunkCount{0 };
2023+ const uint32_t chunkedStreamBatchSize{2 };
20232024};
20242025
20252026class ChunkFlushPolicyTest
@@ -2031,6 +2032,7 @@ TEST_P(ChunkFlushPolicyTest, ChunkFlushPolicyIntegration) {
20312032 {{" BIGINT" , velox::BIGINT ()}, {" SMALLINT" , velox::SMALLINT ()}});
20322033 nimble::VeloxWriterOptions writerOptions{
20332034 .minStreamChunkRawSize = GetParam ().minStreamChunkRawSize ,
2035+ .chunkedStreamBatchSize = GetParam ().chunkedStreamBatchSize ,
20342036 .flushPolicyFactory = GetParam ().enableChunking
20352037 ? []() -> std::unique_ptr<nimble::FlushPolicy> {
20362038 return std::make_unique<nimble::ChunkFlushPolicy>(
@@ -2168,6 +2170,87 @@ TEST_F(VeloxWriterTests, FuzzComplex) {
21682170 }
21692171}
21702172
2173+ TEST_F (VeloxWriterTests, BatchedChunkingRelievesMemoryPressure) {
2174+ // Verify we stop chunking early when chunking relieves memory pressure.
2175+ const uint32_t seed = FLAGS_writer_tests_seed > 0 ? FLAGS_writer_tests_seed
2176+ : folly::Random::rand32 ();
2177+ LOG (INFO) << " seed: " << seed;
2178+ std::mt19937 rng{seed};
2179+ const uint32_t rowCount =
2180+ std::uniform_int_distribution<uint32_t >(1 , 4096 )(rng);
2181+
2182+ velox::VectorFuzzer fuzzer ({.vectorSize = rowCount}, leafPool_.get (), seed);
2183+ const auto stringColumn = fuzzer.fuzzFlat (velox::VARCHAR ());
2184+ const auto intColumn = fuzzer.fuzzFlat (velox::INTEGER ());
2185+
2186+ nimble::RawSizeContext context;
2187+ nimble::OrderedRanges ranges;
2188+ ranges.add (0 , rowCount);
2189+ const uint64_t stringColumnRawSize =
2190+ nimble::getRawSizeFromVector (stringColumn, ranges, context) +
2191+ sizeof (std::string_view) * rowCount;
2192+ const uint64_t intColumnRawSize =
2193+ nimble::getRawSizeFromVector (intColumn, ranges, context);
2194+
2195+ constexpr size_t kColumnCount = 20 ;
2196+ constexpr size_t kBatchSize = 4 ;
2197+ std::vector<velox::VectorPtr> children (kColumnCount );
2198+ std::vector<std::string> columnNames (kColumnCount );
2199+ uint64_t totalRawSize = 0 ;
2200+ for (size_t i = 0 ; i < kColumnCount ; i += 2 ) {
2201+ columnNames[i] = fmt::format (" string_column_{}" , i);
2202+ columnNames[i + 1 ] = fmt::format (" int_column_{}" , i);
2203+ children[i] = stringColumn;
2204+ children[i + 1 ] = intColumn;
2205+ totalRawSize += intColumnRawSize + stringColumnRawSize;
2206+ }
2207+
2208+ velox::test::VectorMaker vectorMaker{leafPool_.get ()};
2209+ const auto rowVector = vectorMaker.rowVector (columnNames, children);
2210+
2211+ // We will return true twice and false once
2212+ const std::vector<bool > expectedChunkingDecisions{true , true , false };
2213+ std::vector<bool > actualChunkingDecisions;
2214+
2215+ // We will be chunking the large streams in the first two batches. 8 string
2216+ // streams in total. We set the expected rawSize after chunking these two
2217+ // batches as our memory threshold.
2218+ const uint64_t memoryPressureThreshold =
2219+ totalRawSize - (2 * kBatchSize * stringColumnRawSize);
2220+
2221+ nimble::VeloxWriterOptions writerOptions;
2222+ writerOptions.chunkedStreamBatchSize = kBatchSize ;
2223+ writerOptions.enableChunking = true ;
2224+ writerOptions.minStreamChunkRawSize = intColumnRawSize / 2 ;
2225+ writerOptions.flushPolicyFactory =
2226+ [&]() -> std::unique_ptr<nimble::FlushPolicy> {
2227+ return std::make_unique<nimble::LambdaFlushPolicy>(
2228+ /* shouldFlush */ [](const auto &) { return true ; },
2229+ /* shouldChunk */
2230+ [&](const nimble::StripeProgress& stripeProgress) {
2231+ const bool shouldChunk =
2232+ stripeProgress.stripeRawSize > memoryPressureThreshold;
2233+ actualChunkingDecisions.push_back (shouldChunk);
2234+ return shouldChunk;
2235+ });
2236+ };
2237+
2238+ std::string file;
2239+ auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file);
2240+ nimble::VeloxWriter writer (
2241+ *rootPool_, rowVector->type (), std::move (writeFile), writerOptions);
2242+ writer.write (rowVector);
2243+ writer.close ();
2244+
2245+ EXPECT_THAT (
2246+ actualChunkingDecisions,
2247+ ::testing::ElementsAreArray (expectedChunkingDecisions));
2248+
2249+ velox::InMemoryReadFile readFile (file);
2250+ nimble::VeloxReader reader (*leafPool_, &readFile);
2251+ validateChunkSize (reader, writerOptions.minStreamChunkRawSize );
2252+ }
2253+
21712254INSTANTIATE_TEST_CASE_P (
21722255 StripeRawSizeFlushPolicyTestSuite,
21732256 StripeRawSizeFlushPolicyTest,
@@ -2209,6 +2292,7 @@ INSTANTIATE_TEST_CASE_P(
22092292 .expectedStripeCount = 4 ,
22102293 .expectedMaxChunkCount = 1 ,
22112294 .expectedMinChunkCount = 1 ,
2295+ .chunkedStreamBatchSize = 2 ,
22122296 },
22132297 // Baseline with default settings (has chunking)
22142298 ChunkFlushPolicyTestCase{
@@ -2222,6 +2306,7 @@ INSTANTIATE_TEST_CASE_P(
22222306 .expectedStripeCount = 7 ,
22232307 .expectedMaxChunkCount = 2 ,
22242308 .expectedMinChunkCount = 1 ,
2309+ .chunkedStreamBatchSize = 2 ,
22252310 },
22262311 // High memory regression threshold and no compression
22272312 // Produces file identical to RawStripeSizeFlushPolicy
@@ -2238,6 +2323,7 @@ INSTANTIATE_TEST_CASE_P(
22382323 .expectedStripeCount = 4 ,
22392324 .expectedMaxChunkCount = 1 ,
22402325 .expectedMinChunkCount = 1 ,
2326+ .chunkedStreamBatchSize = 2 ,
22412327 },
22422328 // Low memory regression threshold
22432329 // Produces file with more min chunks per stripe
@@ -2253,7 +2339,8 @@ INSTANTIATE_TEST_CASE_P(
22532339 .minStreamChunkRawSize = 100 ,
22542340 .expectedStripeCount = 10 ,
22552341 .expectedMaxChunkCount = 2 ,
2256- .expectedMinChunkCount = 2 ,
2342+ .expectedMinChunkCount = 2 , // +1 chunk
2343+ .chunkedStreamBatchSize = 2 ,
22572344 },
22582345 // High target stripe size bytes (with disabled memory pressure
22592346 // optimization) produces fewer stripes. Single chunks.
@@ -2271,6 +2358,8 @@ INSTANTIATE_TEST_CASE_P(
22712358 .expectedStripeCount = 1 ,
22722359 .expectedMaxChunkCount = 1 ,
22732360 .expectedMinChunkCount = 1 ,
2361+ .chunkedStreamBatchSize = 2 ,
2362+
22742363 },
22752364 // Low target stripe size bytes (with disabled memory pressure
22762365 // optimization) produces more stripes. Single chunks.
@@ -2288,5 +2377,20 @@ INSTANTIATE_TEST_CASE_P(
22882377 .expectedStripeCount = 7 ,
22892378 .expectedMaxChunkCount = 1 ,
22902379 .expectedMinChunkCount = 1 ,
2291- }));
2380+ .chunkedStreamBatchSize = 2 ,
2381+
2382+ },
2383+ // Higher chunked stream batch size (no change in policy)
2384+ ChunkFlushPolicyTestCase{
2385+ .batchCount = 20 ,
2386+ .enableChunking = true ,
2387+ .targetStripeSizeBytes = 250 << 10 , // 250KB
2388+ .writerMemoryHighThresholdBytes = 80 << 10 ,
2389+ .writerMemoryLowThresholdBytes = 75 << 10 ,
2390+ .estimatedCompressionFactor = 1.0 ,
2391+ .minStreamChunkRawSize = 100 ,
2392+ .expectedStripeCount = 7 ,
2393+ .expectedMaxChunkCount = 2 ,
2394+ .expectedMinChunkCount = 1 ,
2395+ .chunkedStreamBatchSize = 10 }));
22922396} // namespace facebook
0 commit comments