@@ -156,6 +156,69 @@ std::string_view encode(
   }
 }
 
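+// Encodes a single typed stream chunk: reinterprets the stream's raw bytes as
+// T and encodes them, either replaying a captured encoding layout (when one is
+// provided) or selecting an encoding through the configured policy factory.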
+template <typename T>
+std::string_view encode(
+    std::optional<EncodingLayout> encodingLayout,
+    detail::WriterContext& context,
+    Buffer& buffer,
+    const StreamDataView& streamData) {
+  NIMBLE_DASSERT(
+      streamData.data().size() % sizeof(T) == 0,
+      fmt::format("Unexpected size {}", streamData.data().size()));
+  std::span<const T> data{
+      reinterpret_cast<const T*>(streamData.data().data()),
+      streamData.data().size() / sizeof(T)};
+
+  std::unique_ptr<EncodingSelectionPolicy<T>> policy;
+  if (encodingLayout.has_value()) {
+    policy = std::make_unique<ReplayedEncodingSelectionPolicy<T>>(
+        encodingLayout.value(),
+        context.options.compressionOptions,
+        context.options.encodingSelectionPolicyFactory);
+  } else {
+    policy = std::unique_ptr<EncodingSelectionPolicy<T>>(
+        static_cast<EncodingSelectionPolicy<T>*>(
+            context.options
+                .encodingSelectionPolicyFactory(TypeTraits<T>::dataType)
+                .release()));
+  }
+
+  if (streamData.hasNulls()) {
+    std::span<const bool> notNulls = streamData.nonNulls();
+    return EncodingFactory::encodeNullable(
+        std::move(policy), data, notNulls, buffer);
+  } else {
+    return EncodingFactory::encode(std::move(policy), data, buffer);
+  }
+}
+
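+// Encodes a stream chunk, replaying the captured encoding layout attached to
+// the stream's writer context when available. If the captured encoding is
+// incompatible with the data, retries with regular encoding selection.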
+template <typename T>
+std::string_view encodeStreamTyped(
+    detail::WriterContext& context,
+    Buffer& buffer,
+    const StreamDataView& streamData) {
+  const auto* streamContext =
+      streamData.descriptor().context<WriterStreamContext>();
+
+  std::optional<EncodingLayout> encodingLayout;
+  if (streamContext && streamContext->encoding) {
+    encodingLayout.emplace(*streamContext->encoding);
+  }
+
+  try {
+    return encode<T>(encodingLayout, context, buffer, streamData);
+  } catch (const NimbleUserError& e) {
+    if (e.errorCode() != error_code::IncompatibleEncoding ||
+        !encodingLayout.has_value()) {
+      throw;
+    }
+
+    // Incompatible captured encoding. Try again without a captured encoding.
+    return encode<T>(std::nullopt, context, buffer, streamData);
+  }
+}
+
 template <typename T>
 std::string_view encodeStreamTyped(
     detail::WriterContext& context,
@@ -213,6 +276,37 @@ std::string_view encodeStream(
   }
 }
 
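+// Dispatches to the typed chunk encoder based on the stream descriptor's
+// scalar kind.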
+std::string_view encodeStream(
+    detail::WriterContext& context,
+    Buffer& buffer,
+    const StreamDataView& streamData) {
+  auto scalarKind = streamData.descriptor().scalarKind();
+  switch (scalarKind) {
+    case ScalarKind::Bool:
+      return encodeStreamTyped<bool>(context, buffer, streamData);
+    case ScalarKind::Int8:
+      return encodeStreamTyped<int8_t>(context, buffer, streamData);
+    case ScalarKind::Int16:
+      return encodeStreamTyped<int16_t>(context, buffer, streamData);
+    case ScalarKind::Int32:
+      return encodeStreamTyped<int32_t>(context, buffer, streamData);
+    case ScalarKind::UInt32:
+      return encodeStreamTyped<uint32_t>(context, buffer, streamData);
+    case ScalarKind::Int64:
+      return encodeStreamTyped<int64_t>(context, buffer, streamData);
+    case ScalarKind::Float:
+      return encodeStreamTyped<float>(context, buffer, streamData);
+    case ScalarKind::Double:
+      return encodeStreamTyped<double>(context, buffer, streamData);
+    case ScalarKind::String:
+    case ScalarKind::Binary:
+      return encodeStreamTyped<std::string_view>(context, buffer, streamData);
+    default:
+      NIMBLE_UNREACHABLE(
+          fmt::format("Unsupported scalar kind {}", toString(scalarKind)));
+  }
+}
+
 template <typename Set>
 void findNodeIds(
     const velox::dwio::common::TypeWithId& typeWithId,
@@ -643,6 +737,7 @@ void VeloxWriter::flush() {
 
 bool VeloxWriter::writeChunk(
     bool lastChunk,
+    bool encodeBelowMax,
     const std::unordered_set<uint32_t>& streamIndices) {
   uint64_t previousFlushWallTime = context_->stripeFlushTiming.wallNanos;
   std::atomic<uint64_t> chunkSize = 0;
@@ -713,20 +808,48 @@ bool VeloxWriter::writeChunk(
 
   auto encode = [&](StreamData& streamData, uint64_t& streamSize) {
     const auto offset = streamData.descriptor().offset();
-    auto encoded = encodeStream(*context_, *encodingBuffer_, streamData);
-    if (!encoded.empty()) {
-      ChunkedStreamWriter chunkWriter{*encodingBuffer_};
-      NIMBLE_DASSERT(offset < streams_.size(), "Stream offset out of range.");
-      auto& stream = streams_[offset];
-      for (auto& buffer : chunkWriter.encode(encoded)) {
-        streamSize += buffer.size();
-        chunkSize += buffer.size();
-        stream.content.push_back(std::move(buffer));
+    uint64_t streamSizeBeforeEncoding = streamData.memoryUsed();
+    auto writeEncoded = [&](std::string_view encoded) {
+      if (!encoded.empty()) {
+        ChunkedStreamWriter chunkWriter{*encodingBuffer_};
+        NIMBLE_DASSERT(
+            offset < streams_.size(), "Stream offset out of range.");
+        auto& stream = streams_[offset];
+        for (auto& buffer : chunkWriter.encode(encoded)) {
+          streamSize += buffer.size();
+          chunkSize += buffer.size();
+          stream.content.push_back(std::move(buffer));
+        }
+      }
+    };
+
+    // Encode large streams as smaller chunks.
+    if (context_->options.enableChunking) {
+      while (auto chunkedStream = streamData.nextChunk(
+                 context_->options.maxStreamChunkRawSize)) {
+        auto encoded =
+            encodeStream(*context_, *encodingBuffer_, *chunkedStream);
+        writeEncoded(encoded);
+      }
+
+      // Encode small streams.
+      if ((lastChunk || encodeBelowMax) && streamData.memoryUsed()) {
+        if (auto lastStreamDataChunk = streamData.lastChunk()) {
+          auto encoded =
+              encodeStream(*context_, *encodingBuffer_, *lastStreamDataChunk);
+          writeEncoded(encoded);
+        }
+        streamData.reset();
       }
+    } else {
+      auto encoded = encodeStream(*context_, *encodingBuffer_, streamData);
+      writeEncoded(encoded);
+      streamData.reset();
     }
+
+    logicalSizeBeforeEncoding +=
+        streamSizeBeforeEncoding - streamData.memoryUsed();
     wroteChunk = true;
-    logicalSizeBeforeEncoding += streamData.memoryUsed();
-    streamData.reset();
   };
 
   auto processStream = [&](StreamData& streamData,
@@ -894,38 +1017,66 @@ bool VeloxWriter::tryWriteStripe(bool force) {
   // TODO: we can improve merge the last chunk write with stripe
   if (context_->options.enableChunking &&
       shouldChunk() == ChunkDecision::Chunk) {
+    ChunkDecision decision = ChunkDecision::Chunk;
     const auto& streams = context_->streams();
-    // Sort streams for chunking based on raw memory usage.
-    std::vector<uint32_t> streamIndices(streams.size());
-    std::iota(streamIndices.begin(), streamIndices.end(), 0);
-    std::sort(
-        streamIndices.begin(),
-        streamIndices.end(),
-        [&](const uint32_t& a, const uint32_t& b) {
-          return streams[a]->memoryUsed() > streams[b]->memoryUsed();
-        });
+    auto batchChunkStreams = [&](const std::vector<uint32_t>& indices,
+                                 bool encodeBelowMax) {
+      uint32_t currentIndex = 0;
+      NIMBLE_DASSERT(
+          context_->options.chunkedStreamBatchSize > 0,
+          "chunkedStreamBatchSize must be greater than 0");
+      while (currentIndex < indices.size() &&
+             decision == ChunkDecision::Chunk) {
+        uint32_t endStreamIndex = std::min(
+            static_cast<uint32_t>(indices.size()),
+            currentIndex + context_->options.chunkedStreamBatchSize);
+        std::unordered_set<uint32_t> streamIndicesToChunk(
+            indices.begin() + currentIndex, indices.begin() + endStreamIndex);
+        currentIndex = endStreamIndex;
+        // Stop attempting chunking once streams are too small to chunk.
+        if (!writeChunk(
+                /*lastChunk=*/false,
+                /*encodeBelowMax=*/encodeBelowMax,
+                streamIndicesToChunk)) {
+          decision = ChunkDecision::None;
+          break;
+        }
+        decision = shouldChunk();
+      }
+    };
+
+    // Chunk streams above maxStreamChunkRawSize to relieve memory pressure.
+    auto chunkStreamsAboveMaxSize = [&]() {
+      std::vector<uint32_t> streamsAboveMaxChunkSize;
+      for (auto streamIndex = 0; streamIndex < streams.size();
+           streamIndex++) {
+        if (streams[streamIndex]->memoryUsed() >=
+            context_->options.maxStreamChunkRawSize) {
+          streamsAboveMaxChunkSize.push_back(streamIndex);
+        }
+      }
+      batchChunkStreams(streamsAboveMaxChunkSize, /*encodeBelowMax=*/false);
+    };
 
-    // Chunk streams in batches.
-    uint32_t currentIndex = 0;
-    ChunkDecision decision = ChunkDecision::Chunk;
-    NIMBLE_DASSERT(
-        context_->options.chunkedStreamBatchSize > 0,
-        "streamEncodingBatchSize must be greater than 0");
-    while (currentIndex < streams.size() &&
-           decision == ChunkDecision::Chunk) {
-      uint32_t endStreamIndex = std::min(
-          static_cast<uint32_t>(streams.size()),
-          currentIndex + context_->options.chunkedStreamBatchSize);
-      std::unordered_set<uint32_t> streamIndicesToChunk(
-          streamIndices.begin() + currentIndex,
-          streamIndices.begin() + endStreamIndex);
-      currentIndex = endStreamIndex;
-      // Stop attempting chunking once streams are too small to chunk.
-      if (!writeChunk(false, streamIndicesToChunk)) {
-        break;
+    // Relieve memory pressure by chunking small streams.
+    auto chunkSmallStreams = [&]() {
+      if (decision != ChunkDecision::Chunk) {
+        return;
       }
-      decision = shouldChunk();
-    }
+      // Sort streams for chunking based on raw memory usage.
+      std::vector<uint32_t> streamIndices(streams.size());
+      std::iota(streamIndices.begin(), streamIndices.end(), 0);
+      std::sort(
+          streamIndices.begin(),
+          streamIndices.end(),
+          [&](const uint32_t& a, const uint32_t& b) {
+            return streams[a]->memoryUsed() > streams[b]->memoryUsed();
+          });
+      batchChunkStreams(streamIndices, /*encodeBelowMax=*/true);
+    };
+
+    chunkStreamsAboveMaxSize();
+    chunkSmallStreams();
   }
 
   auto decision = force ? FlushDecision::Stripe : shouldFlush();