Skip to content

Commit df2c18e

Browse files
macvincentmeta-codesync[bot]
authored andcommitted
Support Chunking in StreamData (facebookincubator#248)
Summary: Pull Request resolved: facebookincubator#248 Another feature in the new chunking policy described in this [doc](https://fburl.com/gdoc/gkdwwju1) is the ability to split large streams above a specified limit into smaller chunks. In this diff, we implement a `popChunk` method in each `StreamData` class to handle this functionality. With this feature we are not forced to encode extremely large streams into a single chunk. Integration will happen in the next diff. Differential Revision: D81824143
1 parent ca44263 commit df2c18e

File tree

5 files changed

+1797
-1
lines changed

5 files changed

+1797
-1
lines changed
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "dwio/nimble/velox/StreamChunker.h"
18+
19+
namespace facebook::nimble {
20+
bool omitStream(
21+
const StreamData& streamData,
22+
uint64_t minChunkSize,
23+
bool isNullStream,
24+
bool hasEmptyStreamContent,
25+
bool isLastChunk) {
26+
bool shouldChunkStream;
27+
minChunkSize = isLastChunk ? 0 : minChunkSize;
28+
if (isNullStream) {
29+
// When all values are non-nulls, we omit the entire null stream.
30+
shouldChunkStream =
31+
streamData.hasNulls() && streamData.nonNulls().size() > minChunkSize;
32+
if (!shouldChunkStream && !hasEmptyStreamContent) {
33+
shouldChunkStream = !streamData.empty();
34+
}
35+
} else {
36+
// When all values are null, the values stream is omitted.
37+
shouldChunkStream = streamData.data().size() > minChunkSize;
38+
if (!shouldChunkStream && !hasEmptyStreamContent) {
39+
shouldChunkStream = !streamData.nonNulls().empty();
40+
}
41+
}
42+
43+
return !shouldChunkStream;
44+
}
45+
46+
template <typename T>
47+
std::unique_ptr<StreamChunker> getStreamChunkerTyped(
48+
StreamData& streamData,
49+
uint64_t maxChunkSize,
50+
uint64_t minChunkSize,
51+
bool ensureFullChunks,
52+
bool emptyStreamContent,
53+
bool isNullStream,
54+
bool isLastChunk) {
55+
const auto& streamDataType = streamData.type();
56+
if (streamDataType == ContentStreamData<T>::TYPE_NAME) {
57+
return std::make_unique<ContentStreamChunker<T>>(
58+
static_cast<ContentStreamData<T>&>(streamData),
59+
maxChunkSize,
60+
minChunkSize,
61+
ensureFullChunks,
62+
isLastChunk);
63+
} else if (streamDataType == NullsStreamData::TYPE_NAME) {
64+
return std::make_unique<NullsStreamChunker>(
65+
static_cast<NullsStreamData&>(streamData),
66+
maxChunkSize,
67+
minChunkSize,
68+
ensureFullChunks,
69+
emptyStreamContent,
70+
isNullStream,
71+
isLastChunk);
72+
} else if (streamDataType == NullableContentStreamData<T>::TYPE_NAME) {
73+
return std::make_unique<NullableContentStreamChunker<T>>(
74+
static_cast<NullableContentStreamData<T>&>(streamData),
75+
maxChunkSize,
76+
minChunkSize,
77+
ensureFullChunks,
78+
emptyStreamContent,
79+
isNullStream,
80+
isLastChunk);
81+
}
82+
NIMBLE_UNREACHABLE(
83+
fmt::format("Unsupported streamData type {}", streamDataType))
84+
}
85+
86+
std::unique_ptr<StreamChunker> getStreamChunker(
87+
StreamData& streamData,
88+
uint64_t maxChunkSize,
89+
uint64_t minChunkSize,
90+
bool ensureFullChunks,
91+
bool emptyStreamContent,
92+
bool isNullStream,
93+
bool isLastChunk) {
94+
const auto scalarKind = streamData.descriptor().scalarKind();
95+
switch (scalarKind) {
96+
#define HANDLE_SCALAR_KIND(kind, type) \
97+
case ScalarKind::kind: \
98+
return getStreamChunkerTyped<type>( \
99+
streamData, \
100+
maxChunkSize, \
101+
minChunkSize, \
102+
ensureFullChunks, \
103+
emptyStreamContent, \
104+
isNullStream, \
105+
isLastChunk);
106+
HANDLE_SCALAR_KIND(Bool, bool);
107+
HANDLE_SCALAR_KIND(Int8, int8_t);
108+
HANDLE_SCALAR_KIND(Int16, int16_t);
109+
HANDLE_SCALAR_KIND(Int32, int32_t);
110+
HANDLE_SCALAR_KIND(UInt32, uint32_t);
111+
HANDLE_SCALAR_KIND(Int64, int64_t);
112+
HANDLE_SCALAR_KIND(Float, float);
113+
HANDLE_SCALAR_KIND(Double, double);
114+
HANDLE_SCALAR_KIND(String, std::string_view);
115+
HANDLE_SCALAR_KIND(Binary, std::string_view);
116+
default:
117+
NIMBLE_UNREACHABLE(
118+
fmt::format("Unsupported scalar kind {}", toString(scalarKind)));
119+
}
120+
}
121+
} // namespace facebook::nimble

0 commit comments

Comments
 (0)