1616
1717#pragma once
1818
19+ #include < memory>
1920#include < span>
2021#include < string_view>
2122
2223#include " dwio/nimble/common/Vector.h"
2324#include " dwio/nimble/velox/SchemaBuilder.h"
24- #include " velox/common/memory/Memory.h"
2525
2626namespace facebook ::nimble {
2727
@@ -46,6 +46,9 @@ class StreamData {
4646 virtual void reset () = 0;
4747 virtual void materialize () {}
4848
49+ // Break the stream data into chunks, each respecting the maximum size limit.
50+ virtual std::unique_ptr<StreamData> popChunk (uint64_t maxChunkSize) = 0;
51+
4952 const StreamDescriptorBuilder& descriptor () const {
5053 return descriptor_;
5154 }
@@ -64,13 +67,35 @@ class ContentStreamData final : public StreamData {
6467 ContentStreamData (
6568 velox::memory::MemoryPool& memoryPool,
6669 const StreamDescriptorBuilder& descriptor)
67- : StreamData(descriptor), data_{&memoryPool}, extraMemory_{0 } {}
70+ : StreamData(descriptor),
71+ data_{&memoryPool},
72+ extraMemory_{0 },
73+ memoryPool_{memoryPool} {}
6874
6975 inline virtual std::string_view data () const override {
7076 return {
7177 reinterpret_cast <const char *>(data_.data ()), data_.size () * sizeof (T)};
7278 }
7379
80+ inline virtual std::unique_ptr<StreamData> popChunk (
81+ uint64_t maxChunkSize) override {
82+ size_t chunkSize = maxChunkSize / sizeof (T);
83+ if (chunkSize > data_.size ()) {
84+ return nullptr ;
85+ }
86+
87+ Vector<T> dataToBeReturned (
88+ &memoryPool_, data_.begin (), data_.begin () + chunkSize);
89+ Vector<T> dataToBePreserved (
90+ &memoryPool_, data_.begin () + chunkSize, data_.end ());
91+ auto chunk =
92+ std::make_unique<ContentStreamData<T>>(memoryPool_, descriptor ());
93+ chunk->mutableData () = std::move (dataToBeReturned);
94+ chunk->extraMemory_ = extraMemory_;
95+ data_ = std::move (dataToBePreserved);
96+ return chunk;
97+ }
98+
7499 inline virtual std::span<const bool > nonNulls () const override {
75100 return {};
76101 }
@@ -103,6 +128,7 @@ class ContentStreamData final : public StreamData {
103128 private:
104129 Vector<T> data_;
105130 uint64_t extraMemory_;
131+ velox::memory::MemoryPool& memoryPool_;
106132};
107133
108134// Nulls only data stream.
@@ -119,12 +145,38 @@ class NullsStreamData : public StreamData {
119145 : StreamData(descriptor),
120146 nonNulls_{&memoryPool},
121147 hasNulls_{false },
122- bufferedCount_{0 } {}
148+ bufferedCount_{0 },
149+ memoryPool_{memoryPool} {}
123150
124151 inline virtual std::string_view data () const override {
125152 return {};
126153 }
127154
155+ inline virtual std::unique_ptr<StreamData> popChunk (
156+ uint64_t maxChunkSize) override {
157+ uint64_t chunkSize = maxChunkSize / sizeof (bool );
158+ if (chunkSize > bufferedCount_) {
159+ return nullptr ;
160+ }
161+ materialize ();
162+
163+ Vector<bool > nonNullsToBeReturned (
164+ &memoryPool_, nonNulls_.begin (), nonNulls_.begin () + chunkSize);
165+ Vector<bool > nonNullsToBePreserved (
166+ &memoryPool_, nonNulls_.begin () + chunkSize, nonNulls_.end ());
167+
168+ auto chunk = std::make_unique<NullsStreamData>(memoryPool_, descriptor ());
169+ chunk->mutableNonNulls () = std::move (nonNullsToBeReturned);
170+ chunk->bufferedCount_ = static_cast <uint32_t >(chunkSize);
171+
172+ nonNulls_ = std::move (nonNullsToBePreserved);
173+ bufferedCount_ -= chunkSize;
174+
175+ // Assume hasNulls_ is equal for both chunks to avoid costly iteration.
176+ chunk->hasNulls_ = hasNulls_;
177+ return chunk;
178+ }
179+
128180 inline virtual std::span<const bool > nonNulls () const override {
129181 return nonNulls_;
130182 }
@@ -166,6 +218,7 @@ class NullsStreamData : public StreamData {
166218 Vector<bool > nonNulls_;
167219 bool hasNulls_;
168220 uint32_t bufferedCount_;
221+ velox::memory::MemoryPool& memoryPool_;
169222};
170223
171224// Nullable content data stream.
@@ -185,6 +238,47 @@ class NullableContentStreamData final : public NullsStreamData {
185238 reinterpret_cast <const char *>(data_.data ()), data_.size () * sizeof (T)};
186239 }
187240
241+ inline virtual std::unique_ptr<StreamData> popChunk (
242+ uint64_t maxChunkSize) override {
243+ uint64_t chunkSize = maxChunkSize / (sizeof (T) + sizeof (bool ));
244+ if (chunkSize > data_.size ()) {
245+ return nullptr ;
246+ }
247+
248+ auto chunk = std::make_unique<NullableContentStreamData<T>>(
249+ memoryPool_, descriptor ());
250+
251+ // Chunk Content
252+ {
253+ Vector<T> dataToBeReturned (
254+ &memoryPool_, data_.begin (), data_.begin () + chunkSize);
255+ Vector<T> dataToBePreserved (
256+ &memoryPool_, data_.begin () + chunkSize, data_.end ());
257+ chunk->mutableData () = std::move (dataToBeReturned);
258+ chunk->extraMemory_ = extraMemory_;
259+ data_ = std::move (dataToBePreserved);
260+ }
261+
262+ // Chunk Nulls
263+ {
264+ NullsStreamData::materialize ();
265+ Vector<bool > nonNullsToBeReturned (
266+ &memoryPool_, nonNulls_.begin (), nonNulls_.begin () + chunkSize);
267+ Vector<bool > nonNullsToBePreserved (
268+ &memoryPool_, nonNulls_.begin () + chunkSize, nonNulls_.end ());
269+
270+ chunk->mutableNonNulls () = std::move (nonNullsToBeReturned);
271+ chunk->bufferedCount_ = static_cast <uint32_t >(chunkSize);
272+
273+ nonNulls_ = std::move (nonNullsToBePreserved);
274+ bufferedCount_ -= chunkSize;
275+
276+ // Assume hasNulls_ is equal for both chunks to avoid costly iteration.
277+ chunk->hasNulls_ = hasNulls_;
278+ }
279+ return chunk;
280+ }
281+
188282 inline virtual bool empty () const override {
189283 return NullsStreamData::empty () && data_.empty ();
190284 }
@@ -213,4 +307,96 @@ class NullableContentStreamData final : public NullsStreamData {
213307 uint64_t extraMemory_;
214308};
215309
310+ // Template specialization for std::string_view to handle extra memory properly
311+ template <>
312+ inline std::unique_ptr<StreamData>
313+ ContentStreamData<std::string_view>::popChunk(uint64_t maxChunkSize) {
314+ if (memoryUsed () < maxChunkSize) {
315+ return nullptr ;
316+ }
317+
318+ size_t chunkSize = 0 ;
319+ uint64_t rollingChunkSize = 0 ;
320+ uint64_t rollingExtraMemory = 0 ;
321+ for (size_t i = 0 ; i < data_.size (); ++i) {
322+ const auto & str = data_[i];
323+ uint64_t strSize = str.size () + sizeof (std::string_view);
324+ if (rollingChunkSize + strSize > maxChunkSize) {
325+ break ;
326+ }
327+ rollingExtraMemory += str.size ();
328+ rollingChunkSize += strSize;
329+ chunkSize = i + 1 ;
330+ }
331+
332+ Vector<std::string_view> dataToBeReturned (
333+ &memoryPool_, data_.begin (), data_.begin () + chunkSize);
334+ Vector<std::string_view> dataToBePreserved (
335+ &memoryPool_, data_.begin () + chunkSize, data_.end ());
336+ auto chunk = std::make_unique<ContentStreamData<std::string_view>>(
337+ memoryPool_, descriptor ());
338+ chunk->mutableData () = std::move (dataToBeReturned);
339+ data_ = std::move (dataToBePreserved);
340+
341+ chunk->extraMemory_ = rollingExtraMemory;
342+ extraMemory_ -= rollingExtraMemory;
343+ return chunk;
344+ }
345+
346+ template <>
347+ inline std::unique_ptr<StreamData>
348+ NullableContentStreamData<std::string_view>::popChunk(uint64_t maxChunkSize) {
349+ if (memoryUsed () < maxChunkSize) {
350+ return nullptr ;
351+ }
352+
353+ size_t chunkSize = 0 ;
354+ uint64_t rollingChunkSize = 0 ;
355+ uint64_t rollingExtraMemory = 0 ;
356+ for (size_t i = 0 ; i < data_.size (); ++i) {
357+ const auto & str = data_[i];
358+ uint64_t strSize = str.size () + sizeof (std::string_view) + sizeof (bool );
359+ if (rollingChunkSize + strSize > maxChunkSize) {
360+ break ;
361+ }
362+ rollingExtraMemory += str.size ();
363+ rollingChunkSize += strSize;
364+ chunkSize = i + 1 ;
365+ }
366+
367+ auto chunk = std::make_unique<NullableContentStreamData<std::string_view>>(
368+ memoryPool_, descriptor ());
369+ // Chunk content
370+ {
371+ Vector<std::string_view> dataToBeReturned (
372+ &memoryPool_, data_.begin (), data_.begin () + chunkSize);
373+ Vector<std::string_view> dataToBePreserved (
374+ &memoryPool_, data_.begin () + chunkSize, data_.end ());
375+ chunk->mutableData () = std::move (dataToBeReturned);
376+ data_ = std::move (dataToBePreserved);
377+
378+ chunk->extraMemory_ = rollingExtraMemory;
379+ extraMemory_ -= rollingExtraMemory;
380+ }
381+
382+ // Chunk Nulls
383+ {
384+ NullsStreamData::materialize ();
385+ Vector<bool > nonNullsToBeReturned (
386+ &memoryPool_, nonNulls_.begin (), nonNulls_.begin () + chunkSize);
387+ Vector<bool > nonNullsToBePreserved (
388+ &memoryPool_, nonNulls_.begin () + chunkSize, nonNulls_.end ());
389+
390+ chunk->mutableNonNulls () = std::move (nonNullsToBeReturned);
391+ chunk->bufferedCount_ = static_cast <uint32_t >(chunkSize);
392+
393+ nonNulls_ = std::move (nonNullsToBePreserved);
394+ bufferedCount_ -= chunkSize;
395+
396+ // Assume hasNulls_ is equal for both chunks to avoid costly iteration.
397+ chunk->hasNulls_ = hasNulls_;
398+ }
399+ return chunk;
400+ }
401+
216402} // namespace facebook::nimble
0 commit comments