@@ -44,7 +44,6 @@ namespace detail {
 class WriterContext : public FieldWriterContext {
  public:
   const VeloxWriterOptions options;
-  std::unique_ptr<FlushPolicy> flushPolicy;
   velox::CpuWallTiming totalFlushTiming;
   velox::CpuWallTiming stripeFlushTiming;
   velox::CpuWallTiming encodingSelectionTiming;
@@ -56,8 +55,11 @@ class WriterContext : public FieldWriterContext {
   uint64_t bytesWritten{0};
   uint64_t rowsInFile{0};
   uint64_t rowsInStripe{0};
-  uint64_t stripeSize{0};
-  uint64_t rawSize{0};
+  // Physical size of the encoded stripe data.
+  uint64_t stripeEncodedPhysicalSize{0};
+  // Logical size of the encoded stripe data.
+  uint64_t stripeEncodedLogicalSize{0};
+  uint64_t fileRawSize{0};
   std::vector<uint64_t> rowsPerStripe;
 
   WriterContext(
@@ -66,7 +68,6 @@ class WriterContext : public FieldWriterContext {
       : FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
         options{std::move(options)},
         logger{this->options.metricsLogger} {
-    flushPolicy = this->options.flushPolicyFactory();
     inputBufferGrowthPolicy = this->options.lowMemoryMode
         ? std::make_unique<ExactGrowthPolicy>()
         : this->options.inputGrowthPolicyFactory();
@@ -81,7 +82,8 @@ class WriterContext : public FieldWriterContext {
     rowsPerStripe.push_back(rowsInStripe);
     memoryUsed = 0;
     rowsInStripe = 0;
-    stripeSize = 0;
+    stripeEncodedPhysicalSize = 0;
+    stripeEncodedLogicalSize = 0;
     ++stripeIndex_;
   }
 
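For orientation, a minimal sketch of how the two counters introduced above are meant to accumulate, assuming the semantics used in writeChunks() further down in this diff: the logical size sums the in-memory bytes handed to the encoder (streamData.memoryUsed()), while the physical size sums the encoded bytes actually buffered for the stripe. The struct and numbers below are illustrative stand-ins, not Nimble APIs.

#include <cstdint>
#include <cstdio>
#include <vector>

// Illustrative stand-in for one encoded chunk: bytes before and after encoding.
struct ChunkSizes {
  uint64_t logicalBytes; // streamData.memoryUsed() before reset()
  uint64_t physicalBytes; // sum of the ChunkedStreamWriter buffer sizes
};

int main() {
  // Hypothetical chunks written into the current stripe.
  std::vector<ChunkSizes> chunks{{1 << 20, 200 << 10}, {2 << 20, 350 << 10}};

  uint64_t stripeEncodedLogicalSize = 0;
  uint64_t stripeEncodedPhysicalSize = 0;
  for (const auto& chunk : chunks) {
    stripeEncodedLogicalSize += chunk.logicalBytes;
    stripeEncodedPhysicalSize += chunk.physicalBytes;
  }

  // The flush policy can now observe both views of stripe growth.
  std::printf(
      "logical=%llu physical=%llu\n",
      static_cast<unsigned long long>(stripeEncodedLogicalSize),
      static_cast<unsigned long long>(stripeEncodedPhysicalSize));
  return 0;
}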
@@ -99,6 +101,45 @@ namespace {
 
 constexpr uint32_t kInitialSchemaSectionSize = 1 << 20; // 1MB
 
+// When writing null streams, we write the nulls as data, and the stream itself
+// is non-nullable. This adapter class is how we expose the nulls as values.
+class NullsAsDataStreamData : public StreamData {
+ public:
+  explicit NullsAsDataStreamData(StreamData& streamData)
+      : StreamData(streamData.descriptor()), streamData_{streamData} {
+    streamData_.materialize();
+  }
+
+  inline virtual std::string_view data() const override {
+    return {
+        reinterpret_cast<const char*>(streamData_.nonNulls().data()),
+        streamData_.nonNulls().size()};
+  }
+
+  inline virtual std::span<const bool> nonNulls() const override {
+    return {};
+  }
+
+  inline virtual bool hasNulls() const override {
+    return false;
+  }
+
+  inline virtual bool empty() const override {
+    return streamData_.empty();
+  }
+
+  inline virtual uint64_t memoryUsed() const override {
+    return streamData_.memoryUsed();
+  }
+
+  inline virtual void reset() override {
+    streamData_.reset();
+  }
+
+ private:
+  StreamData& streamData_;
+};
+
 class WriterStreamContext : public StreamContext {
  public:
   bool isNullStream = false;
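The adapter above hinges on one trick: the per-row non-null flags can be exposed verbatim as a byte view, so the encoder handles them as an ordinary boolean value stream. A minimal self-contained sketch of that reinterpretation follows; the array is a stand-in for streamData.nonNulls(), and none of this is Nimble code.

#include <cstdio>
#include <span>
#include <string_view>

int main() {
  // Stand-in for streamData.nonNulls(): one bool per row, true == non-null.
  const bool nonNulls[] = {true, true, false, true};
  std::span<const bool> bitmap{nonNulls};

  // NullsAsDataStreamData::data() does the equivalent of this: the same bytes,
  // viewed as char, become the stream's "values".
  std::string_view asData{
      reinterpret_cast<const char*>(bitmap.data()), bitmap.size()};

  std::printf("rows=%zu, first byte=%d\n", asData.size(), asData.front());
  return 0;
}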
@@ -132,7 +173,7 @@ std::string_view encode(
   std::unique_ptr<EncodingSelectionPolicy<T>> policy;
   if (encodingLayout.has_value()) {
     policy = std::make_unique<ReplayedEncodingSelectionPolicy<T>>(
-        encodingLayout.value(),
+        std::move(encodingLayout).value(),
         context.options.compressionOptions,
         context.options.encodingSelectionPolicyFactory);
 
@@ -167,7 +208,7 @@ std::string_view encodeStreamTyped(
   }
 
   try {
-    return encode<T>(encodingLayout, context, buffer, streamData);
+    return encode<T>(std::move(encodingLayout), context, buffer, streamData);
   } catch (const NimbleUserError& e) {
     if (e.errorCode() != error_code::IncompatibleEncoding ||
         !encodingLayout.has_value()) {
@@ -214,7 +255,8 @@ template <typename Set>
 void findNodeIds(
     const velox::dwio::common::TypeWithId& typeWithId,
     Set& output,
-    std::function<bool(const velox::dwio::common::TypeWithId&)> predicate) {
+    const std::function<bool(const velox::dwio::common::TypeWithId&)>&
+        predicate) {
   if (predicate(typeWithId)) {
     output.insert(typeWithId.id());
   }
@@ -515,7 +557,7 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
     auto rawSize = nimble::getRawSizeFromVector(
         vector, velox::common::Ranges::of(0, size));
     DWIO_ENSURE_GE(rawSize, 0, "Invalid raw size");
-    context_->rawSize += rawSize;
+    context_->fileRawSize += rawSize;
 
     if (context_->options.writeExecutor) {
       velox::dwio::common::ExecutorBarrier barrier{
@@ -580,7 +622,8 @@ void VeloxWriter::close() {
         *context_->schemaBuilder.getRoot(), context_->columnStats);
     // TODO(T228118622): Write column stats to file.
     flatbuffers::FlatBufferBuilder builder;
-    builder.Finish(serialization::CreateStats(builder, context_->rawSize));
+    builder.Finish(
+        serialization::CreateStats(builder, context_->fileRawSize));
     writer_.writeOptionalSection(
         std::string(kStatsSection),
         {reinterpret_cast<const char*>(builder.GetBufferPointer()),
@@ -650,45 +693,6 @@ void VeloxWriter::writeChunk(bool lastChunk) {
   }
   streams_.resize(context_->schemaBuilder.nodeCount());
 
-  // When writing null streams, we write the nulls as data, and the stream
-  // itself is non-nullable. This adpater class is how we expose the nulls as
-  // values.
-  class NullsAsDataStreamData : public StreamData {
-   public:
-    explicit NullsAsDataStreamData(StreamData& streamData)
-        : StreamData(streamData.descriptor()), streamData_{streamData} {
-      streamData_.materialize();
-    }
-
-    inline virtual std::string_view data() const override {
-      return {
-          reinterpret_cast<const char*>(streamData_.nonNulls().data()),
-          streamData_.nonNulls().size()};
-    }
-
-    inline virtual std::span<const bool> nonNulls() const override {
-      return {};
-    }
-
-    inline virtual bool hasNulls() const override {
-      return false;
-    }
-
-    inline virtual bool empty() const override {
-      return streamData_.empty();
-    }
-    inline virtual uint64_t memoryUsed() const override {
-      return streamData_.memoryUsed();
-    }
-
-    inline virtual void reset() override {
-      streamData_.reset();
-    }
-
-   private:
-    StreamData& streamData_;
-  };
-
   auto encode = [&](StreamData& streamData, uint64_t& streamSize) {
     const auto offset = streamData.descriptor().offset();
     auto encoded = encodeStream(*context_, *encodingBuffer_, streamData);
@@ -777,8 +781,110 @@ void VeloxWriter::writeChunk(bool lastChunk) {
     if (lastChunk) {
       root_->reset();
     }
+  }
+
+  // Consider getting this from flush timing.
+  auto flushWallTimeMs =
+      (context_->stripeFlushTiming.wallNanos - previousFlushWallTime) /
+      1'000'000;
+  VLOG(1) << "writeChunk milliseconds: " << flushWallTimeMs
+          << ", chunk bytes: " << chunkSize;
+}
+
+bool VeloxWriter::writeChunks(bool lastChunk) {
+  uint64_t previousFlushWallTime = context_->stripeFlushTiming.wallNanos;
+  std::atomic<uint64_t> chunkSize = 0;
+  std::atomic<uint64_t> logicalSizeBeforeEncoding = 0;
+  std::atomic<bool> wroteChunk = false;
+  {
+    LoggingScope scope{*context_->logger};
+    velox::CpuWallTimer veloxTimer{context_->stripeFlushTiming};
+
+    if (!encodingBuffer_) {
+      encodingBuffer_ = std::make_unique<Buffer>(*encodingMemoryPool_);
+    }
+    streams_.resize(context_->schemaBuilder.nodeCount());
 
-    context_->stripeSize += chunkSize;
+    auto encode = [&](StreamData& streamData) {
+      const auto* context =
+          streamData.descriptor().context<WriterStreamContext>();
+      std::string_view encoded;
+      if (context && context->isNullStream) {
+        encoded = encodeStream(
+            *context_, *encodingBuffer_, NullsAsDataStreamData{streamData});
+      } else {
+        encoded = encodeStream(*context_, *encodingBuffer_, streamData);
+      }
+
+      if (!encoded.empty()) {
+        const auto offset = streamData.descriptor().offset();
+        NIMBLE_DASSERT(offset < streams_.size(), "Stream offset out of range.");
+        auto& stream = streams_[offset];
+        auto& streamSize = context_->columnStats[offset].physicalSize;
+        ChunkedStreamWriter chunkWriter{*encodingBuffer_};
+        for (auto& buffer : chunkWriter.encode(encoded)) {
+          streamSize += buffer.size();
+          chunkSize += buffer.size();
+          stream.content.push_back(std::move(buffer));
+        }
+      }
+      wroteChunk = true;
+      logicalSizeBeforeEncoding += streamData.memoryUsed();
+      streamData.reset();
+    };
+
+    auto processStream = [&](StreamData& streamData) {
+      // TODO: Break down large streams above a threshold into smaller chunks.
+      // For null streams we will promote the null values to be written as
+      // boolean data. We still apply the same null logic, where if all values
+      // are non-null, we omit the entire stream.
+      const auto minStreamSize =
+          lastChunk ? 0 : context_->options.minStreamChunkRawSize;
+      const auto* context =
+          streamData.descriptor().context<WriterStreamContext>();
+      bool isNullStream = context && context->isNullStream;
+
+      bool shouldChunkStream = false;
+      if (isNullStream) {
+        shouldChunkStream = streamData.hasNulls() &&
+            streamData.nonNulls().size() > minStreamSize;
+      } else {
+        shouldChunkStream = streamData.data().size() > minStreamSize;
+      }
+
+      // If we have previously written chunks for this stream, during the final
+      // chunk, always write any remaining data.
+      if (lastChunk && !shouldChunkStream &&
+          !streams_[streamData.descriptor().offset()].content.empty()) {
+        shouldChunkStream =
+            !streamData.empty() || !streamData.nonNulls().empty();
+      }
+
+      if (shouldChunkStream) {
+        encode(streamData);
+      }
+    };
+
+    if (context_->options.encodingExecutor) {
+      velox::dwio::common::ExecutorBarrier barrier{
+          context_->options.encodingExecutor};
+      for (auto& streamData : context_->streams()) {
+        barrier.add([&] { processStream(*streamData); });
+      }
+      barrier.waitAll();
+    } else {
+      for (auto& streamData : context_->streams()) {
+        processStream(*streamData);
+      }
+    }
+
+    if (lastChunk) {
+      root_->reset();
+    }
+
+    context_->stripeEncodedPhysicalSize += chunkSize;
+    context_->stripeEncodedLogicalSize += logicalSizeBeforeEncoding;
+    context_->memoryUsed -= logicalSizeBeforeEncoding;
   }
 
   // Consider getting this from flush timing.
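To make the chunking condition inside processStream() above easier to follow, here is a self-contained distillation of the same decision. The snapshot struct and free function are illustrative stand-ins introduced for this sketch; they are not part of the writer.

#include <cstdint>

// Stand-in snapshot of one stream's buffered state.
struct StreamSnapshot {
  uint64_t dataBytes{0}; // streamData.data().size()
  uint64_t nonNullCount{0}; // streamData.nonNulls().size()
  bool hasNulls{false}; // streamData.hasNulls()
  bool isNullStream{false}; // WriterStreamContext::isNullStream
  bool hasPriorChunks{false}; // streams_[offset].content is non-empty
};

// Below the raw-size threshold nothing is chunked; on the last chunk the
// threshold drops to zero, and a stream that already has chunks must still
// flush whatever remains.
bool shouldChunkStream(
    const StreamSnapshot& stream,
    bool lastChunk,
    uint64_t minStreamChunkRawSize) {
  const uint64_t minStreamSize = lastChunk ? 0 : minStreamChunkRawSize;
  bool shouldChunk = stream.isNullStream
      ? (stream.hasNulls && stream.nonNullCount > minStreamSize)
      : (stream.dataBytes > minStreamSize);
  if (lastChunk && !shouldChunk && stream.hasPriorChunks) {
    shouldChunk = stream.dataBytes > 0 || stream.nonNullCount > 0;
  }
  return shouldChunk;
}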
@@ -787,10 +893,16 @@ void VeloxWriter::writeChunk(bool lastChunk) {
       1'000'000;
   VLOG(1) << "writeChunk milliseconds: " << flushWallTimeMs
           << ", chunk bytes: " << chunkSize;
+  return wroteChunk.load();
 }
 
 uint32_t VeloxWriter::writeStripe() {
-  writeChunk(true);
+  if (context_->options.enableChunking) {
+    writeChunks(true);
+
+  } else {
+    writeChunk(true);
+  }
 
   uint64_t previousFlushWallTime = context_->stripeFlushTiming.wallNanos;
   uint64_t stripeSize = 0;
@@ -840,37 +952,43 @@ bool VeloxWriter::tryWriteStripe(bool force) {
     return false;
   }
 
+  auto flushPolicy = context_->options.flushPolicyFactory();
+  NIMBLE_DASSERT(flushPolicy != nullptr, "Flush policy must not be null");
+
   auto shouldFlush = [&]() {
-    return context_->flushPolicy->shouldFlush(StripeProgress{
+    return flushPolicy->shouldFlush(StripeProgress{
         .stripeRawSize = context_->memoryUsed,
-        .stripeEncodedSize = context_->stripeSize});
+        .stripeEncodedSize = context_->stripeEncodedPhysicalSize,
+        .stripeEncodedLogicalSize = context_->stripeEncodedLogicalSize});
   };
 
   auto shouldChunk = [&]() {
-    return context_->flushPolicy->shouldChunk(StripeProgress{
+    return flushPolicy->shouldChunk(StripeProgress{
         .stripeRawSize = context_->memoryUsed,
-        .stripeEncodedSize = context_->stripeSize});
+        .stripeEncodedSize = context_->stripeEncodedPhysicalSize,
+        .stripeEncodedLogicalSize = context_->stripeEncodedLogicalSize,
+    });
   };
 
   try {
     // TODO: we can improve merge the last chunk write with stripe
-    if (context_->options.enableChunking &&
-        shouldChunk() == ChunkDecision::Chunk) {
-      writeChunk(false);
+    if (context_->options.enableChunking) {
+      while (shouldChunk() == ChunkDecision::Chunk && writeChunks(false)) {
+      }
     }
 
     auto decision = force ? FlushDecision::Stripe : shouldFlush();
     if (decision != FlushDecision::Stripe) {
       return false;
     }
 
+    uint32_t stripeSize = writeStripe();
     StripeFlushMetrics metrics{
-        .inputSize = context_->stripeSize,
+        .inputSize = context_->stripeEncodedPhysicalSize,
         .rowCount = context_->rowsInStripe,
+        .stripeSize = stripeSize,
         .trackedMemory = context_->memoryUsed,
     };
-
-    metrics.stripeSize = writeStripe();
     context_->logger->logStripeFlush(metrics);
 
     context_->nextStripe();
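The net effect in this hunk: the flush policy is constructed per flush attempt instead of living on the WriterContext, chunking repeats while the policy keeps asking for it and writeChunks() still finds data to encode, and writeStripe() now runs before the metrics struct is assembled. A compilable sketch of that control flow follows, using simplified stand-in types; the real StripeProgress, FlushDecision, and ChunkDecision definitions live in the Nimble sources and may carry more fields.

#include <cstdint>
#include <functional>

// Simplified stand-ins for the types consulted above.
struct StripeProgress {
  uint64_t stripeRawSize{0};
  uint64_t stripeEncodedSize{0};
  uint64_t stripeEncodedLogicalSize{0};
};
enum class FlushDecision { None, Stripe };
enum class ChunkDecision { None, Chunk };

// Sketch of the tryWriteStripe() control flow: keep chunking while the policy
// asks for it and the writer still produced output (writeChunks returns false
// once nothing was written), then flush the stripe if the policy decides so.
bool tryWriteStripeSketch(
    const std::function<StripeProgress()>& progress,
    const std::function<ChunkDecision(const StripeProgress&)>& shouldChunk,
    const std::function<FlushDecision(const StripeProgress&)>& shouldFlush,
    const std::function<bool(bool lastChunk)>& writeChunks,
    const std::function<void()>& writeStripe,
    bool force) {
  while (shouldChunk(progress()) == ChunkDecision::Chunk &&
         writeChunks(/*lastChunk=*/false)) {
  }
  const auto decision = force ? FlushDecision::Stripe : shouldFlush(progress());
  if (decision != FlushDecision::Stripe) {
    return false;
  }
  writeStripe();
  return true;
}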
@@ -890,7 +1008,7 @@ VeloxWriter::RunStats VeloxWriter::getRunStats() const {
   return RunStats{
       .bytesWritten = context_->bytesWritten,
       .stripeCount = folly::to<uint32_t>(context_->getStripeIndex()),
-      .rawSize = context_->rawSize,
+      .rawSize = context_->fileRawSize,
       .rowsPerStripe = context_->rowsPerStripe,
       .flushCpuTimeUsec = context_->totalFlushTiming.cpuNanos / 1000,
       .flushWallTimeUsec = context_->totalFlushTiming.wallNanos / 1000,