@@ -641,7 +641,9 @@ void VeloxWriter::flush() {
641641 }
642642}
643643
644- bool VeloxWriter::writeChunk (bool lastChunk) {
644+ bool VeloxWriter::writeChunk (
645+ bool lastChunk,
646+ std::optional<uint32_t > streamIndex) {
645647 uint64_t previousFlushWallTime = context_->stripeFlushTiming .wallNanos ;
646648 std::atomic<uint64_t > chunkSize = 0 ;
647649 std::atomic<uint64_t > sizeBeforeEncoding = 0 ;
@@ -742,7 +744,17 @@ bool VeloxWriter::writeChunk(bool lastChunk) {
742744 }
743745 };
744746
745- if (context_->options .encodingExecutor ) {
747+ #define ENCODE_STREAM_DATA (innerStreamData, isNullStream, streamSize ) \
748+ do { \
749+ if (isNullStream) { \
750+ NullsAsDataStreamData nullsStreamData{innerStreamData}; \
751+ encode (nullsStreamData, streamSize); \
752+ } else { \
753+ encode (innerStreamData, streamSize); \
754+ } \
755+ } while (0 )
756+
757+ if (!streamIndex.has_value () && context_->options .encodingExecutor ) {
746758 velox::dwio::common::ExecutorBarrier barrier{
747759 context_->options .encodingExecutor };
748760 for (auto & streamData : context_->streams ()) {
@@ -753,16 +765,25 @@ bool VeloxWriter::writeChunk(bool lastChunk) {
753765 *streamData, [&](StreamData& innerStreamData, bool isNullStream) {
754766 barrier.add (
755767 [&innerStreamData, isNullStream, &encode, &streamSize]() {
756- if (isNullStream) {
757- NullsAsDataStreamData nullsStreamData{innerStreamData};
758- encode (nullsStreamData, streamSize);
759- } else {
760- encode (innerStreamData, streamSize);
761- }
768+ ENCODE_STREAM_DATA (
769+ innerStreamData, isNullStream, streamSize);
762770 });
763771 });
764772 }
765773 barrier.waitAll ();
774+ } else if (streamIndex.has_value ()) { // Single-stream path: encode only the requested stream, directly (no encodingExecutor barrier).
775+ const auto & streams = context_->streams ();
776+ NIMBLE_DASSERT (
777+ streams.size () > streamIndex.value (), " Invalid stream index" ); // Index must be strictly less than size; the previous '>=' accepted index == size, which the subscript below would read out of bounds.
778+ const auto & streamData = streams[streamIndex.value ()];
779+ auto & streamSize =
780+ context_->columnStats [streamData->descriptor ().offset ()].physicalSize ; // Physical-size counter for this stream's column, passed to encode().
781+ processStream (
782+ *streamData,
783+ [&encode, &streamSize](
784+ StreamData& innerStreamData, bool isNullStream) {
785+ ENCODE_STREAM_DATA (innerStreamData, isNullStream, streamSize); // Wraps null streams in NullsAsDataStreamData before encoding.
786+ });
766787 } else {
767788 for (auto & streamData : context_->streams ()) {
768789 auto & streamSize =
@@ -772,12 +793,7 @@ bool VeloxWriter::writeChunk(bool lastChunk) {
772793 *streamData,
773794 [&encode, &streamSize](
774795 StreamData& innerStreamData, bool isNullStream) {
775- if (isNullStream) {
776- NullsAsDataStreamData nullsStreamData{innerStreamData};
777- encode (nullsStreamData, streamSize);
778- } else {
779- encode (innerStreamData, streamSize);
780- }
796+ ENCODE_STREAM_DATA (innerStreamData, isNullStream, streamSize);
781797 });
782798 }
783799 }
@@ -802,6 +818,8 @@ bool VeloxWriter::writeChunk(bool lastChunk) {
802818 VLOG (1 ) << " writeChunk milliseconds: " << flushWallTimeMs
803819 << " , chunk bytes: " << chunkSize;
804820 return wroteChunk.load ();
821+
822+ #undef ENCODE_STREAM_DATA
805823}
806824
807825uint32_t VeloxWriter::writeStripe () {
@@ -886,8 +904,22 @@ bool VeloxWriter::tryWriteStripe(bool force) {
886904 try {
887905 // TODO: we can improve merge the last chunk write with stripe
888906 if (decision == FlushDecision::Chunk && context_->options .enableChunking ) {
889- while (decision == FlushDecision::Chunk && writeChunk (false )) {
907+ const auto & streams = context_->streams ();
908+ // Build a list of stream indices ordered by memoryUsed(), largest first, so the streams holding the most memory are chunked first.
909+ std::vector<uint32_t > streamIndices (streams.size ());
910+ std::iota (streamIndices.begin (), streamIndices.end (), 0 ); // Fill with 0, 1, ..., streams.size() - 1.
911+ std::sort (
912+ streamIndices.begin (),
913+ streamIndices.end (),
914+ [&](const uint32_t & a, const uint32_t & b) {
915+ return streams[a]->memoryUsed () > streams[b]->memoryUsed (); // '>' yields descending order.
916+ });
917+ uint32_t currentIndex = 0 ; // Position within streamIndices, not a stream index itself.
918+ while (decision == FlushDecision::Chunk && // Stop once shouldChunk() no longer asks for chunking,
919+ currentIndex < streams.size () && // or every stream has been visited once,
920+ writeChunk (false , streamIndices[currentIndex])) { // or writeChunk() returns false for the current stream.
890921 decision = shouldChunk (); // Re-evaluate after each single-stream chunk.
922+ ++currentIndex;
891923 }
892924 }
893925 decision = (decision != FlushDecision::Stripe) ? shouldFlush () : decision;
0 commit comments