diff --git a/dwio/nimble/common/MetricsLogger.h b/dwio/nimble/common/MetricsLogger.h
index 63efd935..0e53dce7 100644
--- a/dwio/nimble/common/MetricsLogger.h
+++ b/dwio/nimble/common/MetricsLogger.h
@@ -74,6 +74,7 @@ struct FileCloseMetrics {
 };
 
 enum class LogOperation {
+  StripeWrite,
   StripeLoad,
   StripeFlush,
   FileClose,
diff --git a/dwio/nimble/velox/CMakeLists.txt b/dwio/nimble/velox/CMakeLists.txt
index cc2870c1..ec86bfd1 100644
--- a/dwio/nimble/velox/CMakeLists.txt
+++ b/dwio/nimble/velox/CMakeLists.txt
@@ -153,6 +153,7 @@ add_library(
   VeloxWriter.cpp
   ChunkedStreamWriter.cpp
   VeloxWriterDefaultMetadataOSS.cpp
+  StreamChunker.cpp
 )
 
 target_link_libraries(
   nimble_velox_writer
diff --git a/dwio/nimble/velox/FlushPolicy.cpp b/dwio/nimble/velox/FlushPolicy.cpp
index 84e3d628..2caf682e 100644
--- a/dwio/nimble/velox/FlushPolicy.cpp
+++ b/dwio/nimble/velox/FlushPolicy.cpp
@@ -16,15 +16,35 @@
 #include "dwio/nimble/velox/FlushPolicy.h"
 
 namespace facebook::nimble {
-
-FlushDecision RawStripeSizeFlushPolicy::shouldFlush(
-    const StripeProgress& stripeProgress) {
-  return stripeProgress.rawStripeSize >= rawStripeSize_ ? FlushDecision::Stripe
-                                                        : FlushDecision::None;
+// Relieve memory pressure with chunking.
+bool ChunkFlushPolicy::shouldChunk(const StripeProgress& stripeProgress) {
+  const uint64_t inMemoryBytes =
+      stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize;
+  const auto writerMemoryThreshold = !lastChunkDecision_
+      ? config_.writerMemoryHighThresholdBytes
+      : config_.writerMemoryLowThresholdBytes;
+  lastChunkDecision_ = inMemoryBytes > writerMemoryThreshold;
+  return lastChunkDecision_;
 }
 
-void RawStripeSizeFlushPolicy::onClose() {
-  // No-op
+// Optimize for the expected storage stripe size.
+bool ChunkFlushPolicy::shouldFlush(const StripeProgress& stripeProgress) {
+  // When chunking is unable to relieve memory pressure, we flush the stripe.
+  if (stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize >
+      config_.writerMemoryHighThresholdBytes) {
+    return true;
+  }
+
+  double compressionFactor = config_.estimatedCompressionFactor;
+  // Use the historical compression ratio as a heuristic when available.
+  if (stripeProgress.stripeEncodedSize > 0) {
+    compressionFactor =
+        static_cast<double>(stripeProgress.stripeEncodedLogicalSize) /
+        stripeProgress.stripeEncodedSize;
+  }
+  const double expectedEncodedStripeSize = stripeProgress.stripeEncodedSize +
+      stripeProgress.stripeRawSize / std::max(compressionFactor, 1.0);
+  return expectedEncodedStripeSize >= config_.targetStripeSizeBytes;
 }
 } // namespace facebook::nimble
diff --git a/dwio/nimble/velox/FlushPolicy.h b/dwio/nimble/velox/FlushPolicy.h
index 1ad86671..241801ef 100644
--- a/dwio/nimble/velox/FlushPolicy.h
+++ b/dwio/nimble/velox/FlushPolicy.h
@@ -20,59 +20,89 @@
 
 namespace facebook::nimble {
 
+// TODO: Set default values for these parameters based on DISCO experiments.
+// Use arbitrary values for now.
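+//
+// Illustrative wiring sketch (not part of this change; assumes the
+// flushPolicyFactory hook from VeloxWriterOptions.h below, and the values
+// shown are arbitrary, not tuned defaults):
+//
+//   VeloxWriterOptions options;
+//   options.flushPolicyFactory = [] {
+//     return std::make_unique<ChunkFlushPolicy>(ChunkFlushPolicyConfig{
+//         .writerMemoryHighThresholdBytes = 200 * 1024L * 1024L,
+//         .writerMemoryLowThresholdBytes = 100 * 1024L * 1024L,
+//         .targetStripeSizeBytes = 100 * 1024L * 1024L,
+//         .estimatedCompressionFactor = 1.3});
+//   };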
+struct ChunkFlushPolicyConfig {
+  // Threshold to trigger chunking to relieve memory pressure.
+  const uint64_t writerMemoryHighThresholdBytes{200 * 1024L * 1024L};
+  // Threshold below which chunking stops and stripe size optimization resumes.
+  const uint64_t writerMemoryLowThresholdBytes{100 * 1024L * 1024L};
+  // Target size for encoded stripes.
+  const uint64_t targetStripeSizeBytes{100 * 1024L * 1024L};
+  // Expected ratio of raw to encoded data.
+  const double estimatedCompressionFactor{1.3};
+};
+
 struct StripeProgress {
   // Size of the stripe data when it's fully decompressed and decoded
-  const uint64_t rawStripeSize;
+  const uint64_t stripeRawSize;
   // Size of the stripe after buffered data is encoded and optionally compressed
-  const uint64_t stripeSize;
-  // Size of the allocated buffer in the writer
-  const uint64_t bufferSize;
-};
-
-enum class FlushDecision : uint8_t {
-  None = 0,
-  Stripe = 1,
-  Chunk = 2,
+  const uint64_t stripeEncodedSize;
+  // Logical size of the stripe data that has been encoded so far
+  const uint64_t stripeEncodedLogicalSize;
 };
 
 class FlushPolicy {
  public:
   virtual ~FlushPolicy() = default;
-  virtual FlushDecision shouldFlush(const StripeProgress& stripeProgress) = 0;
-  // Required for memory pressure coordination for now. Will remove in the
-  // future.
-  virtual void onClose() = 0;
+  virtual bool shouldFlush(const StripeProgress& stripeProgress) = 0;
+  virtual bool shouldChunk(const StripeProgress& stripeProgress) = 0;
 };
 
-class RawStripeSizeFlushPolicy final : public FlushPolicy {
+class StripeRawSizeFlushPolicy final : public FlushPolicy {
  public:
-  explicit RawStripeSizeFlushPolicy(uint64_t rawStripeSize)
-      : rawStripeSize_{rawStripeSize} {}
+  explicit StripeRawSizeFlushPolicy(uint64_t stripeRawSize)
+      : stripeRawSize_{stripeRawSize} {}
 
-  FlushDecision shouldFlush(const StripeProgress& stripeProgress) override;
+  bool shouldFlush(const StripeProgress& stripeProgress) override {
+    return stripeProgress.stripeRawSize >= stripeRawSize_;
+  }
 
-  void onClose() override;
+  bool shouldChunk(const StripeProgress&) override {
+    return false;
+  }
 
  private:
-  const uint64_t rawStripeSize_;
+  const uint64_t stripeRawSize_;
 };
 
 class LambdaFlushPolicy : public FlushPolicy {
  public:
   explicit LambdaFlushPolicy(
-      std::function<FlushDecision(const StripeProgress&)> lambda)
-      : lambda_{lambda} {}
+      std::function<bool(const StripeProgress&)> flushLambda =
+          [](const StripeProgress&) { return false; },
+      std::function<bool(const StripeProgress&)> chunkLambda =
+          [](const StripeProgress&) { return false; })
+      : flushLambda_{std::move(flushLambda)},
+        chunkLambda_{std::move(chunkLambda)} {}
 
-  FlushDecision shouldFlush(const StripeProgress& stripeProgress) override {
-    return lambda_(stripeProgress);
+  bool shouldFlush(const StripeProgress& stripeProgress) override {
+    return flushLambda_(stripeProgress);
   }
 
-  void onClose() override {
-    // No-op
+  bool shouldChunk(const StripeProgress& stripeProgress) override {
+    return chunkLambda_(stripeProgress);
   }
 
  private:
-  std::function<FlushDecision(const StripeProgress&)> lambda_;
+  std::function<bool(const StripeProgress&)> flushLambda_;
+  std::function<bool(const StripeProgress&)> chunkLambda_;
+};
+
+class ChunkFlushPolicy : public FlushPolicy {
+ public:
+  explicit ChunkFlushPolicy(ChunkFlushPolicyConfig config)
+      : config_{std::move(config)}, lastChunkDecision_{false} {}
+
+  // Optimize for the expected storage stripe size.
+  bool shouldFlush(const StripeProgress& stripeProgress) override;
+
+  // Relieve memory pressure with chunking.
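+  // Uses hysteresis (see FlushPolicy.cpp): once this returns true, it keeps
+  // returning true until in-memory bytes (raw + encoded) fall below
+  // writerMemoryLowThresholdBytes; otherwise it only triggers once they
+  // exceed writerMemoryHighThresholdBytes.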
+  bool shouldChunk(const StripeProgress& stripeProgress) override;
+
+ private:
+  const ChunkFlushPolicyConfig config_;
+  bool lastChunkDecision_;
+};
 } // namespace facebook::nimble
diff --git a/dwio/nimble/velox/StreamChunker.cpp b/dwio/nimble/velox/StreamChunker.cpp
new file mode 100644
index 00000000..4caf6d3e
--- /dev/null
+++ b/dwio/nimble/velox/StreamChunker.cpp
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "dwio/nimble/velox/StreamChunker.h"
+
+namespace facebook::nimble {
+template <typename T>
+std::unique_ptr<StreamChunker> getStreamChunkerTyped(
+    StreamData& streamData,
+    const StreamChunkerOptions& options) {
+  if (auto* contentStreamData =
+          dynamic_cast<ContentStreamData<T>*>(&streamData)) {
+    return std::make_unique<ContentStreamChunker<T>>(
+        *contentStreamData, options);
+  } else if (
+      auto* nullableContentStreamData =
+          dynamic_cast<NullableContentStreamData<T>*>(&streamData)) {
+    return std::make_unique<NullableContentStreamChunker<T>>(
+        *nullableContentStreamData, options);
+  } else if (
+      auto* nullsStreamData = dynamic_cast<NullsStreamData*>(&streamData)) {
+    return std::make_unique<NullsStreamChunker>(*nullsStreamData, options);
+  }
+  NIMBLE_UNREACHABLE("Unsupported StreamData type");
+}
+
+std::unique_ptr<StreamChunker> getStreamChunker(
+    StreamData& streamData,
+    const StreamChunkerOptions& options) {
+  const auto scalarKind = streamData.descriptor().scalarKind();
+  switch (scalarKind) {
+#define HANDLE_SCALAR_KIND(kind, type) \
+  case ScalarKind::kind:               \
+    return getStreamChunkerTyped<type>(streamData, options);
+    HANDLE_SCALAR_KIND(Bool, bool);
+    HANDLE_SCALAR_KIND(Int8, int8_t);
+    HANDLE_SCALAR_KIND(Int16, int16_t);
+    HANDLE_SCALAR_KIND(Int32, int32_t);
+    HANDLE_SCALAR_KIND(UInt32, uint32_t);
+    HANDLE_SCALAR_KIND(Int64, int64_t);
+    HANDLE_SCALAR_KIND(Float, float);
+    HANDLE_SCALAR_KIND(Double, double);
+    HANDLE_SCALAR_KIND(String, std::string_view);
+    HANDLE_SCALAR_KIND(Binary, std::string_view);
+#undef HANDLE_SCALAR_KIND
+    default:
+      NIMBLE_UNREACHABLE(
+          fmt::format("Unsupported scalar kind {}", toString(scalarKind)));
+  }
+}
+} // namespace facebook::nimble
diff --git a/dwio/nimble/velox/StreamChunker.h b/dwio/nimble/velox/StreamChunker.h
new file mode 100644
index 00000000..7e24b9cc
--- /dev/null
+++ b/dwio/nimble/velox/StreamChunker.h
@@ -0,0 +1,538 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "dwio/nimble/velox/StreamData.h"
+
+namespace facebook::nimble {
+namespace detail {
+// When we create new buffers, it's likely that these buffers will end up
+// growing to the size that previously triggered chunking. As a tradeoff
+// between minimizing reallocations (for performance) and actually releasing
+// memory (to relieve memory pressure), we heuristically set the new buffer
+// size to be maxChunkSize_ plus 50% of the difference between the current
+// buffer capacity and maxChunkSize_. This ensures the buffer is neither too
+// small (causing frequent reallocations) nor too large (wasting memory).
+template <typename T>
+uint64_t getNewBufferCapacity(
+    uint64_t maxChunkSize,
+    uint64_t currentCapacityCount,
+    uint64_t requiredCapacityCount) {
+  const auto maxChunkElementCount = maxChunkSize / sizeof(T);
+  if (currentCapacityCount < maxChunkElementCount) {
+    return currentCapacityCount;
+  }
+  NIMBLE_DASSERT(
+      requiredCapacityCount <= currentCapacityCount,
+      "Required capacity should not exceed current capacity");
+  return maxChunkElementCount +
+      (currentCapacityCount - maxChunkElementCount) * 0.5;
+}
+
+inline bool shouldOmitDataStream(
+    const StreamData& streamData,
+    uint64_t minChunkSize,
+    bool isEmptyStreamContent,
+    bool isLastChunk) {
+  minChunkSize = isLastChunk ? 0 : minChunkSize;
+  // When all values are null, the values stream is omitted.
+  if (streamData.data().size() > minChunkSize) {
+    return false;
+  }
+  return isEmptyStreamContent || streamData.nonNulls().empty();
+}
+
+inline bool shouldOmitNullStream(
+    const StreamData& streamData,
+    uint64_t minChunkSize,
+    bool isEmptyStreamContent,
+    bool isLastChunk) {
+  minChunkSize = isLastChunk ? 0 : minChunkSize;
+  // When all values are non-null, we omit the entire nulls stream.
+  if (streamData.hasNulls() && streamData.nonNulls().size() > minChunkSize) {
+    return false;
+  }
+  return isEmptyStreamContent || streamData.empty();
+}
+} // namespace detail
+
+/**
+ * Options for configuring StreamChunker behavior.
+ */
+struct StreamChunkerOptions {
+  uint64_t minChunkSize;
+  uint64_t maxChunkSize;
+  bool ensureFullChunks;
+  bool isEmptyStreamContent;
+  bool isLastChunk;
+};
+
+/**
+ * Breaks streams into manageable chunks based on size thresholds.
+ * The `compact()` method should be called after use to reclaim memory from
+ * processed StreamData values.
+ */
+class StreamChunker {
+ public:
+  StreamChunker() = default;
+
+  // Returns the next chunk of stream data.
+  virtual std::optional<StreamDataView> next() = 0;
+
+  // Erases processed data to reclaim memory.
+  virtual void compact() = 0;
+
+  virtual ~StreamChunker() = default;
+
+ protected:
+  // Helper struct to hold the result of the nextChunkSize() helper methods.
+  struct ChunkSize {
+    size_t dataElementCount = 0;
+    size_t nullElementCount = 0;
+    size_t rollingChunkSize = 0;
+    uint64_t extraMemory = 0;
+  };
+};
+
+std::unique_ptr<StreamChunker> getStreamChunker(
+    StreamData& streamData,
+    const StreamChunkerOptions& options);
+
+template <typename T>
+class ContentStreamChunker final : public StreamChunker {
+ public:
+  explicit ContentStreamChunker(
+      ContentStreamData<T>& streamData,
+      const StreamChunkerOptions& options)
+      : streamData_{streamData},
+        minChunkSize_{options.isLastChunk ? 0 : options.minChunkSize},
+        maxChunkSize_{options.maxChunkSize},
+        dataElementOffset_{0},
+        extraMemory_{streamData_.extraMemory()},
+        ensureFullChunks_{
+            options.isLastChunk ? false : options.ensureFullChunks} {}
+
+  std::optional<StreamDataView> next() override {
+    const auto& chunkSize = nextChunkSize();
+    if (chunkSize.rollingChunkSize == 0) {
+      return std::nullopt;
+    }
+
+    std::string_view dataChunk = {
+        reinterpret_cast<const char*>(
+            streamData_.mutableData().data() + dataElementOffset_),
+        chunkSize.dataElementCount * sizeof(T)};
+    dataElementOffset_ += chunkSize.dataElementCount;
+    extraMemory_ -= chunkSize.extraMemory;
+
+    return StreamDataView{
+        streamData_.descriptor(),
+        dataChunk,
+        /*nonNulls=*/std::span<const bool>{},
+        /*hasNulls=*/false};
+  }
+
+ private:
+  ChunkSize nextChunkSize() {
+    const size_t maxChunkValuesCount = maxChunkSize_ / sizeof(T);
+    const size_t remainingValuesCount =
+        streamData_.mutableData().size() - dataElementOffset_;
+    if ((ensureFullChunks_ && remainingValuesCount < maxChunkValuesCount) ||
+        (remainingValuesCount < minChunkSize_ / sizeof(T))) {
+      return ChunkSize{};
+    }
+    const size_t chunkValuesCount =
+        std::min(maxChunkValuesCount, remainingValuesCount);
+    return ChunkSize{
+        .dataElementCount = chunkValuesCount,
+        .rollingChunkSize = chunkValuesCount * sizeof(T)};
+  }
+
+  void compact() override {
+    auto& currentData = streamData_.mutableData();
+    const uint64_t remainingDataCount =
+        currentData.size() - dataElementOffset_;
+    const auto newDataCapacity = detail::getNewBufferCapacity<T>(
+        maxChunkSize_, currentData.capacity(), remainingDataCount);
+
+    // Move and clear the existing buffer.
+    auto tempData = std::move(currentData);
+    streamData_.reset();
+    NIMBLE_DASSERT(
+        streamData_.empty(), "StreamData should be empty after reset");
+
+    auto& mutableData = streamData_.mutableData();
+    mutableData.reserve(newDataCapacity);
+    NIMBLE_DASSERT(
+        mutableData.capacity() >= newDataCapacity,
+        "Data buffer capacity should be at least the new capacity");
+
+    mutableData.resize(remainingDataCount);
+    NIMBLE_DASSERT(
+        mutableData.size() == remainingDataCount,
+        "Data buffer size should equal the remaining data count");
+
+    std::copy_n(
+        tempData.begin() + dataElementOffset_,
+        remainingDataCount,
+        mutableData.begin());
+    dataElementOffset_ = 0;
+    streamData_.extraMemory() = extraMemory_;
+  }
+
+  ContentStreamData<T>& streamData_;
+  uint64_t minChunkSize_;
+  uint64_t maxChunkSize_;
+  size_t dataElementOffset_;
+  size_t extraMemory_;
+  bool ensureFullChunks_;
+};
+
+template <>
+inline StreamChunker::ChunkSize
+ContentStreamChunker<std::string_view>::nextChunkSize() {
+  const auto& data = streamData_.mutableData();
+  size_t stringCount = 0;
+  uint64_t rollingChunkSize = 0;
+  uint64_t rollingExtraMemory = 0;
+  for (size_t i = dataElementOffset_; i < data.size(); ++i) {
+    const auto& str = data[i];
+    const uint64_t strSize = str.size() + sizeof(std::string_view);
+    if (rollingChunkSize + strSize > maxChunkSize_) {
+      break;
+    }
+
+    rollingExtraMemory += str.size();
+    rollingChunkSize += strSize;
+    ++stringCount;
+  }
+
+  if ((ensureFullChunks_ && rollingChunkSize < maxChunkSize_) ||
+      (rollingChunkSize < minChunkSize_)) {
+    return ChunkSize{};
+  }
+
+  return ChunkSize{
+      .dataElementCount = stringCount,
+      .rollingChunkSize = rollingChunkSize,
+      .extraMemory = rollingExtraMemory};
+}
+
+class NullsStreamChunker final : public StreamChunker {
+ public:
+  explicit NullsStreamChunker(
+      NullsStreamData& streamData,
+      const StreamChunkerOptions& options)
+      : streamData_{streamData},
+        minChunkSize_{options.isLastChunk ? 0 : options.minChunkSize},
+        maxChunkSize_{options.maxChunkSize},
+        nonNullsOffset_{0},
+        omitStream_{detail::shouldOmitNullStream(
+            streamData,
+            options.minChunkSize,
+            options.isEmptyStreamContent,
+            options.isLastChunk)},
+        ensureFullChunks_{
+            options.isLastChunk ? false : options.ensureFullChunks} {
+    static_assert(sizeof(bool) == 1);
+    streamData.materialize();
+  }
+
+  std::optional<StreamDataView> next() override {
+    if (omitStream_) {
+      return std::nullopt;
+    }
+
+    auto& nonNulls = streamData_.mutableNonNulls();
+    size_t remainingNonNulls = nonNulls.size() - nonNullsOffset_;
+    size_t nonNullsInChunk = std::min(maxChunkSize_, remainingNonNulls);
+    if (nonNullsInChunk == 0 || nonNullsInChunk < minChunkSize_ ||
+        (ensureFullChunks_ && nonNullsInChunk < maxChunkSize_)) {
+      return std::nullopt;
+    }
+
+    // Null stream values are converted to boolean data for encoding.
+    std::string_view dataChunk = {
+        reinterpret_cast<const char*>(nonNulls.data() + nonNullsOffset_),
+        nonNullsInChunk};
+    nonNullsOffset_ += nonNullsInChunk;
+
+    return StreamDataView{
+        streamData_.descriptor(),
+        dataChunk,
+        /*nonNulls=*/std::span<const bool>{},
+        /*hasNulls=*/false};
+  }
+
+ private:
+  void compact() override {
+    if (omitStream_) {
+      return;
+    }
+    auto& nonNulls = streamData_.mutableNonNulls();
+    const auto remainingNonNullsCount = nonNulls.size() - nonNullsOffset_;
+    const auto newCapacity = detail::getNewBufferCapacity<bool>(
+        maxChunkSize_, nonNulls.capacity(), remainingNonNullsCount);
+
+    // Move and clear the existing buffer.
+    auto tempNonNulls = std::move(nonNulls);
+    const bool hasNulls = streamData_.hasNulls();
+    streamData_.reset();
+    NIMBLE_DASSERT(
+        streamData_.empty(), "StreamData should be empty after reset");
+
+    auto& mutableNonNulls = streamData_.mutableNonNulls();
+    mutableNonNulls.reserve(newCapacity);
+    NIMBLE_DASSERT(
+        mutableNonNulls.capacity() >= newCapacity,
+        "NonNulls buffer capacity should be at least the new capacity");
+
+    streamData_.ensureAdditionalNullsCapacity(
+        hasNulls, static_cast<uint64_t>(remainingNonNullsCount));
+    if (hasNulls) {
+      mutableNonNulls.resize(remainingNonNullsCount);
+      NIMBLE_DASSERT(
+          mutableNonNulls.size() == remainingNonNullsCount,
+          "NonNulls buffer size should equal the remaining non-nulls count");
+
+      std::copy_n(
+          tempNonNulls.begin() + nonNullsOffset_,
+          remainingNonNullsCount,
+          mutableNonNulls.begin());
+    }
+    nonNullsOffset_ = 0;
+  }
+
+  NullsStreamData& streamData_;
+  uint64_t minChunkSize_;
+  uint64_t maxChunkSize_;
+  size_t nonNullsOffset_;
+  bool omitStream_;
+  bool ensureFullChunks_;
+};
+
+template <typename T>
+class NullableContentStreamChunker final : public StreamChunker {
+ public:
+  explicit NullableContentStreamChunker(
+      NullableContentStreamData<T>& streamData,
+      const StreamChunkerOptions& options)
+      : streamData_{streamData},
+        minChunkSize_{options.isLastChunk ? 0 : options.minChunkSize},
+        maxChunkSize_{options.maxChunkSize},
+        dataElementOffset_{0},
+        nonNullsOffset_{0},
+        extraMemory_{streamData_.extraMemory()},
+        omitStream_{detail::shouldOmitDataStream(
+            streamData,
+            options.minChunkSize,
+            options.isEmptyStreamContent,
+            options.isLastChunk)},
+        ensureFullChunks_{
+            options.isLastChunk ? false : options.ensureFullChunks} {
+    static_assert(sizeof(bool) == 1);
+    streamData.materialize();
+  }
+
+  std::optional<StreamDataView> next() override {
+    if (omitStream_) {
+      return std::nullopt;
+    }
+    const auto& chunkSize = nextChunkSize();
+    if (chunkSize.rollingChunkSize == 0) {
+      return std::nullopt;
+    }
+
+    // Chunk content
+    std::string_view dataChunk = {
+        reinterpret_cast<const char*>(
+            streamData_.mutableData().data() + dataElementOffset_),
+        chunkSize.dataElementCount * sizeof(T)};
+
+    // Chunk nulls
+    std::span<const bool> nonNullsChunk(
+        streamData_.mutableNonNulls().data() + nonNullsOffset_,
+        chunkSize.nullElementCount);
+
+    dataElementOffset_ += chunkSize.dataElementCount;
+    nonNullsOffset_ += chunkSize.nullElementCount;
+    extraMemory_ -= chunkSize.extraMemory;
+
+    return StreamDataView{
+        streamData_.descriptor(),
+        dataChunk,
+        nonNullsChunk,
+        chunkSize.nullElementCount > 0 &&
+            chunkSize.nullElementCount > chunkSize.dataElementCount};
+  }
+
+ private:
+  ChunkSize nextChunkSize() {
+    const auto& nonNulls = streamData_.mutableNonNulls();
+    const auto bufferedCount = nonNulls.size();
+
+    size_t dataSize = 0;
+    size_t nullSize = 0;
+    uint64_t rollingChunkSize = 0;
+
+    // Calculate how many entries fit in the chunk.
+    for (size_t i = nonNullsOffset_; i < bufferedCount; ++i) {
+      uint64_t entrySize = sizeof(bool); // Always account for null indicator
+      if (nonNulls[i]) {
+        entrySize += sizeof(T); // Add data size if non-null
+      }
+
+      if (rollingChunkSize + entrySize > maxChunkSize_) {
+        break;
+      }
+
+      if (nonNulls[i]) {
+        dataSize += 1;
+      }
+      nullSize += 1;
+      rollingChunkSize += entrySize;
+    }
+
+    if ((ensureFullChunks_ && rollingChunkSize < maxChunkSize_) ||
+        (rollingChunkSize < minChunkSize_)) {
+      return ChunkSize{};
+    }
+
+    return ChunkSize{
+        .dataElementCount = dataSize,
+        .nullElementCount = nullSize,
+        .rollingChunkSize = rollingChunkSize,
+        .extraMemory = 0};
+  }
+
+  void compact() override {
+    if (omitStream_) {
+      return;
+    }
+    auto& currentNonNulls = streamData_.mutableNonNulls();
+    auto& currentData = streamData_.mutableData();
+    const auto remainingNonNullsCount =
+        currentNonNulls.size() - nonNullsOffset_;
+    const auto remainingDataCount = currentData.size() - dataElementOffset_;
+
+    const auto newNonNullsCapacity = detail::getNewBufferCapacity<bool>(
+        maxChunkSize_, currentNonNulls.capacity(), remainingNonNullsCount);
+    const auto newDataCapacity = detail::getNewBufferCapacity<T>(
+        maxChunkSize_, currentData.capacity(), remainingDataCount);
+
+    // Move and clear the existing buffers.
+    auto tempNonNulls = std::move(currentNonNulls);
+    auto tempData = std::move(currentData);
+    const bool hasNulls = streamData_.hasNulls();
+    streamData_.reset();
+    NIMBLE_DASSERT(
+        streamData_.empty(), "StreamData should be empty after reset");
+
+    auto& mutableData = streamData_.mutableData();
+    mutableData.reserve(newDataCapacity);
+    NIMBLE_DASSERT(
+        mutableData.capacity() >= newDataCapacity,
+        "Data buffer capacity should be at least the new capacity");
+
+    mutableData.resize(remainingDataCount);
+    NIMBLE_DASSERT(
+        mutableData.size() == remainingDataCount,
+        "Data buffer size should equal the remaining data count");
+
+    std::copy_n(
+        tempData.begin() + dataElementOffset_,
+        remainingDataCount,
+        mutableData.begin());
+
+    auto& mutableNonNulls = streamData_.mutableNonNulls();
+    mutableNonNulls.reserve(newNonNullsCapacity);
+    NIMBLE_DASSERT(
+        mutableNonNulls.capacity() >= newNonNullsCapacity,
+        "NonNulls buffer capacity should be at least the new capacity");
+
+    streamData_.ensureAdditionalNullsCapacity(
+        hasNulls, static_cast<uint64_t>(remainingNonNullsCount));
+    if (hasNulls) {
+      mutableNonNulls.resize(remainingNonNullsCount);
+      NIMBLE_DASSERT(
+          mutableNonNulls.size() == remainingNonNullsCount,
+          "NonNulls buffer size should equal the remaining non-nulls count");
+
+      std::copy_n(
+          tempNonNulls.begin() + nonNullsOffset_,
+          remainingNonNullsCount,
+          mutableNonNulls.begin());
+    }
+    streamData_.extraMemory() = extraMemory_;
+    dataElementOffset_ = 0;
+    nonNullsOffset_ = 0;
+  }
+
+  NullableContentStreamData<T>& streamData_;
+  uint64_t minChunkSize_;
+  uint64_t maxChunkSize_;
+  size_t dataElementOffset_;
+  size_t nonNullsOffset_;
+  size_t extraMemory_;
+  bool omitStream_;
+  bool ensureFullChunks_;
+};
+
+template <>
+inline StreamChunker::ChunkSize
+NullableContentStreamChunker<std::string_view>::nextChunkSize() {
+  const auto& data = streamData_.mutableData();
+  const auto& nonNulls = streamData_.mutableNonNulls();
+  const auto bufferedCount = nonNulls.size();
+
+  size_t dataSize = 0;
+  size_t nullSize = 0;
+  uint64_t rollingChunkSize = 0;
+  uint64_t extraMemory = 0;
+
+  // Calculate how many entries fit in the chunk.
+  for (size_t i = nonNullsOffset_; i < bufferedCount; ++i) {
+    uint64_t strSize = 0;
+    if (nonNulls[i]) {
+      const auto& str = data[dataElementOffset_ + dataSize];
+      strSize = str.size() + sizeof(std::string_view);
+    }
+
+    if (rollingChunkSize + sizeof(bool) + strSize > maxChunkSize_) {
+      break;
+    }
+
+    rollingChunkSize += sizeof(bool) + strSize;
+    nullSize += 1;
+
+    if (nonNulls[i]) {
+      extraMemory += data[dataElementOffset_ + dataSize].size();
+      dataSize += 1;
+    }
+  }
+
+  if ((ensureFullChunks_ && rollingChunkSize < maxChunkSize_) ||
+      (rollingChunkSize < minChunkSize_)) {
+    return ChunkSize{};
+  }
+
+  return ChunkSize{
+      .dataElementCount = dataSize,
+      .nullElementCount = nullSize,
+      .rollingChunkSize = rollingChunkSize,
+      .extraMemory = extraMemory};
+}
+} // namespace facebook::nimble
diff --git a/dwio/nimble/velox/StreamData.cpp b/dwio/nimble/velox/StreamData.cpp
index ac35d29d..a1cd6626 100644
--- a/dwio/nimble/velox/StreamData.cpp
+++ b/dwio/nimble/velox/StreamData.cpp
@@ -21,6 +21,10 @@ namespace facebook::nimble {
 void NullsStreamData::ensureAdditionalNullsCapacity(
     bool mayHaveNulls,
     uint64_t additionalSize) {
+  if (additionalSize == 0) {
+    return;
+  }
+
   if (mayHaveNulls || hasNulls_) {
     const auto newSize = bufferedCount_ + additionalSize;
     ensureDataCapacity(nonNulls_, newSize);
diff --git a/dwio/nimble/velox/StreamData.h b/dwio/nimble/velox/StreamData.h
index 8e103d24..05f0a307 100644
--- a/dwio/nimble/velox/StreamData.h
+++ b/dwio/nimble/velox/StreamData.h
@@ -77,6 +77,66 @@ class MutableStreamData : public StreamData {
   const InputBufferGrowthPolicy& growthPolicy_;
 };
 
+// Represents a view into a chunk of stream data.
+// Provides a lightweight, non-owning view into a portion of stream data,
+// containing references to the data content and null indicators for efficient
+// processing of large streams without copying data.
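+//
+// Illustrative flow (a sketch only; assumes the StreamChunker API declared in
+// StreamChunker.h, which produces these views):
+//
+//   auto chunker = getStreamChunker(streamData, options);
+//   while (auto view = chunker->next()) {
+//     encodeStream(context, buffer, *view); // view borrows streamData buffers
+//   }
+//   chunker->compact(); // release the data consumed by next()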
+class StreamDataView final : public StreamData {
+ public:
+  StreamDataView(
+      const StreamDescriptorBuilder& descriptor,
+      std::string_view data,
+      std::span<const bool> nonNulls,
+      bool hasNulls)
+      : StreamData(descriptor),
+        data_{data},
+        nonNulls_{nonNulls},
+        hasNulls_{hasNulls} {}
+
+  StreamDataView(StreamDataView&& other) noexcept
+      : StreamData(other.descriptor()),
+        data_{other.data_},
+        nonNulls_{other.nonNulls_},
+        hasNulls_{other.hasNulls_} {}
+
+  StreamDataView(const StreamDataView&) = delete;
+
+  StreamDataView& operator=(const StreamDataView&) = delete;
+
+  StreamDataView& operator=(StreamDataView&& other) noexcept = delete;
+
+  ~StreamDataView() override = default;
+
+  std::string_view data() const override {
+    return data_;
+  }
+
+  std::span<const bool> nonNulls() const override {
+    return nonNulls_;
+  }
+
+  bool hasNulls() const override {
+    return hasNulls_;
+  }
+
+  uint64_t memoryUsed() const override {
+    NIMBLE_UNREACHABLE("StreamDataView is non-owning");
+  }
+
+  bool empty() const override {
+    return data_.empty() && nonNulls_.empty();
+  }
+
+  void reset() override {
+    NIMBLE_UNREACHABLE("StreamDataView is non-owning");
+  }
+
+ private:
+  const std::string_view data_;
+  const std::span<const bool> nonNulls_;
+  bool hasNulls_;
+};
+
 // Content only data stream.
 // Used when a stream doesn't contain nulls.
 template <typename T>
diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp
index 82f87a2c..4fb55dab 100644
--- a/dwio/nimble/velox/VeloxWriter.cpp
+++ b/dwio/nimble/velox/VeloxWriter.cpp
@@ -33,7 +33,7 @@
 #include "dwio/nimble/velox/SchemaSerialization.h"
 #include "dwio/nimble/velox/SchemaTypes.h"
 #include "dwio/nimble/velox/StatsGenerated.h"
-#include "folly/ScopeGuard.h"
+#include "dwio/nimble/velox/StreamChunker.h"
 #include "velox/common/time/CpuWallTimer.h"
 #include "velox/dwio/common/ExecutorBarrier.h"
 #include "velox/type/Type.h"
@@ -45,7 +45,6 @@ namespace detail {
 class WriterContext : public FieldWriterContext {
  public:
   const VeloxWriterOptions options;
-  std::unique_ptr<FlushPolicy> flushPolicy;
   velox::CpuWallTiming totalFlushTiming;
   velox::CpuWallTiming stripeFlushTiming;
   velox::CpuWallTiming encodingSelectionTiming;
@@ -56,9 +55,12 @@ class WriterContext : public FieldWriterContext {
   uint64_t memoryUsed{0};
   uint64_t bytesWritten{0};
   uint64_t rowsInFile{0};
-  uint64_t rowsInStripe{0};
-  uint64_t stripeSize{0};
-  uint64_t rawSize{0};
+  uint32_t rowsInStripe{0};
+  // Physical size of the encoded stripe data.
+  uint64_t stripeEncodedPhysicalSize{0};
+  // Logical size of the encoded stripe data.
+  uint64_t stripeEncodedLogicalSize{0};
+  uint64_t fileRawSize{0};
   std::vector<uint64_t> rowsPerStripe;
 
   WriterContext(
@@ -67,7 +69,6 @@ class WriterContext : public FieldWriterContext {
       : FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
         options{std::move(options)},
         logger{this->options.metricsLogger} {
-    flushPolicy = this->options.flushPolicyFactory();
     inputBufferGrowthPolicy = this->options.lowMemoryMode
        ? std::make_unique<ExactGrowthPolicy>()
        : this->options.inputGrowthPolicyFactory();
@@ -82,7 +83,8 @@ class WriterContext : public FieldWriterContext {
     rowsPerStripe.push_back(rowsInStripe);
     memoryUsed = 0;
     rowsInStripe = 0;
-    stripeSize = 0;
+    stripeEncodedPhysicalSize = 0;
+    stripeEncodedLogicalSize = 0;
     ++stripeIndex_;
   }
 
@@ -100,6 +102,45 @@ namespace {
 
 constexpr uint32_t kInitialSchemaSectionSize = 1 << 20; // 1MB
 
+// When writing null streams, we write the nulls as data, and the stream itself
+// is non-nullable. This adapter class is how we expose the nulls as values.
+class NullsAsDataStreamData : public StreamData {
+ public:
+  explicit NullsAsDataStreamData(StreamData& streamData)
+      : StreamData(streamData.descriptor()), streamData_{streamData} {
+    streamData_.materialize();
+  }
+
+  inline virtual std::string_view data() const override {
+    return {
+        reinterpret_cast<const char*>(streamData_.nonNulls().data()),
+        streamData_.nonNulls().size()};
+  }
+
+  inline virtual std::span<const bool> nonNulls() const override {
+    return {};
+  }
+
+  inline virtual bool hasNulls() const override {
+    return false;
+  }
+
+  inline virtual bool empty() const override {
+    return streamData_.empty();
+  }
+
+  inline virtual uint64_t memoryUsed() const override {
+    return streamData_.memoryUsed();
+  }
+
+  inline virtual void reset() override {
+    streamData_.reset();
+  }
+
+ private:
+  StreamData& streamData_;
+};
+
 class WriterStreamContext : public StreamContext {
  public:
   bool isNullStream = false;
@@ -133,7 +174,7 @@ std::string_view encode(
   std::unique_ptr<EncodingSelectionPolicy<T>> policy;
   if (encodingLayout.has_value()) {
     policy = std::make_unique<ReplayedEncodingSelectionPolicy<T>>(
-        encodingLayout.value(),
+        std::move(encodingLayout.value()),
         context.options.compressionOptions,
         context.options.encodingSelectionPolicyFactory);
@@ -168,7 +209,7 @@ std::string_view encodeStreamTyped(
   }
 
   try {
-    return encode<T>(encodingLayout, context, buffer, streamData);
+    return encode<T>(std::move(encodingLayout), context, buffer, streamData);
   } catch (const NimbleUserError& e) {
     if (e.errorCode() != error_code::IncompatibleEncoding ||
         !encodingLayout.has_value()) {
@@ -215,7 +256,8 @@ template <typename Set>
 void findNodeIds(
     const velox::dwio::common::TypeWithId& typeWithId,
     Set& output,
-    std::function<bool(const velox::dwio::common::TypeWithId&)> predicate) {
+    const std::function<bool(const velox::dwio::common::TypeWithId&)>&
+        predicate) {
   if (predicate(typeWithId)) {
     output.insert(typeWithId.id());
   }
@@ -516,7 +558,7 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
     auto rawSize = nimble::getRawSizeFromVector(
         vector, velox::common::Ranges::of(0, size));
     DWIO_ENSURE_GE(rawSize, 0, "Invalid raw size");
-    context_->rawSize += rawSize;
+    context_->fileRawSize += rawSize;
 
     if (context_->options.writeExecutor) {
       velox::dwio::common::ExecutorBarrier barrier{
@@ -537,9 +579,16 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
     context_->rowsInStripe += size;
     context_->bytesWritten = file_->size();
 
-    return tryWriteStripe();
+    return evaluateFlushPolicy();
+  } catch (const std::exception& e) {
+    lastException_ = std::current_exception();
+    context_->logger->logException(LogOperation::StripeWrite, e.what());
+    throw;
   } catch (...) {
     lastException_ = std::current_exception();
+    context_->logger->logException(
+        LogOperation::StripeWrite,
+        folly::to<std::string>(folly::exceptionStr(std::current_exception())));
     throw;
   }
 }
@@ -551,8 +600,6 @@ void VeloxWriter::close() {
 
   if (file_) {
     try {
-      auto exitGuard =
-          folly::makeGuard([this]() { context_->flushPolicy->onClose(); });
       flush();
       root_->close();
@@ -583,7 +630,8 @@ void VeloxWriter::close() {
           *context_->schemaBuilder.getRoot(), context_->columnStats);
       // TODO(T228118622): Write column stats to file.
       flatbuffers::FlatBufferBuilder builder;
-      builder.Finish(serialization::CreateStats(builder, context_->rawSize));
+      builder.Finish(
+          serialization::CreateStats(builder, context_->fileRawSize));
       writer_.writeOptionalSection(
           std::string(kStatsSection),
          {reinterpret_cast<const char*>(builder.GetBufferPointer()),
@@ -634,9 +682,16 @@ void VeloxWriter::flush() {
   }
 
   try {
-    tryWriteStripe(true);
+    writeStripe();
+  } catch (const std::exception& e) {
+    lastException_ = std::current_exception();
+    context_->logger->logException(LogOperation::StripeFlush, e.what());
+    throw;
   } catch (...) {
     lastException_ = std::current_exception();
+    context_->logger->logException(
+        LogOperation::StripeFlush,
+        folly::to<std::string>(folly::exceptionStr(std::current_exception())));
     throw;
   }
 }
@@ -653,45 +708,6 @@ void VeloxWriter::writeChunk(bool lastChunk) {
   }
   streams_.resize(context_->schemaBuilder.nodeCount());
 
-  // When writing null streams, we write the nulls as data, and the stream
-  // itself is non-nullable. This adapter class is how we expose the nulls as
-  // values.
-  class NullsAsDataStreamData : public StreamData {
-   public:
-    explicit NullsAsDataStreamData(StreamData& streamData)
-        : StreamData(streamData.descriptor()), streamData_{streamData} {
-      streamData_.materialize();
-    }
-
-    inline virtual std::string_view data() const override {
-      return {
-          reinterpret_cast<const char*>(streamData_.nonNulls().data()),
-          streamData_.nonNulls().size()};
-    }
-
-    inline virtual std::span<const bool> nonNulls() const override {
-      return {};
-    }
-
-    inline virtual bool hasNulls() const override {
-      return false;
-    }
-
-    inline virtual bool empty() const override {
-      return streamData_.empty();
-    }
-
-    inline virtual uint64_t memoryUsed() const override {
-      return streamData_.memoryUsed();
-    }
-
-    inline virtual void reset() override {
-      streamData_.reset();
-    }
-
-   private:
-    StreamData& streamData_;
-  };
-
   auto encode = [&](StreamData& streamData, uint64_t& streamSize) {
     const auto offset = streamData.descriptor().offset();
     auto encoded = encodeStream(*context_, *encodingBuffer_, streamData);
@@ -780,8 +796,87 @@ void VeloxWriter::writeChunk(bool lastChunk) {
     if (lastChunk) {
       root_->reset();
     }
+  }
+
+  // Consider getting this from flush timing.
+  auto flushWallTimeMs =
+      (context_->stripeFlushTiming.wallNanos - previousFlushWallTime) /
+      1'000'000;
+  VLOG(1) << "writeChunk milliseconds: " << flushWallTimeMs
+          << ", chunk bytes: " << chunkSize;
+}
+
+bool VeloxWriter::writeChunks(
+    std::span<const uint32_t> streamIndices,
+    bool ensureFullChunks,
+    bool lastChunk) {
+  uint64_t previousFlushWallTime = context_->stripeFlushTiming.wallNanos;
+  std::atomic<uint64_t> chunkSize = 0;
+  std::atomic<uint64_t> logicalSizeBeforeEncoding = 0;
+  std::atomic<bool> wroteChunk = false;
+  {
+    LoggingScope scope{*context_->logger};
+    velox::CpuWallTimer veloxTimer{context_->stripeFlushTiming};
+
+    if (!encodingBuffer_) {
+      encodingBuffer_ = std::make_unique<Buffer>(*encodingMemoryPool_);
+    }
+    streams_.resize(context_->schemaBuilder.nodeCount());
+
+    auto processStream = [&](StreamData& streamData) {
+      const auto& offset = streamData.descriptor().offset();
+      auto& streamSize = context_->columnStats[offset].physicalSize;
+      logicalSizeBeforeEncoding += streamData.memoryUsed();
+      auto& streamContent = streams_[offset].content;
+      auto chunker = getStreamChunker(
+          streamData,
+          StreamChunkerOptions{
+              .minChunkSize = context_->options.minStreamChunkRawSize,
+              .maxChunkSize = context_->options.maxStreamChunkRawSize,
+              .ensureFullChunks = ensureFullChunks,
+              .isEmptyStreamContent = streamContent.empty(),
+              .isLastChunk = lastChunk});
+      while (auto streamDataView = chunker->next()) {
+        std::string_view encoded =
+            encodeStream(*context_, *encodingBuffer_, *streamDataView);
+        if (!encoded.empty()) {
+          ChunkedStreamWriter chunkWriter{*encodingBuffer_};
+          for (auto& buffer : chunkWriter.encode(encoded)) {
+            streamSize += buffer.size();
+            chunkSize += buffer.size();
+            streamContent.push_back(std::move(buffer));
+          }
+        }
+        wroteChunk = true;
+      }
+      // compact() erases processed stream data to reclaim memory.
+      chunker->compact();
+      logicalSizeBeforeEncoding -= streamData.memoryUsed();
+    };
+
+    const auto& streams = context_->streams();
+    if (context_->options.encodingExecutor) {
+      velox::dwio::common::ExecutorBarrier barrier{
+          context_->options.encodingExecutor};
+      for (auto streamIndex : streamIndices) {
+        auto& streamData = streams[streamIndex];
+        barrier.add([&] { processStream(*streamData); });
+      }
+      barrier.waitAll();
+    } else {
+      for (auto streamIndex : streamIndices) {
+        auto& streamData = streams[streamIndex];
+        processStream(*streamData);
+      }
+    }
+
+    if (lastChunk) {
+      root_->reset();
+    }
 
-  context_->stripeSize += chunkSize;
+    context_->stripeEncodedPhysicalSize += chunkSize;
+    context_->stripeEncodedLogicalSize += logicalSizeBeforeEncoding;
+    context_->memoryUsed -= logicalSizeBeforeEncoding;
   }
 
   // Consider getting this from flush timing.
@@ -790,10 +885,22 @@ void VeloxWriter::writeChunk(bool lastChunk) {
       1'000'000;
   VLOG(1) << "writeChunk milliseconds: " << flushWallTimeMs
           << ", chunk bytes: " << chunkSize;
+  return wroteChunk;
 }
 
-uint32_t VeloxWriter::writeStripe() {
-  writeChunk(true);
+bool VeloxWriter::writeStripe() {
+  if (context_->rowsInStripe == 0) {
+    return false;
+  }
+
+  if (context_->options.enableChunking) {
+    // Chunk all streams.
+    std::vector<uint32_t> streamIndices(context_->streams().size());
+    std::iota(streamIndices.begin(), streamIndices.end(), 0);
+    writeChunks(streamIndices, /*ensureFullChunks=*/false, /*lastChunk=*/true);
+  } else {
+    writeChunk(true);
+  }
 
   uint64_t previousFlushWallTime = context_->stripeFlushTiming.wallNanos;
   uint64_t stripeSize = 0;
@@ -835,66 +942,95 @@ uint32_t VeloxWriter::writeStripe() {
   VLOG(1) << "writeStripe milliseconds: " << flushWallTimeMs
           << ", on disk stripe bytes: " << stripeSize;
 
-  return static_cast<uint32_t>(stripeSize);
+  StripeFlushMetrics metrics{
+      .inputSize = context_->stripeEncodedPhysicalSize,
+      .rowCount = context_->rowsInStripe,
+      .stripeSize = stripeSize,
+      .trackedMemory = context_->memoryUsed,
+  };
+  context_->logger->logStripeFlush(metrics);
+
+  context_->nextStripe();
+  return true;
 }
 
-bool VeloxWriter::tryWriteStripe(bool force) {
-  if (context_->rowsInStripe == 0) {
-    return false;
-  }
+bool VeloxWriter::evaluateFlushPolicy() {
+  auto flushPolicy = context_->options.flushPolicyFactory();
+  NIMBLE_DASSERT(flushPolicy != nullptr, "Flush policy must not be null");
 
   auto shouldFlush = [&]() {
-    return context_->flushPolicy->shouldFlush(StripeProgress{
-        .rawStripeSize = context_->memoryUsed,
-        .stripeSize = context_->stripeSize,
-        .bufferSize =
-            static_cast<uint64_t>(context_->bufferMemoryPool->usedBytes()),
+    return flushPolicy->shouldFlush(StripeProgress{
+        .stripeRawSize = context_->memoryUsed,
+        .stripeEncodedSize = context_->stripeEncodedPhysicalSize,
+        .stripeEncodedLogicalSize = context_->stripeEncodedLogicalSize});
+  };
+
+  auto shouldChunk = [&]() {
+    return flushPolicy->shouldChunk(StripeProgress{
+        .stripeRawSize = context_->memoryUsed,
+        .stripeEncodedSize = context_->stripeEncodedPhysicalSize,
+        .stripeEncodedLogicalSize = context_->stripeEncodedLogicalSize,
     });
   };
 
-  auto decision = force ? FlushDecision::Stripe : shouldFlush();
-  if (decision == FlushDecision::None) {
-    return false;
-  }
+  if (context_->options.enableChunking && shouldChunk()) {
+    auto batchChunkStreams = [&](const std::vector<uint32_t>& indices,
+                                 bool ensureFullChunks) {
+      const size_t indicesCount = indices.size();
+      const auto batchSize = context_->options.chunkedStreamBatchSize;
+      for (size_t index = 0; index < indicesCount; index += batchSize) {
+        size_t currentBatchSize = std::min(batchSize, indicesCount - index);
+        std::span<const uint32_t> batchIndices(
+            indices.begin() + index, currentBatchSize);
+        // Stop attempting chunking once streams are too small to chunk or
+        // memory pressure is relieved.
+        if (!writeChunks(batchIndices, ensureFullChunks) || !shouldChunk()) {
+          return false;
+        }
+      }
+      return true;
+    };
 
-  try {
-    // TODO: we can improve merge the last chunk write with stripe
-    if (decision == FlushDecision::Chunk && context_->options.enableChunking) {
-      writeChunk(false);
-      decision = shouldFlush();
+    // Relieve memory pressure by chunking streams above the max chunk size.
+    const auto& streams = context_->streams();
+    std::vector<uint32_t> streamIndices;
+    streamIndices.reserve(streams.size());
+    for (uint32_t streamIndex = 0; streamIndex < streams.size();
+         ++streamIndex) {
+      if (streams[streamIndex]->memoryUsed() >=
+          context_->options.maxStreamChunkRawSize) {
+        streamIndices.push_back(streamIndex);
+      }
     }
-
-    if (decision != FlushDecision::Stripe) {
-      return false;
+    const bool continueChunking =
+        batchChunkStreams(streamIndices, /*ensureFullChunks=*/true);
+    if (continueChunking) {
+      // Relieve memory pressure by chunking small streams.
+      // Sort streams for chunking based on raw memory usage.
+      // TODO(T240072104): Improve performance by bucketing the streams
+      // by size (by most significant bit) instead of sorting them.
+      streamIndices.resize(streams.size());
+      std::iota(streamIndices.begin(), streamIndices.end(), 0);
+      std::sort(
+          streamIndices.begin(),
+          streamIndices.end(),
+          [&](const uint32_t& a, const uint32_t& b) {
+            return streams[a]->memoryUsed() > streams[b]->memoryUsed();
+          });
+      batchChunkStreams(streamIndices, /*ensureFullChunks=*/false);
     }
+  }
 
-    StripeFlushMetrics metrics{
-        .inputSize = context_->stripeSize,
-        .rowCount = context_->rowsInStripe,
-        .trackedMemory = context_->memoryUsed,
-    };
-
-    metrics.stripeSize = writeStripe();
-    context_->logger->logStripeFlush(metrics);
-
-    context_->nextStripe();
-    return true;
-  } catch (const std::exception& e) {
-    context_->logger->logException(LogOperation::StripeFlush, e.what());
-    throw;
-  } catch (...) {
-    context_->logger->logException(
-        LogOperation::StripeFlush,
-        folly::to<std::string>(folly::exceptionStr(std::current_exception())));
-    throw;
+  if (shouldFlush()) {
+    return writeStripe();
   }
+  return false;
 }
 
 VeloxWriter::RunStats VeloxWriter::getRunStats() const {
   return RunStats{
       .bytesWritten = context_->bytesWritten,
       .stripeCount = folly::to<uint32_t>(context_->getStripeIndex()),
-      .rawSize = context_->rawSize,
+      .rawSize = context_->fileRawSize,
       .rowsPerStripe = context_->rowsPerStripe,
       .flushCpuTimeUsec = context_->totalFlushTiming.cpuNanos / 1000,
       .flushWallTimeUsec = context_->totalFlushTiming.wallNanos / 1000,
diff --git a/dwio/nimble/velox/VeloxWriter.h b/dwio/nimble/velox/VeloxWriter.h
index 2d90fa09..1df7191d 100644
--- a/dwio/nimble/velox/VeloxWriter.h
+++ b/dwio/nimble/velox/VeloxWriter.h
@@ -84,10 +84,16 @@ class VeloxWriter {
   std::exception_ptr lastException_;
   const velox::common::SpillConfig* const spillConfig_;
 
+  // Returns 'true' if the stripe was flushed.
+  bool evaluateFlushPolicy();
   // Returning 'true' if stripe was written.
-  bool tryWriteStripe(bool force = false);
+  bool writeStripe();
   void writeChunk(bool lastChunk = true);
-  uint32_t writeStripe();
+  // Returns 'true' if chunks were written.
+  bool writeChunks(
+      std::span<const uint32_t> streamIndices,
+      bool ensureFullChunks = false,
+      bool lastChunk = false);
 };
 
 } // namespace facebook::nimble
diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h
index 4fbc842b..5a6f1b0b 100644
--- a/dwio/nimble/velox/VeloxWriterOptions.h
+++ b/dwio/nimble/velox/VeloxWriterOptions.h
@@ -96,6 +96,15 @@ struct VeloxWriterOptions {
   // Note: this threshold is ignored when it is time to flush a stripe.
   uint64_t minStreamChunkRawSize = 1024;
 
+  // Number of streams to try chunking between memory pressure evaluations.
+  // Note: this is ignored when it is time to flush a stripe.
+  size_t chunkedStreamBatchSize = 1024;
+
+  // When flushing data streams into chunks, streams with raw data size larger
+  // than this threshold will be broken down into multiple smaller chunks. Each
+  // chunk will be at most this size.
+  uint64_t maxStreamChunkRawSize = 4 << 20;
+
   // The factory function that produces the root encoding selection policy.
   // Encoding selection policy is the way to balance the tradeoffs of
   // different performance factors (at both read and write times). Heuristics
@@ -109,7 +118,7 @@ struct VeloxWriterOptions {
   // Provides policy that controls stripe sizes and memory footprint.
   std::function<std::unique_ptr<FlushPolicy>()> flushPolicyFactory = []() {
     // Buffering 256MB data before encoding stripes.
-    return std::make_unique<RawStripeSizeFlushPolicy>(256 << 20);
+    return std::make_unique<StripeRawSizeFlushPolicy>(256 << 20);
   };
 
   // When the writer needs to buffer data, and internal buffers don't have
diff --git a/dwio/nimble/velox/selective/tests/E2EFilterTest.cpp b/dwio/nimble/velox/selective/tests/E2EFilterTest.cpp
index 11adf53d..18756c7e 100644
--- a/dwio/nimble/velox/selective/tests/E2EFilterTest.cpp
+++ b/dwio/nimble/velox/selective/tests/E2EFilterTest.cpp
@@ -128,14 +128,11 @@ class E2EFilterTest : public dwio::common::E2EFilterTestBase {
     writeSchema_ = rowType_;
     VeloxWriterOptions options;
     options.enableChunking = true;
-    options.flushPolicyFactory = [] {
+    auto i = 0;
+    options.flushPolicyFactory = [&] {
       return std::make_unique<LambdaFlushPolicy>(
-          [i = 0](const StripeProgress&) mutable {
-            if (i++ % 3 == 2) {
-              return FlushDecision::Stripe;
-            }
-            return FlushDecision::Chunk;
-          });
+          [&](const StripeProgress&) { return i++ % 3 == 2; },
+          [&](const StripeProgress&) { return i++ % 3 == 2; });
     };
     if (!flatMapColumns_.empty()) {
       setUpFlatMapColumns();
diff --git a/dwio/nimble/velox/selective/tests/SelectiveNimbleReaderTest.cpp b/dwio/nimble/velox/selective/tests/SelectiveNimbleReaderTest.cpp
index 3c98fde5..0d8fabea 100644
--- a/dwio/nimble/velox/selective/tests/SelectiveNimbleReaderTest.cpp
+++ b/dwio/nimble/velox/selective/tests/SelectiveNimbleReaderTest.cpp
@@ -566,7 +566,8 @@ TEST_F(SelectiveNimbleReaderTest, multiChunkNulls) {
   options.minStreamChunkRawSize = 0;
   options.flushPolicyFactory = [] {
     return std::make_unique<LambdaFlushPolicy>(
-        [](const StripeProgress&) { return FlushDecision::Chunk; });
+        /*flushLambda=*/[](const StripeProgress&) { return false; },
+        /*chunkLambda=*/[](const StripeProgress&) { return true; });
   };
   auto file = test::createNimbleFile(
       *rootPool(), {chunk1, chunk2, chunk3}, options, false);
@@ -612,7 +613,8 @@ TEST_F(SelectiveNimbleReaderTest, multiChunkInt16RowSetOverBoundary) {
   options.minStreamChunkRawSize = 0;
   options.flushPolicyFactory = [] {
     return std::make_unique<LambdaFlushPolicy>(
-        [](const StripeProgress&) { return FlushDecision::Chunk; });
+        /*flushLambda=*/[](const StripeProgress&) { return false; },
+        /*chunkLambda=*/[](const StripeProgress&) { return true; });
   };
   auto file =
       test::createNimbleFile(*rootPool(), {chunk1, chunk2}, options, false);
diff --git a/dwio/nimble/velox/tests/FlushPolicyTests.cpp b/dwio/nimble/velox/tests/FlushPolicyTests.cpp
new file mode 100644
index 00000000..e02f20cf
--- /dev/null
+++ b/dwio/nimble/velox/tests/FlushPolicyTests.cpp
@@ -0,0 +1,227 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gtest/gtest.h>
+
+#include "dwio/nimble/velox/FlushPolicy.h"
+
+namespace facebook::nimble {
+
+class ChunkFlushPolicyTest : public ::testing::Test {};
+
+TEST_F(ChunkFlushPolicyTest, InitialNoMemoryPressure) {
+  ChunkFlushPolicy policy(ChunkFlushPolicyConfig{
+      .writerMemoryHighThresholdBytes = 1000,
+      .writerMemoryLowThresholdBytes = 500,
+      .targetStripeSizeBytes = 800,
+      .estimatedCompressionFactor = 1.5});
+
+  StripeProgress progress{
+      .stripeRawSize = 100,
+      .stripeEncodedSize = 50,
+      .stripeEncodedLogicalSize = 80};
+
+  // Total memory: 100 + 50 = 150 < 1000 (writerMemoryHighThresholdBytes)
+  EXPECT_FALSE(policy.shouldChunk(progress));
+
+  // shouldFlush checks two conditions:
+  // 1. Memory pressure check: 100 + 50 = 150 < 1000 (passes, no flush)
+  // 2. Stripe size check (only if condition 1 passes):
+  //    compressionFactor = stripeEncodedLogicalSize / stripeEncodedSize
+  //                      = 80 / 50 = 1.6
+  //    expectedEncodedStripeSize = stripeEncodedSize + stripeRawSize /
+  //                                max(compressionFactor, 1.0)
+  //                              = 50 + 100 / max(1.6, 1.0)
+  //                              = 50 + 62.5 = 112.5 < 800
+  //                                (targetStripeSizeBytes)
+  EXPECT_FALSE(policy.shouldFlush(progress));
+}
+
+TEST_F(ChunkFlushPolicyTest, MemoryPressureChunkingLifecycle) {
+  ChunkFlushPolicy policy(ChunkFlushPolicyConfig{
+      .writerMemoryHighThresholdBytes = 1000,
+      .writerMemoryLowThresholdBytes = 500,
+      .targetStripeSizeBytes = 800,
+      .estimatedCompressionFactor = 1.5});
+
+  // Start chunking when memory exceeds the high threshold.
+  StripeProgress progress1{
+      .stripeRawSize = 600,
+      .stripeEncodedSize = 500,
+      .stripeEncodedLogicalSize = 400};
+  // Total memory: 600 + 500 = 1100 > 1000 (writerMemoryHighThresholdBytes)
+  EXPECT_TRUE(policy.shouldChunk(progress1));
+
+  // Continue chunking with decreasing raw size (600 -> 550) while memory
+  // remains above the low threshold.
+  StripeProgress progress2{
+      .stripeRawSize = 550,
+      .stripeEncodedSize = 520,
+      .stripeEncodedLogicalSize = 400};
+  // Total memory: 550 + 520 = 1070 > 500 (writerMemoryLowThresholdBytes)
+  EXPECT_TRUE(policy.shouldChunk(progress2));
+
+  // Continue chunking as the raw size decreases further.
+  StripeProgress progress3{
+      .stripeRawSize = 300,
+      .stripeEncodedSize = 300,
+      .stripeEncodedLogicalSize = 400};
+  // Total memory: 300 + 300 = 600 > 500
+  EXPECT_TRUE(policy.shouldChunk(progress3));
+
+  // Memory drops below the low threshold - stop chunking.
+  StripeProgress progress4{
+      .stripeRawSize = 200,
+      .stripeEncodedSize = 200,
+      .stripeEncodedLogicalSize = 400};
+  // Total memory: 200 + 200 = 400 < 500 (writerMemoryLowThresholdBytes)
+  EXPECT_FALSE(policy.shouldChunk(progress4));
+}
+
+TEST_F(ChunkFlushPolicyTest, MemoryPressureTriggersFlush) {
+  ChunkFlushPolicy policy(ChunkFlushPolicyConfig{
+      .writerMemoryHighThresholdBytes = 1000,
+      .writerMemoryLowThresholdBytes = 500,
+      .targetStripeSizeBytes = 800,
+      .estimatedCompressionFactor = 1.5});
+
+  StripeProgress progress{
+      .stripeRawSize = 600,
+      .stripeEncodedSize = 500,
+      .stripeEncodedLogicalSize = 400};
+  // Total memory: 600 + 500 = 1100 > 1000 (writerMemoryHighThresholdBytes)
+  EXPECT_TRUE(policy.shouldChunk(progress));
+
+  // Total memory: 600 + 500 = 1100 > 1000 (writerMemoryHighThresholdBytes)
+  EXPECT_TRUE(policy.shouldFlush(progress));
+}
+
+TEST_F(ChunkFlushPolicyTest, StripeSizeTriggersFlush) {
+  ChunkFlushPolicy policy(ChunkFlushPolicyConfig{
+      .writerMemoryHighThresholdBytes = 2000,
+      .writerMemoryLowThresholdBytes = 500,
+      .targetStripeSizeBytes = 800,
+      .estimatedCompressionFactor = 1.5});
+
+  StripeProgress progress{
+      .stripeRawSize = 1000,
+      .stripeEncodedSize = 400,
+      .stripeEncodedLogicalSize = 600};
+
+  // Expected encoded stripe size calculation:
+  // compressionFactor = stripeEncodedLogicalSize / stripeEncodedSize
+  //                   = 600 / 400 = 1.5
+  // expectedEncodedStripeSize = stripeEncodedSize + stripeRawSize /
+  //                             max(compressionFactor, 1.0)
+  //                           = 400 + 1000 / max(1.5, 1.0)
+  //                           = 400 + 666.67 = 1066.67 > 800
+  EXPECT_TRUE(policy.shouldFlush(progress));
+}
+
+TEST_F(ChunkFlushPolicyTest, ZeroEncodedSize) {
+  ChunkFlushPolicy policy(ChunkFlushPolicyConfig{
+      .writerMemoryHighThresholdBytes = 1000,
+      .writerMemoryLowThresholdBytes = 500,
+      .targetStripeSizeBytes = 800,
+      .estimatedCompressionFactor = 2.0});
+
+  StripeProgress progress{
+      .stripeRawSize = 1200,
+      .stripeEncodedSize = 0,
+      .stripeEncodedLogicalSize = 0};
+
+  // Memory pressure check:
+  // stripeRawSize + stripeEncodedSize = 1200 + 0 = 1200 > 1000
+  // This triggers chunking due to memory pressure.
+  EXPECT_TRUE(policy.shouldChunk(progress));
+
+  // Flush check: memory pressure (1200 > 1000) triggers a stripe flush.
+  EXPECT_TRUE(policy.shouldFlush(progress));
+}
+
+TEST(FlushPolicyTest, StripeRawSizeFlushPolicyTest) {
+  StripeRawSizeFlushPolicy policy(/*stripeRawSize=*/1000);
+
+  StripeProgress progress{
+      .stripeRawSize = 1200, // Exceeds threshold
+      .stripeEncodedSize = 600,
+      .stripeEncodedLogicalSize = 800};
+
+  EXPECT_TRUE(policy.shouldFlush(progress));
+
+  StripeProgress progress2{
+      .stripeRawSize = 800, // Below threshold
+      .stripeEncodedSize = 600,
+      .stripeEncodedLogicalSize = 700};
+
+  EXPECT_FALSE(policy.shouldFlush(progress2));
+}
+
+TEST(FlushPolicyTest, LambdaFlushPolicyTest) {
+  LambdaFlushPolicy policy(
+      /*flushLambda=*/
+      [](const StripeProgress& progress) {
+        return progress.stripeRawSize > 1000;
+      },
+      /*chunkLambda=*/
+      [](const StripeProgress& progress) {
+        return progress.stripeEncodedSize > 500;
+      });
+
+  // Neither threshold is exceeded.
+  StripeProgress noneProgress{
+      .stripeRawSize = 300,
+      .stripeEncodedSize = 200,
+      .stripeEncodedLogicalSize = 250};
+  EXPECT_FALSE(policy.shouldFlush(noneProgress));
+  EXPECT_FALSE(policy.shouldChunk(noneProgress));
+
+  // The chunk threshold is exceeded.
+  StripeProgress chunkProgress{
+      .stripeRawSize = 500,
+      .stripeEncodedSize = 600,
+      .stripeEncodedLogicalSize = 450};
+  EXPECT_TRUE(policy.shouldChunk(chunkProgress));
+
+  // The flush threshold is exceeded.
+  StripeProgress stripeProgress{
+      .stripeRawSize = 1200,
+      .stripeEncodedSize = 400,
+      .stripeEncodedLogicalSize = 800};
+  EXPECT_TRUE(policy.shouldFlush(stripeProgress));
+}
+
+TEST(FlushPolicyTest, LambdaFlushPolicyDefaultLambdas) {
+  LambdaFlushPolicy policy;
+
+  StripeProgress progress{
+      .stripeRawSize = 1200,
+      .stripeEncodedSize = 600,
+      .stripeEncodedLogicalSize = 800};
+
+  EXPECT_FALSE(policy.shouldFlush(progress));
+  EXPECT_FALSE(policy.shouldChunk(progress));
+}
+
+} // namespace facebook::nimble
diff --git a/dwio/nimble/velox/tests/StreamChunkerTests.cpp b/dwio/nimble/velox/tests/StreamChunkerTests.cpp
new file mode 100644
index 00000000..6289fd81
--- /dev/null
+++ b/dwio/nimble/velox/tests/StreamChunkerTests.cpp
@@ -0,0 +1,1043 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include + +#include "dwio/nimble/velox/SchemaBuilder.h" +#include "dwio/nimble/velox/StreamChunker.h" +#include "dwio/nimble/velox/StreamData.h" +#include "velox/common/memory/Memory.h" +#include "velox/common/memory/MemoryArbitrator.h" +#include "velox/common/memory/SharedArbitrator.h" + +namespace facebook::nimble { +template +void populateData(Vector& vec, const std::vector& data) { + vec.reserve(data.size()); + for (const auto& item : data) { + vec.push_back(item); + } +} + +template +std::vector toVector(std::string_view data) { + const T* chunkData = reinterpret_cast(data.data()); + return std::vector(chunkData, chunkData + data.size() / sizeof(T)); +} + +// Expected chunk result structure +template +struct ExpectedChunk { + std::vector chunkData; + std::vector chunkNonNulls = {}; + bool hasNulls = false; + size_t extraMemory = 0; +}; + +class StreamChunkerTestsBase : public ::testing::Test { + protected: + // Helper method to validate chunk results against expected data + template + void validateChunk( + const StreamData& stream, + std::unique_ptr chunker, + const std::vector>& expectedChunks, + const ExpectedChunk& expectedRetainedData) { + int chunkIndex = 0; + while (const auto chunkView = chunker->next()) { + ASSERT_LT(chunkIndex, expectedChunks.size()); + ASSERT_TRUE(chunkView.has_value()); + const auto chunkData = toVector(chunkView->data()); + const auto& expectedChunk = expectedChunks[chunkIndex]; + const auto& expectedChunkData = expectedChunk.chunkData; + const auto& expectedChunkNonNulls = expectedChunk.chunkNonNulls; + EXPECT_THAT(chunkData, ::testing::ElementsAreArray(expectedChunkData)); + EXPECT_THAT( + chunkView->nonNulls(), + ::testing::ElementsAreArray(expectedChunkNonNulls)); + EXPECT_EQ(chunkView->hasNulls(), expectedChunk.hasNulls); + ++chunkIndex; + } + ASSERT_EQ(chunkIndex, expectedChunks.size()); + + // Erases processed data to reclaim memory. 
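+    // After compact(), the stream should retain only the data that next()
+    // has not yet returned (validated against expectedRetainedData below).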
+ chunker->compact(); + + // Validate buffer is properly compacted to only retain expected data + EXPECT_THAT( + toVector<T>(stream.data()), + ::testing::ElementsAreArray(expectedRetainedData.chunkData)); + EXPECT_THAT( + stream.nonNulls(), + ::testing::ElementsAreArray(expectedRetainedData.chunkNonNulls)); + EXPECT_EQ(stream.hasNulls(), expectedRetainedData.hasNulls); + } + + static void SetUpTestCase() { + velox::memory::SharedArbitrator::registerFactory(); + velox::memory::MemoryManager::Options options; + options.arbitratorKind = "SHARED"; + velox::memory::MemoryManager::testingSetInstance(options); + } + + void SetUp() override { + rootPool_ = velox::memory::memoryManager()->addRootPool("default_root"); + leafPool_ = rootPool_->addLeafChild("default_leaf"); + schemaBuilder_ = std::make_unique<SchemaBuilder>(); + inputBufferGrowthPolicy_ = + DefaultInputBufferGrowthPolicy::withDefaultRanges(); + } + + std::shared_ptr<velox::memory::MemoryPool> rootPool_; + std::shared_ptr<velox::memory::MemoryPool> leafPool_; + std::unique_ptr<SchemaBuilder> schemaBuilder_; + std::unique_ptr<InputBufferGrowthPolicy> inputBufferGrowthPolicy_; +}; + +TEST_F(StreamChunkerTestsBase, getNewBufferCapacityTest) { + // currentCapacityCount < maxChunkElementCount + uint64_t maxChunkElementCount = 8; + uint64_t currentCapacityCount = 4; + const uint64_t requiredCapacityCount = 2; + EXPECT_EQ( + detail::getNewBufferCapacity( + /*maxChunkSize=*/maxChunkElementCount * sizeof(int32_t), + /*currentCapacityCount=*/currentCapacityCount, + /*requiredCapacityCount=*/requiredCapacityCount), + currentCapacityCount); + + currentCapacityCount = 40; + // currentCapacityCount > maxChunkElementCount: 8 + (40 - 8) * 0.5 = 24 + EXPECT_EQ( + detail::getNewBufferCapacity( + /*maxChunkSize=*/maxChunkElementCount * sizeof(int32_t), + /*currentCapacityCount=*/currentCapacityCount, + /*requiredCapacityCount=*/requiredCapacityCount), + 24); +} + +TEST_F(StreamChunkerTestsBase, omitStreamTest) { + auto scalarTypeBuilder = + schemaBuilder_->createScalarTypeBuilder(ScalarKind::Bool); + auto descriptor = &scalarTypeBuilder->scalarDescriptor(); + NullsStreamData nullsStream( + *leafPool_, *descriptor, *inputBufferGrowthPolicy_); + + // Setup test data with some nulls + std::vector<bool> nonNullsTestData = { + true, false, true, true, false, true, false, true, false, false}; + nullsStream.ensureAdditionalNullsCapacity( + /*mayHaveNulls=*/true, static_cast<uint32_t>(nonNullsTestData.size())); + auto& nonNulls = nullsStream.mutableNonNulls(); + populateData(nonNulls, nonNullsTestData); + + // Null Stream Basic Test + { + // Non-empty null stream with size above minChunkSize + EXPECT_FALSE(detail::shouldOmitNullStream( + nullsStream, + /*minChunkSize=*/4, + /*isEmptyStreamContent=*/false, + /*isLastChunk=*/false)); + + // Non-empty null stream with size below minChunkSize + EXPECT_TRUE(detail::shouldOmitNullStream( + nullsStream, + /*minChunkSize=*/11, + /*isEmptyStreamContent=*/true, + /*isLastChunk=*/false)); + + // Non-empty null stream with size below minChunkSize and last chunk + EXPECT_FALSE(detail::shouldOmitNullStream( + nullsStream, + /*minChunkSize=*/11, + /*isEmptyStreamContent=*/true, + /*isLastChunk=*/true)); + + // Non-empty null stream with size below minChunkSize + // and non-empty written stream content + EXPECT_FALSE(detail::shouldOmitNullStream( + nullsStream, + /*minChunkSize=*/11, + /*isEmptyStreamContent=*/false, + /*isLastChunk=*/false)); + } + + scalarTypeBuilder = + schemaBuilder_->createScalarTypeBuilder(ScalarKind::Int32); + descriptor = &scalarTypeBuilder->scalarDescriptor(); + NullableContentStreamData<int32_t> nullableContentStream( + *leafPool_,
*descriptor, *inputBufferGrowthPolicy_); + std::vector<int32_t> testData = {1, 2, 3, 4, 5}; + // Populate data using ensureAdditionalNullsCapacity and manual data insertion + nullableContentStream.ensureAdditionalNullsCapacity( + /*mayHaveNulls=*/true, static_cast<uint32_t>(nonNullsTestData.size())); + populateData(nullableContentStream.mutableData(), testData); + populateData(nullableContentStream.mutableNonNulls(), nonNullsTestData); + + // Nullable Content Stream Basic Test + { + // Non-empty stream with size above minChunkSize + EXPECT_FALSE(detail::shouldOmitDataStream( + nullableContentStream, + /*minChunkSize=*/4, + /*isEmptyStreamContent=*/false, + /*isLastChunk=*/false)); + + // Non-empty stream with size below minChunkSize + EXPECT_TRUE(detail::shouldOmitDataStream( + nullableContentStream, + /*minChunkSize=*/100, + /*isEmptyStreamContent=*/true, + /*isLastChunk=*/false)); + + // Non-empty stream with size below minChunkSize and last chunk + EXPECT_FALSE(detail::shouldOmitDataStream( + nullableContentStream, + /*minChunkSize=*/100, + /*isEmptyStreamContent=*/true, + /*isLastChunk=*/true)); + + // Non-empty stream with size below minChunkSize + // and non-empty written stream content + EXPECT_FALSE(detail::shouldOmitDataStream( + nullableContentStream, + /*minChunkSize=*/100, + /*isEmptyStreamContent=*/false, + /*isLastChunk=*/false)); + } + + // NullsStreamChunker respects omitStream + { + // Test case where omitStream returns true - chunker should return nullopt + auto chunker = getStreamChunker( + nullsStream, + {.minChunkSize = 11, // Set high minChunkSize to trigger omitStream + .maxChunkSize = 4, + .ensureFullChunks = false, + .isEmptyStreamContent = true, + .isLastChunk = false}); + + // Validate that the correct chunker type was created + ASSERT_NE(dynamic_cast<NullsStreamChunker*>(chunker.get()), nullptr); + + // Should return nullopt because omitStream should return true + auto result = chunker->next(); + EXPECT_FALSE(result.has_value()); + + // Test case where omitStream returns false + auto chunker2 = getStreamChunker( + nullsStream, + {.minChunkSize = 4, // Set reasonable minChunkSize + .maxChunkSize = 4, + .ensureFullChunks = false, + .isEmptyStreamContent = false, + .isLastChunk = false}); + + // Should return valid chunks because omitStream should return false + auto result2 = chunker2->next(); + EXPECT_TRUE(result2.has_value()); + } + + // NullableContentStreamChunker respects omitStream + { + // Test case where omitStream returns true - chunker should return nullopt + auto chunker = getStreamChunker( + nullableContentStream, + {.minChunkSize = 100, // Set high minChunkSize to trigger omitStream + .maxChunkSize = 20, + .ensureFullChunks = false, + .isEmptyStreamContent = true, + .isLastChunk = false}); + + // Validate that the correct chunker type was created + ASSERT_NE( + dynamic_cast<NullableContentStreamChunker<int32_t>*>(chunker.get()), + nullptr); + + // Should return nullopt because omitStream should return true + auto result = chunker->next(); + EXPECT_FALSE(result.has_value()); + + // Test case where omitStream returns false + auto chunker2 = getStreamChunker( + nullableContentStream, + {.minChunkSize = 4, // Set reasonable minChunkSize + .maxChunkSize = 20, + .ensureFullChunks = false, + .isEmptyStreamContent = false, + .isLastChunk = false}); + + // Should return valid chunks because omitStream should return false + auto result2 = chunker2->next(); + EXPECT_TRUE(result2.has_value()); + } +} + +TEST_F(StreamChunkerTestsBase, ContentStreamIntChunking) { + auto scalarTypeBuilder = +
schemaBuilder_->createScalarTypeBuilder(ScalarKind::Int32); + auto descriptor = &scalarTypeBuilder->scalarDescriptor(); + ContentStreamData<int32_t> stream( + *leafPool_, *descriptor, *inputBufferGrowthPolicy_); + + // Test 1: Do not return last chunk. + { + std::vector<int32_t> testData = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + auto& data = stream.mutableData(); + populateData(data, testData); + uint64_t maxChunkSize = 18; // 4.5 elements + uint64_t minChunkSize = 12; // 3 elements + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = false, + .isEmptyStreamContent = false, + .isLastChunk = false}); + // Validate that the correct chunker type was created + ASSERT_NE( + dynamic_cast<ContentStreamChunker<int32_t>*>(chunker.get()), nullptr); + std::vector<ExpectedChunk<int32_t>> expectedChunks = { + {.chunkData = {1, 2, 3, 4}}, // First chunk: 4 elements + {.chunkData = {5, 6, 7, 8}} // Second chunk: 4 elements + // Remaining 2 elements won't be chunked due to minChunkSize + }; + // Expected retained data after compaction + ExpectedChunk<int32_t> expectedRetainedData{.chunkData = {9, 10}}; + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + } + + // Test 2: Nothing returned when ensureFullChunks = true and remaining data in + // StreamData is less than maxChunkSize. + { + uint64_t maxChunkSize = 18; // 4.5 elements + uint64_t minChunkSize = 12; // 3 elements + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = true, + .isEmptyStreamContent = false, + .isLastChunk = false}); + // Validate that the correct chunker type was created + ASSERT_NE( + dynamic_cast<ContentStreamChunker<int32_t>*>(chunker.get()), nullptr); + std::vector<ExpectedChunk<int32_t>> expectedChunks = {}; + ExpectedChunk<int32_t> expectedRetainedData{.chunkData = {9, 10}}; + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + } + + // Test 3: Return last chunk. + { + auto& data = stream.mutableData(); + data.clear(); // Clear existing data first + std::vector<int32_t> testData = {7, 8, 9, 10, 11}; + populateData(data, testData); + + uint64_t maxChunkSize = 12; // 3 elements + uint64_t minChunkSize = 0; // no minimum + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = true, + .isEmptyStreamContent = false, + .isLastChunk = true}); + // Validate that the correct chunker type was created + ASSERT_NE( + dynamic_cast<ContentStreamChunker<int32_t>*>(chunker.get()), nullptr); + + std::vector<ExpectedChunk<int32_t>> expectedChunks = { + {.chunkData = {7, 8, 9}}, // First chunk: 3 elements + {.chunkData = {10, 11}}, // Second chunk: 2 elements (lastChunk=true + // allows smaller chunks) + }; + // No data retained after processing all chunks + ExpectedChunk<int32_t> expectedRetainedData{.chunkData = {}}; + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + } +} + +TEST_F(StreamChunkerTestsBase, ContentStreamStringChunking) { + auto scalarTypeBuilder = + schemaBuilder_->createScalarTypeBuilder(ScalarKind::String); + auto descriptor = &scalarTypeBuilder->scalarDescriptor(); + ContentStreamData<std::string_view> stream( + *leafPool_, *descriptor, *inputBufferGrowthPolicy_); + uint64_t maxChunkSize = 55; + uint64_t minChunkSize = 40; + // Test 1: Basic string chunking.
lastChunk=false + { + std::vector<std::string_view> testData = { + "short", "a_longer_string", "x", "medium_size", "tiny"}; + auto& data = stream.mutableData(); + populateData(data, testData); + + // Calculate extra memory for string content + for (const auto& str : testData) { + stream.extraMemory() += str.size(); + } + + // Total string content sizes: + // "short"(5) + "a_longer_string"(15) + "x"(1) + "medium_size"(11) + + // "tiny"(4) = 36 bytes + ASSERT_EQ(stream.extraMemory(), 36); + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = false, + .isEmptyStreamContent = false, + .isLastChunk = false}); + + std::vector<ExpectedChunk<std::string_view>> expectedChunks = { + {.chunkData = {"short", "a_longer_string"}, + .chunkNonNulls = {}, + .hasNulls = false, + .extraMemory = + 20}, // "short"(5) + "a_longer_string"(15) = 20 bytes extra + {.chunkData = {"x", "medium_size"}, + .chunkNonNulls = {}, + .hasNulls = false, + .extraMemory = 12} // "x"(1) + "medium_size"(11) = 12 bytes extra + // "tiny"(4) remains due to minChunkSize constraint + }; + + ExpectedChunk<std::string_view> expectedRetainedData{ + .chunkData = {"tiny"}, + .chunkNonNulls = {}, + .hasNulls = false, + .extraMemory = 4}; // "tiny"(4) = 4 bytes extra + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + } + + // Test 2: Remaining chunk with lastChunk=true + { + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = true, + .isEmptyStreamContent = false, + .isLastChunk = true}); + // Validate that the correct chunker type was created + ASSERT_NE( + dynamic_cast<ContentStreamChunker<std::string_view>*>(chunker.get()), + nullptr); + + std::vector<ExpectedChunk<std::string_view>> expectedChunks = { + {.chunkData = {"tiny"}, + .chunkNonNulls = {}, + .hasNulls = false, + .extraMemory = 4}, // "tiny"(4) = 4 bytes extra + }; + ExpectedChunk<std::string_view> expectedRetainedData{ + .chunkData = {}, + .chunkNonNulls = {}, + .hasNulls = false, + .extraMemory = 0}; // No data retained + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + } + + // Test 3: String chunking with lastChunk=true on fresh data + { + auto& data = stream.mutableData(); + data.clear(); // Clear existing data first + stream.extraMemory() = 0; // Reset extra memory + + std::vector<std::string_view> testData = { + "hello", "world", "test", "final"}; + populateData(data, testData); + + // Calculate extra memory for string content + for (const auto& str : testData) { + stream.extraMemory() += str.size(); + } + + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = true, + .isEmptyStreamContent = false, + .isLastChunk = true}); + // Validate that the correct chunker type was created + ASSERT_NE( + dynamic_cast<ContentStreamChunker<std::string_view>*>(chunker.get()), + nullptr); + + std::vector<ExpectedChunk<std::string_view>> expectedChunks = { + {.chunkData = {"hello", "world"}, + .chunkNonNulls = {}, + .hasNulls = false, + .extraMemory = 10}, // "hello"(5) + "world"(5) = 10 bytes extra + {.chunkData = {"test", "final"}, + .chunkNonNulls = {}, + .hasNulls = false, + .extraMemory = 9} // "test"(4) + "final"(5) = 9 bytes extra + }; + + // All data should be processed with lastChunk=true + ExpectedChunk<std::string_view> expectedRetainedData{ + .chunkData = {}, + .chunkNonNulls = {}, + .hasNulls = false, + .extraMemory = 0}; // No data retained + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + } + + // Test 4: String chunking with ensureFullChunks_ + { + minChunkSize = 2; + std::vector<std::string_view> testData =
{"last_string"}; + auto& data = stream.mutableData(); + populateData(data, testData); + + // Calculate extra memory for string content + for (const auto& str : testData) { + stream.extraMemory() += str.size(); + } + + // Total string content sizes: "last_string"(11) = 11 bytes + ASSERT_EQ(stream.extraMemory(), 11); + auto memoryUsed = stream.memoryUsed(); + + // maxChunkSize is greater than available memory + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = memoryUsed + 1, + .ensureFullChunks = true, + .isEmptyStreamContent = false, + .isLastChunk = false}); + + std::vector> expectedChunks = {}; + ExpectedChunk expectedRetainedData{ + .chunkData = {"last_string"}, + .chunkNonNulls = {}, + .hasNulls = false, + .extraMemory = 11}; + + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + + // When maxChunkSize is equal to available memory, chunk is returned. + auto chunker2 = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = memoryUsed, + .ensureFullChunks = true, + .isEmptyStreamContent = false, + .isLastChunk = false}); + expectedChunks = { + {.chunkData = {"last_string"}, + .chunkNonNulls = {}, + .hasNulls = false, + .extraMemory = 11}}; + + validateChunk( + stream, + std::move(chunker2), + expectedChunks, + {.chunkData = {}, + .chunkNonNulls = {}, + .hasNulls = false, + .extraMemory = 0}); + } +} + +TEST_F(StreamChunkerTestsBase, NullsStreamChunking) { + auto scalarTypeBuilder = + schemaBuilder_->createScalarTypeBuilder(ScalarKind::Bool); + auto descriptor = &scalarTypeBuilder->scalarDescriptor(); + NullsStreamData stream(*leafPool_, *descriptor, *inputBufferGrowthPolicy_); + + // Setup test data with some nulls + std::vector testData = { + true, false, true, true, false, true, false, true, false, false}; + stream.ensureAdditionalNullsCapacity( + /*mayHaveNulls=*/true, static_cast(testData.size())); + auto& nonNulls = stream.mutableNonNulls(); + populateData(nonNulls, testData); + + // size of data is 10 * sizeof(bool) = 10 bytes + ASSERT_EQ(stream.memoryUsed(), 10); + EXPECT_EQ(stream.data().size(), 0); + + // 4 bytes (arbitrary chunk size) + uint32_t maxChunkSize = 4; + uint32_t minChunkSize = 3; + + // Test 1: Not last chunk. + { + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = false, + .isEmptyStreamContent = true, + .isLastChunk = false}); + // Validate that the correct chunker type was created + ASSERT_NE(dynamic_cast(chunker.get()), nullptr); + + std::vector> expectedChunks = { + {.chunkData = {true, false, true, true}, + .chunkNonNulls = {}, + .hasNulls = false}, + {.chunkData = {false, true, false, true}, + .chunkNonNulls = {}, + .hasNulls = false}}; + // Expected retained data after compaction (no data, but non-nulls remain) + ExpectedChunk expectedRetainedData{ + .chunkData = {}, .chunkNonNulls = {false, false}, .hasNulls = true}; + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + } + + // Test 2: Last chunk. 
+ { + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = false, + .isEmptyStreamContent = true, + .isLastChunk = true}); + // Validate that the correct chunker type was created + ASSERT_NE(dynamic_cast<NullsStreamChunker*>(chunker.get()), nullptr); + std::vector<ExpectedChunk<bool>> expectedChunks = { + {.chunkData = {false, false}, .chunkNonNulls = {}}}; + ExpectedChunk<bool> expectedRetainedData{ + .chunkData = {}, .chunkNonNulls = {}, .hasNulls = false}; + // No data retained after processing all chunks + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + } + + // Test 3: Generate chunks when ensureFullChunks_ = true + { + testData = { + true, false, true, true, false, true, false, true, false, false}; + stream.ensureAdditionalNullsCapacity( + /*mayHaveNulls=*/true, static_cast<uint32_t>(testData.size())); + nonNulls = stream.mutableNonNulls(); + populateData(nonNulls, testData); + + auto memoryUsed = stream.memoryUsed(); + // maxChunkSize is greater than available memory + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = memoryUsed + 1, + .ensureFullChunks = true, + .isEmptyStreamContent = false, + .isLastChunk = false}); + + // No chunks returned, all data retained + ExpectedChunk<bool> expectedRetainedData{ + .chunkData = {}, .chunkNonNulls = testData, .hasNulls = true}; + validateChunk( + stream, + std::move(chunker), + /*expectedChunks=*/{}, + expectedRetainedData); + + // When maxChunkSize is equal to available memory, chunk is returned. + auto chunker2 = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = memoryUsed, + .ensureFullChunks = true, + .isEmptyStreamContent = false, + .isLastChunk = false}); + auto expectedChunks = {ExpectedChunk<bool>{ + .chunkData = testData, .chunkNonNulls = {}, .hasNulls = false}}; + expectedRetainedData.chunkNonNulls = {}; + expectedRetainedData.hasNulls = false; + // No data retained after processing all chunks + validateChunk( + stream, std::move(chunker2), expectedChunks, expectedRetainedData); + } +} + +TEST_F(StreamChunkerTestsBase, NullableContentStreamIntChunking) { + auto scalarTypeBuilder = + schemaBuilder_->createScalarTypeBuilder(ScalarKind::Int32); + auto descriptor = &scalarTypeBuilder->scalarDescriptor(); + NullableContentStreamData<int32_t> stream( + *leafPool_, *descriptor, *inputBufferGrowthPolicy_); + + // Setup test data with some nulls + std::vector<int32_t> testData = {1, 2, 3, 4, 5, 6, 7}; + std::vector<bool> nonNullsData = { + true, false, true, true, false, true, false, true, true, true}; + + // Populate data using ensureAdditionalNullsCapacity and manual data insertion + stream.ensureAdditionalNullsCapacity( + /*mayHaveNulls=*/true, static_cast<uint32_t>(nonNullsData.size())); + populateData(stream.mutableData(), testData); + populateData(stream.mutableNonNulls(), nonNullsData); + + // (7 * sizeof(int32_t)) + (10 * sizeof(bool)) = 38 + ASSERT_EQ(stream.memoryUsed(), 38); + + // Test 1: Not last chunk + uint64_t maxChunkSize = 22; + uint64_t minChunkSize = 20; + { + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = false, + .isEmptyStreamContent = true, + .isLastChunk = false}); + + std::vector<ExpectedChunk<int32_t>> expectedChunks = { + {.chunkData = {1, 2, 3, 4}, + .chunkNonNulls = {true, false, true, true, false, true}, + .hasNulls = true}}; + // Expected retained data after compaction + ExpectedChunk<int32_t> expectedRetainedData{ + .chunkData = {5, 6, 7}, + .chunkNonNulls = {false, true,
true, true}, + .hasNulls = true}; + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + } + + // Test 2: Last chunk - process remaining data + { + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = false, + .isEmptyStreamContent = true, + .isLastChunk = true}); + // Validate that the correct chunker type was created + ASSERT_NE( + dynamic_cast<NullableContentStreamChunker<int32_t>*>(chunker.get()), + nullptr); + std::vector<ExpectedChunk<int32_t>> expectedChunks = { + {.chunkData = {5, 6, 7}, + .chunkNonNulls = {false, true, true, true}, + .hasNulls = true}}; + ExpectedChunk<int32_t> expectedRetainedData{ + .chunkData = {}, .chunkNonNulls = {}, .hasNulls = false}; + // No data retained after processing all chunks + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + } + + // Test 3: Small chunks with mixed null/non-null data + { + std::vector<int32_t> newTestData = {10, 11}; + std::vector<bool> test3NonNullsData = { + true, false, false, true, false, false}; + stream.ensureAdditionalNullsCapacity( + /*mayHaveNulls=*/true, static_cast<uint32_t>(test3NonNullsData.size())); + auto& data = stream.mutableData(); + auto& nonNulls = stream.mutableNonNulls(); + populateData(data, newTestData); + populateData(nonNulls, test3NonNullsData); + + maxChunkSize = 5; // Very small chunks + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = false, + .isEmptyStreamContent = true, + .isLastChunk = true}); + // Validate that the correct chunker type was created + ASSERT_NE( + dynamic_cast<NullableContentStreamChunker<int32_t>*>(chunker.get()), + nullptr); + + std::vector<ExpectedChunk<int32_t>> expectedChunks = { + {.chunkData = {10}, .chunkNonNulls = {true}, .hasNulls = false}, + {.chunkData = {}, .chunkNonNulls = {false, false}, .hasNulls = true}, + {.chunkData = {11}, .chunkNonNulls = {true}, .hasNulls = false}, + {.chunkData = {}, .chunkNonNulls = {false, false}, .hasNulls = true}, + }; + ExpectedChunk<int32_t> expectedRetainedData{ + .chunkData = {}, .chunkNonNulls = {}, .hasNulls = false}; + // No data retained after processing all chunks + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + } + + // Test 4: ensureFullChunks=true with data that can't fill maxChunkSize + { + stream.reset(); + std::vector<int32_t> smallTestData = {20, 21}; + std::vector<bool> smallNonNullsData = {true, false, true}; + stream.ensureAdditionalNullsCapacity( + /*mayHaveNulls=*/true, static_cast<uint32_t>(smallNonNullsData.size())); + auto& data = stream.mutableData(); + auto& nonNulls = stream.mutableNonNulls(); + populateData(data, smallTestData); + populateData(nonNulls, smallNonNullsData); + + // maxChunkSize greater than available memory + maxChunkSize = stream.memoryUsed() + 1; + minChunkSize = 2; + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = true, + .isEmptyStreamContent = true, + .isLastChunk = false}); + ASSERT_NE( + dynamic_cast<NullableContentStreamChunker<int32_t>*>(chunker.get()), + nullptr); + + // Should return nothing due to ensureFullChunks=true and insufficient data + std::vector<ExpectedChunk<int32_t>> expectedChunks = {}; + // Expected retained data after compaction + ExpectedChunk<int32_t> expectedRetainedData{ + .chunkData = {20, 21}, + .chunkNonNulls = {true, false, true}, + .hasNulls = true}; + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + + // maxChunkSize equal to available memory + maxChunkSize = stream.memoryUsed(); + auto chunker2 = getStreamChunker( + stream, +
{.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = true, + .isEmptyStreamContent = true, + .isLastChunk = false}); + ASSERT_NE( + dynamic_cast<NullableContentStreamChunker<int32_t>*>(chunker2.get()), + nullptr); + + // Should return the full chunk since maxChunkSize equals available memory + std::vector<ExpectedChunk<int32_t>> expectedChunks2 = { + {.chunkData = {20, 21}, + .chunkNonNulls = {true, false, true}, + .hasNulls = true}, + }; + // No data retained after processing all chunks + validateChunk( + stream, std::move(chunker2), expectedChunks2, {.hasNulls = false}); + } +} + +// TODO: For string tests. Manually determine that the left over extra memory is +// correct after compaction. +TEST_F(StreamChunkerTestsBase, NullableContentStreamStringChunking) { + auto scalarTypeBuilder = + schemaBuilder_->createScalarTypeBuilder(ScalarKind::String); + auto descriptor = &scalarTypeBuilder->scalarDescriptor(); + NullableContentStreamData<std::string_view> stream( + *leafPool_, *descriptor, *inputBufferGrowthPolicy_); + + // Setup test data with some nulls + std::vector<std::string_view> testData = { + "hello", "world", "test", "string", "chunk", "data", "example"}; + std::vector<bool> nonNullsData = { + true, false, true, true, false, true, false, true, true, true, false}; + + // Populate data using ensureAdditionalNullsCapacity and manual data insertion + stream.ensureAdditionalNullsCapacity( + /*mayHaveNulls=*/true, static_cast<uint32_t>(nonNullsData.size())); + auto& data = stream.mutableData(); + auto& nonNulls = stream.mutableNonNulls(); + populateData(data, testData); + populateData(nonNulls, nonNullsData); + + // Set extra memory for string overhead + // number of strings: 7 + // size of string view data pointers = 7 * 8 = 56 bytes + // size of string view data size = 7 * 8 = 56 bytes + // size of nulls = 11 * 1 = 11 bytes + // total size of stored string data = 36 bytes + // total size of stream data = 56 + 56 + 11 + 36 = 159 bytes + for (const auto& entry : testData) { + stream.extraMemory() += entry.size(); + } + ASSERT_EQ(stream.memoryUsed(), 159); + + // Test 1: Not last chunk + { + uint64_t maxChunkSize = 72; + uint64_t minChunkSize = 50; + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = false, + .isEmptyStreamContent = true, + .isLastChunk = false}); + + std::vector<ExpectedChunk<std::string_view>> expectedChunks = { + {.chunkData = {"hello", "world", "test"}, + .chunkNonNulls = {true, false, true, true, false}, + .hasNulls = true, + .extraMemory = + 14}, // "hello"(5) + "world"(5) + "test"(4) = 14 bytes extra + {.chunkData = {"string", "chunk", "data"}, + .chunkNonNulls = {true, false, true, true}, + .hasNulls = true, + .extraMemory = + 15} // "string"(6) + "chunk"(5) + "data"(4) = 15 bytes extra + }; + // Expected retained data after compaction + ExpectedChunk<std::string_view> expectedRetainedData{ + .chunkData = {"example"}, + .chunkNonNulls = {true, false}, + .hasNulls = true}; + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + } + + // Test 2: Last chunk - process remaining data + { + uint64_t maxChunkSize = 25; + uint64_t minChunkSize = 22; + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = false, + .isEmptyStreamContent = true, + .isLastChunk = true}); + // Validate that the correct chunker type was created + ASSERT_NE( + dynamic_cast<NullableContentStreamChunker<std::string_view>*>( + chunker.get()), + nullptr); + + std::vector<ExpectedChunk<std::string_view>> expectedChunks = { + {.chunkData = {"example"}, + .chunkNonNulls = {true, false}, + .hasNulls = true, + .extraMemory =
7} // "example"(7) = 7 bytes extra + }; + // No data retained after processing all chunks + ExpectedChunk<std::string_view> expectedRetainedData{ + .chunkData = {}, .chunkNonNulls = {}, .hasNulls = false}; + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + } + + // Test 3: ensureFullChunks=true with data that can't fill maxChunkSize - + // should return nothing + { + std::vector<std::string_view> smallTestData = {"test", "data"}; + std::vector<bool> smallNonNullsData = {true, false, true}; + stream.ensureAdditionalNullsCapacity( + /*mayHaveNulls=*/true, static_cast<uint32_t>(smallNonNullsData.size())); + auto& smallData = stream.mutableData(); + auto& smallNonNulls = stream.mutableNonNulls(); + populateData(smallData, smallTestData); + populateData(smallNonNulls, smallNonNullsData); + + // Reset extra memory for new test data + stream.extraMemory() = 0; + for (const auto& entry : smallTestData) { + stream.extraMemory() += entry.size(); + } + + uint64_t maxChunkSize = 50; // Larger than available data + uint64_t minChunkSize = 10; + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = true, + .isEmptyStreamContent = true, + .isLastChunk = false}); + ASSERT_NE( + dynamic_cast<NullableContentStreamChunker<std::string_view>*>( + chunker.get()), + nullptr); + + // Should return nothing due to ensureFullChunks=true and insufficient data + std::vector<ExpectedChunk<std::string_view>> expectedChunks = {}; + // Expected retained data after compaction + ExpectedChunk<std::string_view> expectedRetainedData{ + .chunkData = {"test", "data"}, + .chunkNonNulls = {true, false, true}, + .hasNulls = true}; + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + } + + // Test 4: ensureFullChunks=true with exact boundary case + { + stream.reset(); + std::vector<std::string_view> exactData = {"a", "b"}; + std::vector<bool> exactNonNulls = {true, true}; + stream.ensureAdditionalNullsCapacity( + /*mayHaveNulls=*/true, static_cast<uint32_t>(exactNonNulls.size())); + auto& exactTestData = stream.mutableData(); + auto& exactTestNonNulls = stream.mutableNonNulls(); + populateData(exactTestData, exactData); + populateData(exactTestNonNulls, exactNonNulls); + + // Reset extra memory for new test data + stream.extraMemory() = 0; + for (const auto& entry : exactData) { + stream.extraMemory() += entry.size(); + } + + // Set maxChunkSize to exactly match the memory used + uint64_t maxChunkSize = stream.memoryUsed(); + uint64_t minChunkSize = 5; + auto chunker = getStreamChunker( + stream, + {.minChunkSize = minChunkSize, + .maxChunkSize = maxChunkSize, + .ensureFullChunks = true, + .isEmptyStreamContent = true, + .isLastChunk = false}); + ASSERT_NE( + dynamic_cast<NullableContentStreamChunker<std::string_view>*>( + chunker.get()), + nullptr); + + // Should return chunk since it exactly matches maxChunkSize + std::vector<ExpectedChunk<std::string_view>> expectedChunks = { + {.chunkData = {"a", "b"}, + .chunkNonNulls = {true, true}, + .hasNulls = false, + .extraMemory = 2} // "a"(1) + "b"(1) = 2 bytes extra + }; + // No data retained after processing all chunks + ExpectedChunk<std::string_view> expectedRetainedData{ + .chunkData = {}, .chunkNonNulls = {}}; + validateChunk( + stream, std::move(chunker), expectedChunks, expectedRetainedData); + } +} +} // namespace facebook::nimble diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp index 1f39bd49..0d138962 100644 --- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp +++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp @@ -563,12 +563,14 @@ class VeloxReaderTests : public ::testing::Test { std::string& file, nimble::VeloxWriterOptions writerOptions) { auto
writeFile = std::make_unique<velox::InMemoryWriteFile>(&file); - nimble::FlushDecision decision; + bool flushDecision; + bool chunkDecision; writerOptions.enableChunking = true; writerOptions.minStreamChunkRawSize = folly::Random::rand32(30, rng); writerOptions.flushPolicyFactory = [&]() { return std::make_unique<nimble::LambdaFlushPolicy>( - [&](auto&) { return decision; }); + /*flushLambda=*/[&](auto&) { return flushDecision; }, + /*chunkLambda=*/[&](auto&) { return chunkDecision; }); }; std::vector<velox::VectorPtr> expected; @@ -579,16 +581,17 @@ class VeloxReaderTests : public ::testing::Test { auto vector = generator(type, i); int32_t rowIndex = 0; while (rowIndex < vector->size()) { - decision = nimble::FlushDecision::None; + flushDecision = false; + chunkDecision = false; auto batchSize = vector->size() - rowIndex; // Randomly produce chunks if (folly::Random::oneIn(2, rng)) { batchSize = folly::Random::rand32(0, batchSize, rng) + 1; - decision = nimble::FlushDecision::Chunk; + chunkDecision = true; } if ((perBatchFlush || folly::Random::oneIn(5, rng)) && (rowIndex + batchSize == vector->size())) { - decision = nimble::FlushDecision::Stripe; + flushDecision = true; } writer.write(vector->slice(rowIndex, batchSize)); rowIndex += batchSize; @@ -773,7 +776,7 @@ class VeloxReaderTests : public ::testing::Test { writerOptions.enableChunking = true; writerOptions.flushPolicyFactory = [&]() { return std::make_unique<nimble::LambdaFlushPolicy>( - [&](auto&) { return nimble::FlushDecision::None; }); + /*flushLambda=*/[&](auto&) { return false; }); }; writerOptions.dictionaryArrayColumns.insert("dictionaryArray"); writerOptions.vectorDecoderVisitor = [&decodeCounter]() { @@ -5504,7 +5507,8 @@ TEST_F(VeloxReaderTests, ChunkStreamsWithNulls) { .flushPolicyFactory = [&]() { return std::make_unique<nimble::LambdaFlushPolicy>( - [&](auto&) { return nimble::FlushDecision::Chunk; }); + /*flushLambda=*/[&](auto&) { return false; }, + /*chunkLambda=*/[&](auto&) { return true; }); }, .enableChunking = enableChunking}; auto file = nimble::test::createNimbleFile( diff --git a/dwio/nimble/velox/tests/VeloxWriterTests.cpp b/dwio/nimble/velox/tests/VeloxWriterTests.cpp index a4d11419..5cd4e1ba 100644 --- a/dwio/nimble/velox/tests/VeloxWriterTests.cpp +++ b/dwio/nimble/velox/tests/VeloxWriterTests.cpp @@ -18,10 +18,13 @@ #include "dwio/nimble/common/Exceptions.h" #include "dwio/nimble/common/tests/TestUtils.h" +#include "dwio/nimble/encodings/EncodingFactory.h" #include "dwio/nimble/encodings/EncodingLayoutCapture.h" +#include "dwio/nimble/encodings/EncodingUtils.h" #include "dwio/nimble/tablet/Constants.h" #include "dwio/nimble/velox/ChunkedStream.h" #include "dwio/nimble/velox/EncodingLayoutTree.h" +#include "dwio/nimble/velox/FlushPolicy.h" #include "dwio/nimble/velox/SchemaSerialization.h" #include "dwio/nimble/velox/StatsGenerated.h" #include "dwio/nimble/velox/VeloxReader.h" @@ -35,6 +38,12 @@ namespace facebook { +DEFINE_uint32( + writer_tests_seed, + 0, + "If provided, this seed will be used when executing tests. 
" + "Otherwise, a random seed will be used."); + class VeloxWriterTests : public ::testing::Test { protected: static void SetUpTestCase() { @@ -104,7 +113,7 @@ TEST_F(VeloxWriterTests, ExceptionOnClose) { std::move(writeFile), {.flushPolicyFactory = [&]() { return std::make_unique( - [&](auto&) { return nimble::FlushDecision::Stripe; }); + /*flushLambda=*/[&](auto&) { return true; }); }}); std::string error; try { @@ -289,29 +298,88 @@ std::vector generateBatches( velox::VectorFuzzer fuzzer( {.vectorSize = size, .nullRatio = 0.1}, &pool, seed); std::vector batches; - + batches.reserve(batchCount); for (size_t i = 0; i < batchCount; ++i) { batches.push_back(fuzzer.fuzzInputFlatRow(type)); } return batches; } + +struct ChunkSizeResults { + uint32_t stripeCount; + uint32_t minChunkCount; + uint32_t maxChunkCount; +}; + +ChunkSizeResults validateChunkSize( + nimble::VeloxReader& reader, + const uint64_t minStreamChunkRawSize) { + const auto& tablet = reader.tabletReader(); + auto& pool = reader.memoryPool(); + + const uint32_t stripeCount = tablet.stripeCount(); + uint32_t maxChunkCount = 0; + uint32_t minChunkCount = std::numeric_limits::max(); + + for (uint32_t stripeIndex = 0; stripeIndex < stripeCount; ++stripeIndex) { + const auto stripeIdentifier = tablet.getStripeIdentifier(stripeIndex); + const auto streamCount = tablet.streamCount(stripeIdentifier); + + std::vector streamIds(streamCount); + std::iota(streamIds.begin(), streamIds.end(), 0); + auto streamLoaders = tablet.load(stripeIdentifier, streamIds); + + for (uint32_t streamId = 0; streamId < streamLoaders.size(); ++streamId) { + if (!streamLoaders[streamId]) { + continue; + } + nimble::InMemoryChunkedStream chunkedStream{ + pool, std::move(streamLoaders[streamId])}; + uint32_t currentStreamChunkCount = 0; + while (chunkedStream.hasNext()) { + ++currentStreamChunkCount; + const auto chunk = chunkedStream.nextChunk(); + // Validate min chunk size when not last chunk + if (chunkedStream.hasNext()) { + const auto encoding = nimble::EncodingFactory::decode(pool, chunk); + const auto rowCount = encoding->rowCount(); + const auto dataType = encoding->dataType(); + const uint64_t chunkRawDataSize = + facebook::nimble::detail::dataTypeSize(dataType) * rowCount; + EXPECT_GE(chunkRawDataSize, minStreamChunkRawSize) + << "Stream " << streamId << " has a non-last chunk with size " + << chunkRawDataSize << " which is below min chunk size of " + << minStreamChunkRawSize; + } + } + maxChunkCount = std::max(maxChunkCount, currentStreamChunkCount); + minChunkCount = std::min(minChunkCount, currentStreamChunkCount); + } + } + + return ChunkSizeResults{ + .stripeCount = stripeCount, + .minChunkCount = minChunkCount, + .maxChunkCount = maxChunkCount, + }; +} } // namespace -struct RawStripeSizeFlushPolicyTestCase { +struct StripeRawSizeFlushPolicyTestCase { const size_t batchCount; const uint32_t rawStripeSize; const uint32_t stripeCount; }; -class RawStripeSizeFlushPolicyTest +class StripeRawSizeFlushPolicyTest : public VeloxWriterTests, - public ::testing::WithParamInterface {}; + public ::testing::WithParamInterface {}; -TEST_P(RawStripeSizeFlushPolicyTest, RawStripeSizeFlushPolicy) { +TEST_P(StripeRawSizeFlushPolicyTest, StripeRawSizeFlushPolicy) { auto type = velox::ROW({{"simple", velox::INTEGER()}}); nimble::VeloxWriterOptions writerOptions{.flushPolicyFactory = []() { // Buffering 256MB data before encoding stripes. 
- return std::make_unique<nimble::RawStripeSizeFlushPolicy>( + return std::make_unique<nimble::StripeRawSizeFlushPolicy>( GetParam().rawStripeSize); }}; @@ -385,7 +453,7 @@ TEST_F(VeloxWriterTests, MemoryReclaimPath) { TEST_F(VeloxWriterTests, FlushHugeStrings) { nimble::VeloxWriterOptions writerOptions{.flushPolicyFactory = []() { - return std::make_unique<nimble::RawStripeSizeFlushPolicy>(1 * 1024 * 1024); + return std::make_unique<nimble::StripeRawSizeFlushPolicy>(1 * 1024 * 1024); }}; velox::test::VectorMaker vectorMaker{leafPool_.get()}; @@ -1029,7 +1097,7 @@ TEST_F(VeloxWriterTests, CombineMultipleLayersOfDictionaries) { void testChunks( velox::memory::MemoryPool& rootPool, uint32_t minStreamChunkRawSize, - std::vector<std::pair<velox::VectorPtr, nimble::FlushDecision>> vectors, + std::vector<std::pair<velox::VectorPtr, bool>> vectors, std::function<void(const nimble::TabletReader&)> verifier, folly::F14FastSet<std::string> flatMapColumns = {}) { ASSERT_LT(0, vectors.size()); @@ -1041,7 +1109,7 @@ void testChunks( std::string file; auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file); - auto flushDecision = nimble::FlushDecision::None; + auto flushDecision = false; nimble::VeloxWriter writer( rootPool, type, std::move(writeFile), @@ -1052,6 +1120,7 @@ void testChunks( .flushPolicyFactory = [&]() { return std::make_unique<nimble::LambdaFlushPolicy>( + [&](auto&) { return false; }, [&](auto&) { return flushDecision; }); }, .enableChunking = true, @@ -1080,6 +1149,8 @@ void testChunks( ASSERT_TRUE(expected->equalValueAt(result.get(), i, i)); } ASSERT_FALSE(reader.next(1, result)); + + validateChunkSize(reader, minStreamChunkRawSize); } TEST_F(VeloxWriterTests, ChunkedStreamsRowAllNullsNoChunks) { @@ -1095,8 +1166,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowAllNullsNoChunks) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{vector, nimble::FlushDecision::None}, - {vector, nimble::FlushDecision::None}}, + {{vector, false}, {vector, false}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1131,8 +1201,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowAllNullsWithChunksMinSizeBig) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 1024, - {{vector, nimble::FlushDecision::Chunk}, - {vector, nimble::FlushDecision::Chunk}}, + {{vector, true}, {vector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1168,8 +1237,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowAllNullsWithChunksMinSizeZero) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{vector, nimble::FlushDecision::Chunk}, - {vector, nimble::FlushDecision::Chunk}}, + {{vector, true}, {vector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1212,8 +1280,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowSomeNullsNoChunks) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{nullsVector, nimble::FlushDecision::None}, - {nonNullsVector, nimble::FlushDecision::None}}, + {{nullsVector, false}, {nonNullsVector, false}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1261,8 +1328,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowSomeNullsWithChunksMinSizeBig) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 1024, - {{nullsVector, nimble::FlushDecision::Chunk}, - {nonNullsVector, nimble::FlushDecision::Chunk}}, + {{nullsVector, true}, {nonNullsVector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1311,8 +1377,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowSomeNullsWithChunksMinSizeZero) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - 
{{nullsVector, nimble::FlushDecision::Chunk}, - {nonNullsVector, nimble::FlushDecision::Chunk}}, + {{nullsVector, true}, {nonNullsVector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1353,8 +1418,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowNoNullsNoChunks) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{vector, nimble::FlushDecision::None}, - {vector, nimble::FlushDecision::None}}, + {{vector, false}, {vector, false}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1393,8 +1457,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowNoNullsWithChunksMinSizeBig) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 1024, - {{vector, nimble::FlushDecision::Chunk}, - {vector, nimble::FlushDecision::Chunk}}, + {{vector, true}, {vector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1434,8 +1497,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowNoNullsWithChunksMinSizeZero) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{vector, nimble::FlushDecision::Chunk}, - {vector, nimble::FlushDecision::Chunk}}, + {{vector, true}, {vector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1475,8 +1537,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsChildAllNullsNoChunks) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{vector, nimble::FlushDecision::None}, - {vector, nimble::FlushDecision::None}}, + {{vector, false}, {vector, false}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1501,8 +1562,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsChildAllNullsWithChunksMinSizeBig) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 1024, - {{vector, nimble::FlushDecision::Chunk}, - {vector, nimble::FlushDecision::Chunk}}, + {{vector, true}, {vector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1527,8 +1587,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsChildAllNullsWithChunksMinSizeZero) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{vector, nimble::FlushDecision::Chunk}, - {vector, nimble::FlushDecision::Chunk}}, + {{vector, true}, {vector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1560,8 +1619,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapAllNullsNoChunks) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{vector, nimble::FlushDecision::None}, - {vector, nimble::FlushDecision::None}}, + {{vector, false}, {vector, false}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1603,8 +1661,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapAllNullsWithChunksMinSizeBig) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 1024, - {{vector, nimble::FlushDecision::Chunk}, - {vector, nimble::FlushDecision::Chunk}}, + {{vector, true}, {vector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1647,8 +1704,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapAllNullsWithChunksMinSizeZero) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{vector, 
nimble::FlushDecision::Chunk}, - {vector, nimble::FlushDecision::Chunk}}, + {{vector, true}, {vector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1702,8 +1758,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapSomeNullsNoChunks) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{nullsVector, nimble::FlushDecision::None}, - {nonNullsVector, nimble::FlushDecision::None}}, + {{nullsVector, false}, {nonNullsVector, false}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1780,8 +1835,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapSomeNullsWithChunksMinSizeBig) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 1024, - {{nullsVector, nimble::FlushDecision::Chunk}, - {nonNullsVector, nimble::FlushDecision::Chunk}}, + {{nullsVector, true}, {nonNullsVector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1861,8 +1915,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapSomeNullsWithChunksMinSizeZero) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{nullsVector, nimble::FlushDecision::Chunk}, - {nonNullsVector, nimble::FlushDecision::Chunk}}, + {{nullsVector, true}, {nonNullsVector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1952,28 +2005,410 @@ TEST_F(VeloxWriterTests, RawSizeWritten) { ASSERT_EQ(expectedRawSize, rawSize); } +struct ChunkFlushPolicyTestCase { + const size_t batchCount{20}; + const bool enableChunking{true}; + const uint64_t targetStripeSizeBytes{250 << 10}; + const uint64_t writerMemoryHighThresholdBytes{80 << 10}; + const uint64_t writerMemoryLowThresholdBytes{75 << 10}; + const double estimatedCompressionFactor{1.3}; + const uint32_t minStreamChunkRawSize{100}; + const uint32_t maxStreamChunkRawSize{128 << 10}; + const uint32_t expectedStripeCount{0}; + const uint32_t expectedMaxChunkCount{0}; + const uint32_t expectedMinChunkCount{0}; + const uint32_t chunkedStreamBatchSize{2}; +}; + +class ChunkFlushPolicyTest + : public VeloxWriterTests, + public ::testing::WithParamInterface<ChunkFlushPolicyTestCase> {}; + +TEST_P(ChunkFlushPolicyTest, ChunkFlushPolicyIntegration) { + const auto type = velox::ROW( + {{"BIGINT", velox::BIGINT()}, {"SMALLINT", velox::SMALLINT()}}); + nimble::VeloxWriterOptions writerOptions{ + .minStreamChunkRawSize = GetParam().minStreamChunkRawSize, + .maxStreamChunkRawSize = GetParam().maxStreamChunkRawSize, + .chunkedStreamBatchSize = GetParam().chunkedStreamBatchSize, + .flushPolicyFactory = GetParam().enableChunking + ?
[]() -> std::unique_ptr<nimble::FlushPolicy> { + return std::make_unique<nimble::ChunkFlushPolicy>( + nimble::ChunkFlushPolicyConfig{ + .writerMemoryHighThresholdBytes = GetParam().writerMemoryHighThresholdBytes, + .writerMemoryLowThresholdBytes = GetParam().writerMemoryLowThresholdBytes, + .targetStripeSizeBytes = GetParam().targetStripeSizeBytes, + .estimatedCompressionFactor = + GetParam().estimatedCompressionFactor, + }); + } + : []() -> std::unique_ptr<nimble::FlushPolicy> { + return std::make_unique<nimble::StripeRawSizeFlushPolicy>( + GetParam().targetStripeSizeBytes); + }, + .enableChunking = GetParam().enableChunking, + }; + + std::string file; + auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file); + + nimble::VeloxWriter writer( + *rootPool_, type, std::move(writeFile), std::move(writerOptions)); + const auto batches = generateBatches( + type, + GetParam().batchCount, + /*size=*/4000, + /*seed=*/20221110, + *leafPool_); + + for (const auto& batch : batches) { + writer.write(batch); + } + writer.close(); + velox::InMemoryReadFile readFile(file); + auto selector = std::make_shared<velox::dwio::common::ColumnSelector>(type); + nimble::VeloxReader reader(*leafPool_, &readFile); + ChunkSizeResults result = + validateChunkSize(reader, GetParam().minStreamChunkRawSize); + + EXPECT_EQ(GetParam().expectedStripeCount, result.stripeCount); + EXPECT_EQ(GetParam().expectedMaxChunkCount, result.maxChunkCount); + EXPECT_EQ(GetParam().expectedMinChunkCount, result.minChunkCount); +} + +TEST_F(VeloxWriterTests, FuzzComplex) { + auto type = velox::ROW( + {{"array", velox::ARRAY(velox::REAL())}, + {"dict_array", velox::ARRAY(velox::REAL())}, + {"map", velox::MAP(velox::INTEGER(), velox::DOUBLE())}, + {"row", + velox::ROW({ + {"a", velox::REAL()}, + {"b", velox::INTEGER()}, + })}, + {"row", + velox::ROW( + {{"nested_row", + velox::ROW( + {{"nested_nested_row", velox::ROW({{"a", velox::INTEGER()}})}, + {"b", velox::INTEGER()}})}})}, + {"map", + velox::MAP(velox::INTEGER(), velox::ROW({{"a", velox::INTEGER()}}))}, + {"nested", + velox::ARRAY(velox::ROW({ + {"a", velox::INTEGER()}, + {"b", velox::MAP(velox::REAL(), velox::REAL())}, + }))}, + {"nested_map_array1", + velox::MAP(velox::INTEGER(), velox::ARRAY(velox::REAL()))}, + {"nested_map_array2", + velox::MAP(velox::INTEGER(), velox::ARRAY(velox::INTEGER()))}, + {"dict_map", velox::MAP(velox::INTEGER(), velox::INTEGER())}}); + auto rowType = std::dynamic_pointer_cast<const velox::RowType>(type); + uint32_t seed = FLAGS_writer_tests_seed > 0 ?
FLAGS_writer_tests_seed + : folly::Random::rand32(); + LOG(INFO) << "seed: " << seed; + std::mt19937 rng{seed}; + for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) { + std::shared_ptr<folly::CPUThreadPoolExecutor> executor; + nimble::VeloxWriterOptions writerOptions; + writerOptions.enableChunking = true; + writerOptions.flushPolicyFactory = + []() -> std::unique_ptr<nimble::FlushPolicy> { + return std::make_unique<nimble::ChunkFlushPolicy>( + nimble::ChunkFlushPolicyConfig{ + .writerMemoryHighThresholdBytes = 200 << 10, + .writerMemoryLowThresholdBytes = 100 << 10, + .targetStripeSizeBytes = 100 << 10, + .estimatedCompressionFactor = 1.7, + }); + }; + + LOG(INFO) << "Parallelism Factor: " << parallelismFactor; + writerOptions.dictionaryArrayColumns.insert("nested_map_array1"); + writerOptions.dictionaryArrayColumns.insert("nested_map_array2"); + writerOptions.dictionaryArrayColumns.insert("dict_array"); + writerOptions.deduplicatedMapColumns.insert("dict_map"); + + if (parallelismFactor > 0) { + executor = + std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor); + writerOptions.encodingExecutor = folly::getKeepAliveToken(*executor); + writerOptions.writeExecutor = folly::getKeepAliveToken(*executor); + } + + const auto iterations = 4; + for (auto i = 0; i < iterations; ++i) { + writerOptions.minStreamChunkRawSize = + std::uniform_int_distribution<uint32_t>(10, 4096)(rng); + const auto batchSize = + std::uniform_int_distribution<uint32_t>(10, 400)(rng); + const auto batchCount = 10; + + std::string file; + auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file); + nimble::VeloxWriter writer( + *rootPool_, type, std::move(writeFile), writerOptions); + const auto batches = generateBatches( + type, + /*batchCount=*/batchCount, + /*size=*/batchSize, + /*seed=*/seed, + *leafPool_); + + for (const auto& batch : batches) { + writer.write(batch); + } + writer.close(); + + velox::InMemoryReadFile readFile(file); + nimble::VeloxReader reader(*leafPool_, &readFile); + validateChunkSize(reader, writerOptions.minStreamChunkRawSize); + } + } +} + +TEST_F(VeloxWriterTests, BatchedChunkingRelievesMemoryPressure) { + // Verify we stop chunking early when chunking relieves memory pressure. + const uint32_t seed = FLAGS_writer_tests_seed > 0 ?
FLAGS_writer_tests_seed + : folly::Random::rand32(); + LOG(INFO) << "seed: " << seed; + std::mt19937 rng{seed}; + const uint32_t rowCount = + std::uniform_int_distribution<uint32_t>(1000, 4096)(rng); + + velox::VectorFuzzer fuzzer({.vectorSize = rowCount}, leafPool_.get(), seed); + const auto stringColumn = fuzzer.fuzzFlat(velox::VARCHAR()); + const auto intColumn = fuzzer.fuzzFlat(velox::INTEGER()); + + nimble::RawSizeContext context; + nimble::OrderedRanges ranges; + ranges.add(0, rowCount); + const uint64_t stringColumnRawSize = + nimble::getRawSizeFromVector(stringColumn, ranges, context) + + sizeof(std::string_view) * rowCount; + const uint64_t intColumnRawSize = + nimble::getRawSizeFromVector(intColumn, ranges, context); + + constexpr size_t kColumnCount = 20; + constexpr size_t kBatchSize = 4; + std::vector<velox::VectorPtr> children(kColumnCount); + std::vector<std::string> columnNames(kColumnCount); + uint64_t totalRawSize = 0; + for (size_t i = 0; i < kColumnCount; i += 2) { + columnNames[i] = fmt::format("string_column_{}", i); + columnNames[i + 1] = fmt::format("int_column_{}", i); + children[i] = stringColumn; + children[i + 1] = intColumn; + totalRawSize += intColumnRawSize + stringColumnRawSize; + } + + velox::test::VectorMaker vectorMaker{leafPool_.get()}; + const auto rowVector = vectorMaker.rowVector(columnNames, children); + + // We will return true twice and false once + const std::vector<bool> expectedChunkingDecisions{true, true, false}; + std::vector<bool> actualChunkingDecisions; + + // We will be chunking the large streams in the first two batches. 8 string + // streams in total. We set the expected rawSize after chunking these two + // batches as our memory threshold. + const uint64_t memoryPressureThreshold = + totalRawSize - (2 * kBatchSize * stringColumnRawSize); + + nimble::VeloxWriterOptions writerOptions; + writerOptions.chunkedStreamBatchSize = kBatchSize; + writerOptions.enableChunking = true; + writerOptions.flushPolicyFactory = + [&]() -> std::unique_ptr<nimble::FlushPolicy> { + return std::make_unique<nimble::LambdaFlushPolicy>( + [](const auto&) { return true; }, + [&](const nimble::StripeProgress& stripeProgress) { + const bool shouldChunk = + stripeProgress.stripeRawSize > memoryPressureThreshold; + actualChunkingDecisions.push_back(shouldChunk); + return shouldChunk; + }); + }; + + std::string file; + auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file); + nimble::VeloxWriter writer( + *rootPool_, rowVector->type(), std::move(writeFile), writerOptions); + writer.write(rowVector); + writer.close(); + + EXPECT_THAT( + expectedChunkingDecisions, + ::testing::ElementsAreArray(actualChunkingDecisions)); + + velox::InMemoryReadFile readFile(file); + nimble::VeloxReader reader(*leafPool_, &readFile); + validateChunkSize(reader, writerOptions.minStreamChunkRawSize); +} + +INSTANTIATE_TEST_CASE_P( - RawStripeSizeFlushPolicyTestSuite, - RawStripeSizeFlushPolicyTest, + StripeRawSizeFlushPolicyTestSuite, + StripeRawSizeFlushPolicyTest, ::testing::Values( - RawStripeSizeFlushPolicyTestCase{ + StripeRawSizeFlushPolicyTestCase{ .batchCount = 50, .rawStripeSize = 256 << 10, .stripeCount = 4}, - RawStripeSizeFlushPolicyTestCase{ + StripeRawSizeFlushPolicyTestCase{ .batchCount = 100, .rawStripeSize = 256 << 10, .stripeCount = 7}, - RawStripeSizeFlushPolicyTestCase{ + StripeRawSizeFlushPolicyTestCase{ .batchCount = 100, .rawStripeSize = 256 << 11, .stripeCount = 4}, - RawStripeSizeFlushPolicyTestCase{ + StripeRawSizeFlushPolicyTestCase{ .batchCount = 100, .rawStripeSize = 256 << 12, .stripeCount = 2}, - RawStripeSizeFlushPolicyTestCase{ + StripeRawSizeFlushPolicyTestCase{ 
.batchCount = 100, .rawStripeSize = 256 << 20, .stripeCount = 1})); + +INSTANTIATE_TEST_CASE_P( + ChunkFlushPolicyTestSuite, + ChunkFlushPolicyTest, + ::testing::Values( + // Base case (no chunking, StripeRawSizeFlushPolicy) + ChunkFlushPolicyTestCase{ + .batchCount = 20, + .enableChunking = false, + .targetStripeSizeBytes = 250 << 10, // 250KB + .writerMemoryHighThresholdBytes = 80 << 10, + .writerMemoryLowThresholdBytes = 75 << 10, + .estimatedCompressionFactor = 1.3, + .minStreamChunkRawSize = 100, + .maxStreamChunkRawSize = 128 << 10, + .expectedStripeCount = 4, + .expectedMaxChunkCount = 1, + .expectedMinChunkCount = 1, + .chunkedStreamBatchSize = 2, + }, + // Baseline with default settings (has chunking) + ChunkFlushPolicyTestCase{ + .batchCount = 20, + .enableChunking = true, + .targetStripeSizeBytes = 250 << 10, // 250KB + .writerMemoryHighThresholdBytes = 80 << 10, + .writerMemoryLowThresholdBytes = 75 << 10, + .estimatedCompressionFactor = 1.3, + .minStreamChunkRawSize = 100, + .maxStreamChunkRawSize = 128 << 10, + .expectedStripeCount = 7, + .expectedMaxChunkCount = 2, + .expectedMinChunkCount = 1, + .chunkedStreamBatchSize = 2, + }, + // Reducing maxStreamChunkRawSize produces more chunks + ChunkFlushPolicyTestCase{ + .batchCount = 20, + .enableChunking = true, + .targetStripeSizeBytes = 250 << 10, // 250KB + .writerMemoryHighThresholdBytes = 80 << 10, + .writerMemoryLowThresholdBytes = 75 << 10, + .estimatedCompressionFactor = 1.0, + .minStreamChunkRawSize = 100, + .maxStreamChunkRawSize = 12 << 10, // 12KB (as opposed to 128KB in other cases) + .expectedStripeCount = 8, + .expectedMaxChunkCount = 9, // +7 + .expectedMinChunkCount = 2, // +1 + .chunkedStreamBatchSize = 10, + }, + // High memory regression threshold and no compression + // Stripe count identical to StripeRawSizeFlushPolicy + ChunkFlushPolicyTestCase{ + .batchCount = 20, + .enableChunking = true, + .targetStripeSizeBytes = 250 << 10, // 250KB + .writerMemoryHighThresholdBytes = 500 + << 10, // 500KB (as opposed to 80 KB in other cases) + .writerMemoryLowThresholdBytes = 75 << 10, + .estimatedCompressionFactor = + 1.0, // No compression (as opposed to 1.3 in other cases) + .minStreamChunkRawSize = 100, + .maxStreamChunkRawSize = 128 << 10, + .expectedStripeCount = 4, + .expectedMaxChunkCount = 2, + .expectedMinChunkCount = 1, + .chunkedStreamBatchSize = 2, + }, + // Low memory regression threshold + // Produces file with more min chunks per stripe + ChunkFlushPolicyTestCase{ + .batchCount = 20, + .enableChunking = true, + .targetStripeSizeBytes = 250 << 10, + .writerMemoryHighThresholdBytes = 40 + << 10, // 40KB (as opposed to 80KB in other cases) + .writerMemoryLowThresholdBytes = 35 + << 10, // 35KB (as opposed to 75KB in other cases) + .estimatedCompressionFactor = 1.3, + .minStreamChunkRawSize = 100, + .maxStreamChunkRawSize = 128 << 10, + .expectedStripeCount = 10, + .expectedMaxChunkCount = 2, + .expectedMinChunkCount = 2, // +1 chunk + .chunkedStreamBatchSize = 2, + }, + // High target stripe size bytes (with disabled memory pressure + // optimization) produces fewer stripes.
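+ // Note: with both memory thresholds far above anything this test writes, + // shouldChunk never fires, so flushes are driven purely by the expected + // encoded stripe size estimate.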
+ ChunkFlushPolicyTestCase{ + .batchCount = 20, + .enableChunking = true, + .targetStripeSizeBytes = 900 + << 10, // 900KB (as opposed to 250KB in other cases) + .writerMemoryHighThresholdBytes = 2 + << 20, // 2MB (as opposed to 80KB in other cases) + .writerMemoryLowThresholdBytes = 1 + << 20, // 1MB (as opposed to 75KB in other cases) + .estimatedCompressionFactor = 1.3, + .minStreamChunkRawSize = 100, + .maxStreamChunkRawSize = 128 << 10, + .expectedStripeCount = 1, + .expectedMaxChunkCount = 5, + .expectedMinChunkCount = 2, + .chunkedStreamBatchSize = 2, + }, + // Low target stripe size bytes (with disabled memory pressure + // optimization) produces more stripes. Single chunks. + ChunkFlushPolicyTestCase{ + .batchCount = 20, + .enableChunking = true, + .targetStripeSizeBytes = 90 + << 10, // 90KB (as opposed to 250KB in other cases) + .writerMemoryHighThresholdBytes = 2 + << 20, // 2MB (as opposed to 80KB in other cases) + .writerMemoryLowThresholdBytes = 1 + << 20, // 1MB (as opposed to 75KB in other cases) + .estimatedCompressionFactor = 1.3, + .minStreamChunkRawSize = 100, + .maxStreamChunkRawSize = 128 << 10, + .expectedStripeCount = 7, + .expectedMaxChunkCount = 1, + .expectedMinChunkCount = 1, + .chunkedStreamBatchSize = 2, + }, + // Higher chunked stream batch size (no change in policy) + ChunkFlushPolicyTestCase{ + .batchCount = 20, + .enableChunking = true, + .targetStripeSizeBytes = 250 << 10, // 250KB + .writerMemoryHighThresholdBytes = 80 << 10, + .writerMemoryLowThresholdBytes = 75 << 10, + .estimatedCompressionFactor = 1.0, + .minStreamChunkRawSize = 100, + .maxStreamChunkRawSize = 128 << 10, + .expectedStripeCount = 7, + .expectedMaxChunkCount = 2, + .expectedMinChunkCount = 1, + .chunkedStreamBatchSize = 10})); } // namespace facebook
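For context, a minimal sketch of how a caller might drive the new two-method FlushPolicy interface (illustrative only: the onBatchBuffered helper below, its parameters, and this driver shape are hypothetical and not the actual VeloxWriter integration):

#include "dwio/nimble/velox/FlushPolicy.h"

namespace facebook::nimble {
// Hypothetical driver: after buffering a batch, snapshot the stripe state and
// ask the policy whether to chunk (to relieve memory) and/or flush the stripe.
inline void onBatchBuffered(
    FlushPolicy& policy,
    uint64_t rawSize,
    uint64_t encodedSize,
    uint64_t encodedLogicalSize) {
  const StripeProgress progress{
      .stripeRawSize = rawSize,
      .stripeEncodedSize = encodedSize,
      .stripeEncodedLogicalSize = encodedLogicalSize};
  if (policy.shouldChunk(progress)) {
    // Encode buffered stream data into chunks to release raw buffers.
  }
  if (policy.shouldFlush(progress)) {
    // Close out the current stripe and start a new one.
  }
}
} // namespace facebook::nimble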