@@ -641,7 +641,9 @@ void VeloxWriter::flush() {
641641 }
642642}
643643
644- bool VeloxWriter::writeChunk (bool lastChunk) {
644+ bool VeloxWriter::writeChunk (
645+ bool lastChunk,
646+ std::optional<size_t > streamIndex) {
645647 uint64_t previousFlushWallTime = context_->stripeFlushTiming .wallNanos ;
646648 std::atomic<uint64_t > chunkSize = 0 ;
647649 std::atomic<uint64_t > sizeBeforeEncoding = 0 ;
@@ -742,7 +744,17 @@ bool VeloxWriter::writeChunk(bool lastChunk) {
742744 }
743745 };
744746
745- if (context_->options .encodingExecutor ) {
747+ #define ENCODE_STREAM_DATA (innerStreamData, isNullStream, streamSize ) \
748+ do { \
749+ if (isNullStream) { \
750+ NullsAsDataStreamData nullsStreamData{innerStreamData}; \
751+ encode (nullsStreamData, streamSize); \
752+ } else { \
753+ encode (innerStreamData, streamSize); \
754+ } \
755+ } while (0 )
756+
757+ if (!streamIndex.has_value () && context_->options .encodingExecutor ) {
746758 velox::dwio::common::ExecutorBarrier barrier{
747759 context_->options .encodingExecutor };
748760 for (auto & streamData : context_->streams ()) {
@@ -753,16 +765,25 @@ bool VeloxWriter::writeChunk(bool lastChunk) {
753765 *streamData, [&](StreamData& innerStreamData, bool isNullStream) {
754766 barrier.add (
755767 [&innerStreamData, isNullStream, &encode, &streamSize]() {
756- if (isNullStream) {
757- NullsAsDataStreamData nullsStreamData{innerStreamData};
758- encode (nullsStreamData, streamSize);
759- } else {
760- encode (innerStreamData, streamSize);
761- }
768+ ENCODE_STREAM_DATA (
769+ innerStreamData, isNullStream, streamSize);
762770 });
763771 });
764772 }
765773 barrier.waitAll ();
774+ } else if (streamIndex.has_value ()) {
775+ const auto & streams = context_->streams ();
776+      NIMBLE_DASSERT (
777+          streams.size () > streamIndex.value (), " Invalid stream index" );
778+ const auto & streamData = streams[streamIndex.value ()];
779+ auto & streamSize =
780+ context_->columnStats [streamData->descriptor ().offset ()].physicalSize ;
781+ processStream (
782+ *streamData,
783+ [&encode, &streamSize](
784+ StreamData& innerStreamData, bool isNullStream) {
785+ ENCODE_STREAM_DATA (innerStreamData, isNullStream, streamSize);
786+ });
766787 } else {
767788 for (auto & streamData : context_->streams ()) {
768789 auto & streamSize =
@@ -772,12 +793,7 @@ bool VeloxWriter::writeChunk(bool lastChunk) {
772793 *streamData,
773794 [&encode, &streamSize](
774795 StreamData& innerStreamData, bool isNullStream) {
775- if (isNullStream) {
776- NullsAsDataStreamData nullsStreamData{innerStreamData};
777- encode (nullsStreamData, streamSize);
778- } else {
779- encode (innerStreamData, streamSize);
780- }
796+ ENCODE_STREAM_DATA (innerStreamData, isNullStream, streamSize);
781797 });
782798 }
783799 }
@@ -802,6 +818,8 @@ bool VeloxWriter::writeChunk(bool lastChunk) {
802818 VLOG (1 ) << " writeChunk milliseconds: " << flushWallTimeMs
803819 << " , chunk bytes: " << chunkSize;
804820 return wroteChunk.load ();
821+
822+ #undef ENCODE_STREAM_DATA
805823}
806824
807825uint32_t VeloxWriter::writeStripe () {
@@ -876,10 +894,23 @@ bool VeloxWriter::tryWriteStripe(bool force) {
876894 try {
877895 // TODO: we can improve merge the last chunk write with stripe
878896 if (decision == FlushDecision::Chunk && context_->options .enableChunking ) {
897+ const auto & streams = context_->streams ();
898+ // Sort streams for chunking based on raw memory usage.
899+ std::vector<size_t > streamIndices (streams.size ());
900+ std::iota (streamIndices.begin (), streamIndices.end (), 0 );
901+ std::sort (
902+ streamIndices.begin (),
903+ streamIndices.end (),
904+ [&](const size_t & a, const size_t & b) {
905+ return streams.at (a)->memoryUsed () > streams.at (b)->memoryUsed ();
906+ });
907+ size_t currentIndex = 0 ;
879908 bool successfullyChunked = true ;
880- while (decision == FlushDecision::Chunk && successfullyChunked) {
881- successfullyChunked = writeChunk (false );
909+ while (decision == FlushDecision::Chunk && successfullyChunked &&
910+ currentIndex < streams.size ()) {
911+ successfullyChunked = writeChunk (false , streamIndices.at (currentIndex));
882912 decision = shouldFlush (successfullyChunked);
913+ ++currentIndex;
883914 }
884915 }
885916
0 commit comments