diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 7cb7c15..b6a05cf 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -16,7 +16,7 @@ repos:
   - repo: https://github.com/pre-commit/mirrors-clang-format
-    rev: v18.1.3
+    rev: v21.1.0
     hooks:
       - id: clang-format
   - repo: https://github.com/BlankSpruce/gersemi
diff --git a/dwio/nimble/common/MetricsLogger.h b/dwio/nimble/common/MetricsLogger.h
index 63efd93..6868021 100644
--- a/dwio/nimble/common/MetricsLogger.h
+++ b/dwio/nimble/common/MetricsLogger.h
@@ -74,9 +74,10 @@ struct FileCloseMetrics {
 };
 
 enum class LogOperation {
+  Write,
+  Flush,
+  Close,
   StripeLoad,
-  StripeFlush,
-  FileClose,
   CompressionContext,
 };
 
diff --git a/dwio/nimble/velox/FlushPolicy.cpp b/dwio/nimble/velox/FlushPolicy.cpp
index 84e3d62..2caf682 100644
--- a/dwio/nimble/velox/FlushPolicy.cpp
+++ b/dwio/nimble/velox/FlushPolicy.cpp
@@ -16,15 +16,35 @@
 #include "dwio/nimble/velox/FlushPolicy.h"
 
 namespace facebook::nimble {
-
-FlushDecision RawStripeSizeFlushPolicy::shouldFlush(
-    const StripeProgress& stripeProgress) {
-  return stripeProgress.rawStripeSize >= rawStripeSize_ ? FlushDecision::Stripe
-                                                        : FlushDecision::None;
+// Relieve memory pressure with chunking.
+bool ChunkFlushPolicy::shouldChunk(const StripeProgress& stripeProgress) {
+  const uint64_t inMemoryBytes =
+      stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize;
+  const auto writerMemoryThreshold = (lastChunkDecision_ == false)
+      ? config_.writerMemoryHighThresholdBytes
+      : config_.writerMemoryLowThresholdBytes;
+  lastChunkDecision_ = inMemoryBytes > writerMemoryThreshold;
+  return lastChunkDecision_;
 }
 
-void RawStripeSizeFlushPolicy::onClose() {
-  // No-op
+// Optimize for the expected storage stripe size.
+bool ChunkFlushPolicy::shouldFlush(const StripeProgress& stripeProgress) {
+  // When chunking is unable to relieve memory pressure, we flush the stripe.
+  if (stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize >
+      config_.writerMemoryHighThresholdBytes) {
+    return true;
+  }
+
+  double compressionFactor = config_.estimatedCompressionFactor;
+  // Use the historical compression ratio as a heuristic when available.
+  if (stripeProgress.stripeEncodedSize > 0) {
+    compressionFactor =
+        static_cast<double>(stripeProgress.stripeEncodedLogicalSize) /
+        stripeProgress.stripeEncodedSize;
+  }
+  double expectedEncodedStripeSize = stripeProgress.stripeEncodedSize +
+      stripeProgress.stripeRawSize / std::max(compressionFactor, 1.0);
+  return (expectedEncodedStripeSize >= config_.targetStripeSizeBytes);
 }
 } // namespace facebook::nimble
diff --git a/dwio/nimble/velox/FlushPolicy.h b/dwio/nimble/velox/FlushPolicy.h
index 1ad8667..241801e 100644
--- a/dwio/nimble/velox/FlushPolicy.h
+++ b/dwio/nimble/velox/FlushPolicy.h
@@ -20,59 +20,89 @@
 namespace facebook::nimble {
 
+// TODO: Set default values for these parameters based on DISCO experiments.
+// Use arbitrary values for now.
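+// The high and low memory thresholds form a hysteresis band: chunking starts
+// once buffered memory exceeds the high threshold and continues until it
+// falls below the low threshold, which prevents rapid oscillation.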
+struct ChunkFlushPolicyConfig {
+  // Threshold to trigger chunking to relieve memory pressure
+  const uint64_t writerMemoryHighThresholdBytes{200 * 1024L * 1024L};
+  // Threshold below which chunking stops and stripe size optimization resumes
+  const uint64_t writerMemoryLowThresholdBytes{100 * 1024L * 1024L};
+  // Target size for encoded stripes
+  const uint64_t targetStripeSizeBytes{100 * 1024L * 1024L};
+  // Expected ratio of raw to encoded data
+  const double estimatedCompressionFactor{1.3};
+};
+
 struct StripeProgress {
   // Size of the stripe data when it's fully decompressed and decoded
-  const uint64_t rawStripeSize;
+  const uint64_t stripeRawSize;
   // Size of the stripe after buffered data is encoded and optionally compressed
-  const uint64_t stripeSize;
-  // Size of the allocated buffer in the writer
-  const uint64_t bufferSize;
-};
-
-enum class FlushDecision : uint8_t {
-  None = 0,
-  Stripe = 1,
-  Chunk = 2,
+  const uint64_t stripeEncodedSize;
+  // Logical size of the stripe data that has already been encoded
+  const uint64_t stripeEncodedLogicalSize;
 };
 
 class FlushPolicy {
  public:
  virtual ~FlushPolicy() = default;
-  virtual FlushDecision shouldFlush(const StripeProgress& stripeProgress) = 0;
-  // Required for memory pressure coordination for now. Will remove in the
-  // future.
-  virtual void onClose() = 0;
+  virtual bool shouldFlush(const StripeProgress& stripeProgress) = 0;
+  virtual bool shouldChunk(const StripeProgress& stripeProgress) = 0;
 };
 
-class RawStripeSizeFlushPolicy final : public FlushPolicy {
+class StripeRawSizeFlushPolicy final : public FlushPolicy {
  public:
-  explicit RawStripeSizeFlushPolicy(uint64_t rawStripeSize)
-      : rawStripeSize_{rawStripeSize} {}
+  explicit StripeRawSizeFlushPolicy(uint64_t stripeRawSize)
+      : stripeRawSize_{stripeRawSize} {}
 
-  FlushDecision shouldFlush(const StripeProgress& stripeProgress) override;
+  bool shouldFlush(const StripeProgress& stripeProgress) override {
+    return stripeProgress.stripeRawSize >= stripeRawSize_;
+  }
 
-  void onClose() override;
+  bool shouldChunk(const StripeProgress&) override {
+    return false;
+  }
 
  private:
-  const uint64_t rawStripeSize_;
+  const uint64_t stripeRawSize_;
 };
 
 class LambdaFlushPolicy : public FlushPolicy {
  public:
   explicit LambdaFlushPolicy(
-      std::function<FlushDecision(const StripeProgress&)> lambda)
-      : lambda_{lambda} {}
+      std::function<bool(const StripeProgress&)> flushLambda =
+          [](const StripeProgress&) { return false; },
+      std::function<bool(const StripeProgress&)> chunkLambda =
+          [](const StripeProgress&) { return false; })
+      : flushLambda_{std::move(flushLambda)},
+        chunkLambda_{std::move(chunkLambda)} {}
 
-  FlushDecision shouldFlush(const StripeProgress& stripeProgress) override {
-    return lambda_(stripeProgress);
+  bool shouldFlush(const StripeProgress& stripeProgress) override {
+    return flushLambda_(stripeProgress);
   }
 
-  void onClose() override {
-    // No-op
+  bool shouldChunk(const StripeProgress& stripeProgress) override {
+    return chunkLambda_(stripeProgress);
   }
 
  private:
-  std::function<FlushDecision(const StripeProgress&)> lambda_;
+  std::function<bool(const StripeProgress&)> flushLambda_;
+  std::function<bool(const StripeProgress&)> chunkLambda_;
+};
+
+class ChunkFlushPolicy : public FlushPolicy {
+ public:
+  explicit ChunkFlushPolicy(ChunkFlushPolicyConfig config)
+      : config_{std::move(config)}, lastChunkDecision_{false} {}
+
+  // Optimize for the expected storage stripe size.
+  bool shouldFlush(const StripeProgress& stripeProgress) override;
+
+  // Relieve memory pressure with chunking.
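+  // Stateful: the previous chunk decision selects which memory threshold
+  // applies to the next call.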
+  bool shouldChunk(const StripeProgress& stripeProgress) override;
+
+ private:
+  const ChunkFlushPolicyConfig config_;
+  bool lastChunkDecision_;
+};
 } // namespace facebook::nimble
diff --git a/dwio/nimble/velox/StreamChunker.cpp b/dwio/nimble/velox/StreamChunker.cpp
new file mode 100644
index 0000000..defb9ff
--- /dev/null
+++ b/dwio/nimble/velox/StreamChunker.cpp
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "dwio/nimble/velox/StreamChunker.h"
+
+namespace facebook::nimble {
+template <typename T>
+std::unique_ptr<StreamChunker> getStreamChunkerTyped(
+    StreamData& streamData,
+    const StreamChunkerOptions& options) {
+  if (auto* contentStreamData =
+          dynamic_cast<ContentStreamData<T>*>(&streamData)) {
+    return std::make_unique<ContentStreamChunker<T>>(
+        *contentStreamData, options);
+  } else if (
+      auto* nullableContentStreamData =
+          dynamic_cast<NullableContentStreamData<T>*>(&streamData)) {
+    // When there are no nulls in the NullableContentStreamData stream, we
+    // treat it as a regular ContentStreamData stream.
+    if (!streamData.hasNulls()) {
+      return std::make_unique<
+          ContentStreamChunker<T, NullableContentStreamData<T>>>(
+          *nullableContentStreamData, options);
+    }
+    return std::make_unique<NullableContentStreamChunker<T>>(
+        *nullableContentStreamData, options);
+  } else if (
+      auto* nullsStreamData = dynamic_cast<NullsStreamData*>(&streamData)) {
+    return std::make_unique<NullsStreamChunker>(*nullsStreamData, options);
+  }
+  NIMBLE_UNREACHABLE("Unsupported streamData type");
+}
+
+std::unique_ptr<StreamChunker> getStreamChunker(
+    StreamData& streamData,
+    const StreamChunkerOptions& options) {
+  const auto scalarKind = streamData.descriptor().scalarKind();
+  switch (scalarKind) {
+#define HANDLE_SCALAR_KIND(kind, type) \
+  case ScalarKind::kind:               \
+    return getStreamChunkerTyped<type>(streamData, options);
+    HANDLE_SCALAR_KIND(Bool, bool);
+    HANDLE_SCALAR_KIND(Int8, int8_t);
+    HANDLE_SCALAR_KIND(Int16, int16_t);
+    HANDLE_SCALAR_KIND(Int32, int32_t);
+    HANDLE_SCALAR_KIND(UInt32, uint32_t);
+    HANDLE_SCALAR_KIND(Int64, int64_t);
+    HANDLE_SCALAR_KIND(Float, float);
+    HANDLE_SCALAR_KIND(Double, double);
+    HANDLE_SCALAR_KIND(String, std::string_view);
+    HANDLE_SCALAR_KIND(Binary, std::string_view);
+    default:
+      NIMBLE_UNREACHABLE(
+          fmt::format("Unsupported scalar kind {}", toString(scalarKind)));
+  }
+}
+} // namespace facebook::nimble
diff --git a/dwio/nimble/velox/StreamChunker.h b/dwio/nimble/velox/StreamChunker.h
new file mode 100644
index 0000000..2fafdfa
--- /dev/null
+++ b/dwio/nimble/velox/StreamChunker.h
@@ -0,0 +1,549 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "dwio/nimble/velox/StreamData.h"
+
+namespace facebook::nimble {
+namespace detail {
+// When we create new buffers, it's likely that these buffers will end up
+// growing to the size that previously triggered chunking. As a tradeoff
+// between minimizing reallocations (for performance) and actually releasing
+// memory (to relieve memory pressure), we heuristically set the new buffer
+// size to be maxChunkSize plus 50% of the difference between the current
+// buffer capacity and maxChunkSize. This ensures the buffer is not too small
+// (causing frequent reallocations) nor too large (wasting memory).
+template <typename T>
+uint64_t getNewBufferCapacity(
+    uint64_t maxChunkSize,
+    uint64_t currentCapacityCount,
+    uint64_t requiredCapacityCount) {
+  const auto maxChunkElementCount = maxChunkSize / sizeof(T);
+  if (currentCapacityCount < maxChunkElementCount) {
+    return currentCapacityCount;
+  }
+  NIMBLE_DASSERT(
+      requiredCapacityCount <= currentCapacityCount,
+      "Required capacity should not exceed current capacity");
+  return maxChunkElementCount +
+      (currentCapacityCount - maxChunkElementCount) * 0.5;
+}
+
+inline bool shouldOmitDataStream(
+    const StreamData& streamData,
+    uint64_t minChunkSize,
+    bool isFirstChunk) {
+  if (streamData.data().size() > minChunkSize) {
+    return false;
+  }
+  // When all values are null, the values stream is omitted.
+  return isFirstChunk || streamData.nonNulls().empty();
+}
+
+inline bool shouldOmitNullStream(
+    const NullsStreamData& streamData,
+    uint64_t minChunkSize,
+    bool isFirstChunk) {
+  // When all values are non-null, we omit the entire nulls stream.
+  if (streamData.hasNulls() && streamData.nonNulls().size() > minChunkSize) {
+    return false;
+  }
+  return isFirstChunk || streamData.empty();
+}
+} // namespace detail
+
+/**
+ * Options for configuring StreamChunker behavior.
+ */
+struct StreamChunkerOptions {
+  uint64_t minChunkSize;
+  uint64_t maxChunkSize;
+  bool ensureFullChunks;
+  bool isFirstChunk;
+};
+
+/**
+ * Breaks streams into manageable chunks based on size thresholds.
+ * The `compact()` method should be called after use to reclaim memory from
+ * processed StreamData values.
+ */
+class StreamChunker {
+ public:
+  StreamChunker() = default;
+
+  // Returns the next chunk of stream data.
+  virtual std::optional<StreamDataView> next() = 0;
+
+  // Erases processed data to reclaim memory.
+  virtual void compact() = 0;
+
+  virtual ~StreamChunker() = default;
+
+ protected:
+  // Helper struct to hold the result of the nextChunkSize helper methods.
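+  // dataElementCount and nullElementCount are element counts, while
+  // rollingChunkSize and extraMemory are byte sizes.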
+  struct ChunkSize {
+    size_t dataElementCount = 0;
+    size_t nullElementCount = 0;
+    uint64_t rollingChunkSize = 0;
+    uint64_t extraMemory = 0;
+  };
+};
+
+std::unique_ptr<StreamChunker> getStreamChunker(
+    StreamData& streamData,
+    const StreamChunkerOptions& options);
+
+template <typename T, typename StreamDataT = ContentStreamData<T>>
+class ContentStreamChunker final : public StreamChunker {
+ public:
+  explicit ContentStreamChunker(
+      StreamDataT& streamData,
+      const StreamChunkerOptions& options)
+      : streamData_{streamData},
+        minChunkSize_{options.minChunkSize},
+        maxChunkSize_{options.maxChunkSize},
+        dataElementOffset_{0},
+        extraMemory_{streamData_.extraMemory()},
+        ensureFullChunks_{options.ensureFullChunks} {
+    static_assert(
+        std::is_same_v<StreamDataT, ContentStreamData<T>> ||
+            std::is_same_v<StreamDataT, NullableContentStreamData<T>>,
+        "StreamDataT must be either ContentStreamData or NullableContentStreamData");
+    NIMBLE_DASSERT(
+        !streamData.hasNulls(),
+        "Streams with nulls should be handled by NullableContentStreamChunker");
+  }
+
+  std::optional<StreamDataView> next() override {
+    const auto& chunkSize = nextChunkSize();
+    if (chunkSize.rollingChunkSize == 0) {
+      return std::nullopt;
+    }
+
+    std::string_view dataChunk = {
+        reinterpret_cast<const char*>(
+            streamData_.mutableData().data() + dataElementOffset_),
+        chunkSize.dataElementCount * sizeof(T)};
+    dataElementOffset_ += chunkSize.dataElementCount;
+    extraMemory_ -= chunkSize.extraMemory;
+
+    return StreamDataView{streamData_.descriptor(), dataChunk};
+  }
+
+ private:
+  ChunkSize nextChunkSize() {
+    const size_t maxChunkValuesCount = maxChunkSize_ / sizeof(T);
+    const size_t remainingValuesCount =
+        streamData_.mutableData().size() - dataElementOffset_;
+    if ((ensureFullChunks_ && remainingValuesCount < maxChunkValuesCount) ||
+        (remainingValuesCount < minChunkSize_ / sizeof(T))) {
+      return ChunkSize{};
+    }
+    const size_t chunkValuesCount =
+        std::min(maxChunkValuesCount, remainingValuesCount);
+    return ChunkSize{
+        .dataElementCount = chunkValuesCount,
+        .rollingChunkSize = chunkValuesCount * sizeof(T)};
+  }
+
+  ChunkSize nextStringChunkSize() {
+    const auto& data = streamData_.mutableData();
+    size_t stringCount = 0;
+    uint64_t rollingChunkSize = 0;
+    uint64_t rollingExtraMemory = 0;
+    bool fullChunk = false;
+    for (size_t i = dataElementOffset_; i < data.size(); ++i) {
+      const auto& str = data[i];
+      const uint64_t strSize = str.size() + sizeof(std::string_view);
+      if (rollingChunkSize + strSize > maxChunkSize_) {
+        fullChunk = true;
+        break;
+      }
+
+      rollingExtraMemory += str.size();
+      rollingChunkSize += strSize;
+      ++stringCount;
+    }
+
+    fullChunk = fullChunk || (rollingChunkSize == maxChunkSize_);
+    if ((ensureFullChunks_ && !fullChunk) ||
+        (rollingChunkSize < minChunkSize_)) {
+      return ChunkSize{};
+    }
+
+    return ChunkSize{
+        .dataElementCount = stringCount,
+        .rollingChunkSize = rollingChunkSize,
+        .extraMemory = rollingExtraMemory};
+  }
+
+  void compact() override {
+    // No changes made to stream data, nothing to compact.
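+    // When chunks were consumed, the remaining tail is moved into a freshly
+    // sized buffer (see detail::getNewBufferCapacity) to release memory.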
+    if (dataElementOffset_ == 0) {
+      return;
+    }
+
+    auto& currentData = streamData_.mutableData();
+    const uint64_t remainingDataCount = currentData.size() - dataElementOffset_;
+    const auto newDataCapacity = detail::getNewBufferCapacity<T>(
+        maxChunkSize_, currentData.capacity(), remainingDataCount);
+
+    // Move and clear the existing buffer.
+    auto tempData = std::move(currentData);
+    streamData_.reset();
+    NIMBLE_DASSERT(
+        streamData_.empty(), "StreamData should be empty after reset");
+
+    auto& mutableData = streamData_.mutableData();
+    mutableData.reserve(newDataCapacity);
+    NIMBLE_DASSERT(
+        mutableData.capacity() >= newDataCapacity,
+        "Data buffer capacity should be at least the new capacity");
+
+    mutableData.resize(remainingDataCount);
+    NIMBLE_DASSERT(
+        mutableData.size() == remainingDataCount,
+        "Data buffer size should be equal to the remaining data count");
+
+    std::copy_n(
+        tempData.begin() + dataElementOffset_,
+        remainingDataCount,
+        mutableData.begin());
+    dataElementOffset_ = 0;
+    streamData_.extraMemory() = extraMemory_;
+  }
+
+  StreamDataT& streamData_;
+  uint64_t minChunkSize_;
+  uint64_t maxChunkSize_;
+  size_t dataElementOffset_;
+  size_t extraMemory_;
+  bool ensureFullChunks_;
+};
+
+template <>
+inline StreamChunker::ChunkSize ContentStreamChunker<
+    std::string_view,
+    ContentStreamData<std::string_view>>::nextChunkSize() {
+  return nextStringChunkSize();
+}
+
+template <>
+inline StreamChunker::ChunkSize ContentStreamChunker<
+    std::string_view,
+    NullableContentStreamData<std::string_view>>::nextChunkSize() {
+  return nextStringChunkSize();
+}
+
+class NullsStreamChunker final : public StreamChunker {
+ public:
+  explicit NullsStreamChunker(
+      NullsStreamData& streamData,
+      const StreamChunkerOptions& options)
+      : streamData_{streamData},
+        minChunkSize_{options.minChunkSize},
+        maxChunkSize_{options.maxChunkSize},
+        nonNullsOffset_{0},
+        omitStream_{detail::shouldOmitNullStream(
+            streamData,
+            options.minChunkSize,
+            options.isFirstChunk)},
+        ensureFullChunks_{options.ensureFullChunks} {
+    static_assert(sizeof(bool) == 1);
+    // No need to materialize the nulls stream if it's omitted.
+    if (!omitStream_) {
+      streamData.materialize();
+    }
+  }
+
+  std::optional<StreamDataView> next() override {
+    if (omitStream_) {
+      return std::nullopt;
+    }
+
+    auto& nonNulls = streamData_.mutableNonNulls();
+    size_t remainingNonNulls = nonNulls.size() - nonNullsOffset_;
+    size_t nonNullsInChunk = std::min(maxChunkSize_, remainingNonNulls);
+    if (nonNullsInChunk == 0 || nonNullsInChunk < minChunkSize_ ||
+        (ensureFullChunks_ && nonNullsInChunk < maxChunkSize_)) {
+      return std::nullopt;
+    }
+
+    // Null stream values are converted to boolean data for encoding.
+    std::string_view dataChunk = {
+        reinterpret_cast<const char*>(nonNulls.data() + nonNullsOffset_),
+        nonNullsInChunk};
+    nonNullsOffset_ += nonNullsInChunk;
+
+    return StreamDataView{streamData_.descriptor(), dataChunk};
+  }
+
+ private:
+  void compact() override {
+    // No changes made to stream data, nothing to compact.
+    if (nonNullsOffset_ == 0) {
+      return;
+    }
+    auto& nonNulls = streamData_.mutableNonNulls();
+    const auto remainingNonNullsCount = nonNulls.size() - nonNullsOffset_;
+    const auto newCapacity = detail::getNewBufferCapacity<bool>(
+        maxChunkSize_, nonNulls.capacity(), remainingNonNullsCount);
+
+    // Move and clear the existing buffer.
+    auto tempNonNulls = std::move(nonNulls);
+    const bool hasNulls = streamData_.hasNulls();
+    streamData_.reset();
+    NIMBLE_DASSERT(
+        streamData_.empty(), "StreamData should be empty after reset");
+
+    auto& mutableNonNulls = streamData_.mutableNonNulls();
+    mutableNonNulls.reserve(newCapacity);
+    NIMBLE_DASSERT(
+        mutableNonNulls.capacity() >= newCapacity,
+        "NonNulls buffer capacity should be at least the new capacity");
+
+    streamData_.ensureAdditionalNullsCapacity(
+        hasNulls, static_cast<uint32_t>(remainingNonNullsCount));
+    if (hasNulls) {
+      mutableNonNulls.resize(remainingNonNullsCount);
+      NIMBLE_DASSERT(
+          mutableNonNulls.size() == remainingNonNullsCount,
+          "NonNulls buffer size should be equal to the remaining non-nulls count");
+
+      std::copy_n(
+          tempNonNulls.begin() + nonNullsOffset_,
+          remainingNonNullsCount,
+          mutableNonNulls.begin());
+    }
+    nonNullsOffset_ = 0;
+  }
+
+  NullsStreamData& streamData_;
+  uint64_t minChunkSize_;
+  uint64_t maxChunkSize_;
+  size_t nonNullsOffset_;
+  bool omitStream_;
+  bool ensureFullChunks_;
+};
+
+template <typename T>
+class NullableContentStreamChunker final : public StreamChunker {
+ public:
+  explicit NullableContentStreamChunker(
+      NullableContentStreamData<T>& streamData,
+      const StreamChunkerOptions& options)
+      : streamData_{streamData},
+        minChunkSize_{options.minChunkSize},
+        maxChunkSize_{options.maxChunkSize},
+        dataElementOffset_{0},
+        nonNullsOffset_{0},
+        extraMemory_{streamData_.extraMemory()},
+        omitStream_{detail::shouldOmitDataStream(
+            streamData,
+            options.minChunkSize,
+            options.isFirstChunk)},
+        ensureFullChunks_{options.ensureFullChunks} {
+    static_assert(sizeof(bool) == 1);
+    NIMBLE_DASSERT(
+        streamData.hasNulls(),
+        "ContentStreamChunker should be used when no nulls are present in the stream.");
+    streamData.materialize();
+  }
+
+  std::optional<StreamDataView> next() override {
+    if (omitStream_) {
+      return std::nullopt;
+    }
+    const auto& chunkSize = nextChunkSize();
+    if (chunkSize.rollingChunkSize == 0) {
+      return std::nullopt;
+    }
+
+    // Chunk content.
+    std::string_view dataChunk = {
+        reinterpret_cast<const char*>(
+            streamData_.mutableData().data() + dataElementOffset_),
+        chunkSize.dataElementCount * sizeof(T)};
+
+    // Chunk nulls.
+    std::span<const bool> nonNullsChunk(
+        streamData_.mutableNonNulls().data() + nonNullsOffset_,
+        chunkSize.nullElementCount);
+
+    dataElementOffset_ += chunkSize.dataElementCount;
+    nonNullsOffset_ += chunkSize.nullElementCount;
+    extraMemory_ -= chunkSize.extraMemory;
+
+    if (chunkSize.nullElementCount > chunkSize.dataElementCount) {
+      return StreamDataView{streamData_.descriptor(), dataChunk, nonNullsChunk};
+    }
+    return StreamDataView{streamData_.descriptor(), dataChunk};
+  }
+
+ private:
+  ChunkSize nextChunkSize() {
+    const auto& nonNulls = streamData_.mutableNonNulls();
+    const auto bufferedCount = nonNulls.size();
+
+    ChunkSize chunkSize;
+    bool fullChunk = false;
+    // Calculate how many entries we can fit in the chunk.
+    for (size_t nonNullIdx = nonNullsOffset_; nonNullIdx < bufferedCount;
+         ++nonNullIdx) {
+      uint32_t elementSize = sizeof(bool); // Always account for null indicator
+      uint32_t numDataElements = 0;
+      if (nonNulls[nonNullIdx]) {
+        elementSize += sizeof(T); // Add data size if non-null
+        ++numDataElements;
+      }
+
+      if (chunkSize.rollingChunkSize + elementSize > maxChunkSize_) {
+        fullChunk = true;
+        break;
+      }
+
+      chunkSize.dataElementCount += numDataElements;
+      ++chunkSize.nullElementCount;
+      chunkSize.rollingChunkSize += elementSize;
+    }
+
+    fullChunk = fullChunk || (chunkSize.rollingChunkSize == maxChunkSize_);
+    if ((ensureFullChunks_ && !fullChunk) ||
+        (chunkSize.rollingChunkSize < minChunkSize_)) {
+      return ChunkSize{};
+    }
+
+    return chunkSize;
+  }
+
+  void compact() override {
+    // No changes made to stream data, nothing to compact.
+    if (nonNullsOffset_ == 0) {
+      return;
+    }
+    auto& currentNonNulls = streamData_.mutableNonNulls();
+    auto& currentData = streamData_.mutableData();
+    const auto remainingNonNullsCount =
+        currentNonNulls.size() - nonNullsOffset_;
+    const auto remainingDataCount = currentData.size() - dataElementOffset_;
+
+    const auto newNonNullsCapacity = detail::getNewBufferCapacity<bool>(
+        maxChunkSize_, currentNonNulls.capacity(), remainingNonNullsCount);
+    const auto newDataCapacity = detail::getNewBufferCapacity<T>(
+        maxChunkSize_, currentData.capacity(), remainingDataCount);
+
+    // Move and clear the existing buffers.
+    auto tempNonNulls = std::move(currentNonNulls);
+    auto tempData = std::move(currentData);
+    const bool hasNulls = streamData_.hasNulls();
+    streamData_.reset();
+    NIMBLE_DASSERT(
+        streamData_.empty(), "StreamData should be empty after reset");
+
+    auto& mutableData = streamData_.mutableData();
+    mutableData.reserve(newDataCapacity);
+    NIMBLE_DASSERT(
+        mutableData.capacity() >= newDataCapacity,
+        "Data buffer capacity should be at least the new capacity");
+
+    mutableData.resize(remainingDataCount);
+    NIMBLE_DASSERT(
+        mutableData.size() == remainingDataCount,
+        "Data buffer size should be equal to the remaining data count");
+
+    std::copy_n(
+        tempData.begin() + dataElementOffset_,
+        remainingDataCount,
+        mutableData.begin());
+
+    auto& mutableNonNulls = streamData_.mutableNonNulls();
+    mutableNonNulls.reserve(newNonNullsCapacity);
+    NIMBLE_DASSERT(
+        mutableNonNulls.capacity() >= newNonNullsCapacity,
+        "NonNulls buffer capacity should be at least the new capacity");
+
+    streamData_.ensureAdditionalNullsCapacity(
+        hasNulls, static_cast<uint32_t>(remainingNonNullsCount));
+    if (hasNulls) {
+      mutableNonNulls.resize(remainingNonNullsCount);
+      NIMBLE_DASSERT(
+          mutableNonNulls.size() == remainingNonNullsCount,
+          "NonNulls buffer size should be equal to the remaining non-nulls count");
+
+      std::copy_n(
+          tempNonNulls.begin() + nonNullsOffset_,
+          remainingNonNullsCount,
+          mutableNonNulls.begin());
+    }
+    streamData_.extraMemory() = extraMemory_;
+    dataElementOffset_ = 0;
+    nonNullsOffset_ = 0;
+  }
+
+  NullableContentStreamData<T>& streamData_;
+  uint64_t minChunkSize_;
+  uint64_t maxChunkSize_;
+  size_t dataElementOffset_;
+  size_t nonNullsOffset_;
+  size_t extraMemory_;
+  bool omitStream_;
+  bool ensureFullChunks_;
+};
+
+template <>
+inline StreamChunker::ChunkSize
+NullableContentStreamChunker<std::string_view>::nextChunkSize() {
+  const auto& data = streamData_.mutableData();
+  const auto& nonNulls = streamData_.mutableNonNulls();
+  const auto bufferedCount = nonNulls.size();
+  ChunkSize chunkSize;
+  bool fullChunk = false;
+  // Calculate how many entries we can fit in the chunk.
+  for (size_t nonNullsIndex = nonNullsOffset_; nonNullsIndex < bufferedCount;
+       ++nonNullsIndex) {
+    uint64_t currentElementTotalSize = sizeof(bool);
+    size_t currentElementExtraMemory = 0;
+    uint32_t currentElementDataCount = 0;
+    if (nonNulls[nonNullsIndex]) {
+      const auto& currentString =
+          data[dataElementOffset_ + chunkSize.dataElementCount];
+      currentElementTotalSize +=
+          currentString.size() + sizeof(std::string_view);
+      currentElementExtraMemory = currentString.size();
+      ++currentElementDataCount;
+    }
+
+    if (chunkSize.rollingChunkSize + currentElementTotalSize > maxChunkSize_) {
+      fullChunk = true;
+      break;
+    }
+
+    chunkSize.dataElementCount += currentElementDataCount;
+    ++chunkSize.nullElementCount;
+    chunkSize.rollingChunkSize += currentElementTotalSize;
+    chunkSize.extraMemory += currentElementExtraMemory;
+  }
+
+  fullChunk = fullChunk || (chunkSize.rollingChunkSize == maxChunkSize_);
+  if ((ensureFullChunks_ && !fullChunk) ||
+      (chunkSize.rollingChunkSize < minChunkSize_)) {
+    chunkSize = ChunkSize{};
+  }
+
+  return chunkSize;
+}
+} // namespace facebook::nimble
diff --git a/dwio/nimble/velox/StreamData.cpp b/dwio/nimble/velox/StreamData.cpp
index ac35d29..a1cd662 100644
--- a/dwio/nimble/velox/StreamData.cpp
+++ b/dwio/nimble/velox/StreamData.cpp
@@ -21,6 +21,10 @@ namespace facebook::nimble {
 void NullsStreamData::ensureAdditionalNullsCapacity(
     bool mayHaveNulls,
     uint64_t additionalSize) {
+  if (additionalSize == 0) {
+    return;
+  }
+
   if (mayHaveNulls || hasNulls_) {
     const auto newSize = bufferedCount_ + additionalSize;
     ensureDataCapacity(nonNulls_, newSize);
diff --git a/dwio/nimble/velox/StreamData.h b/dwio/nimble/velox/StreamData.h
index 8e103d2..f4ba5ec 100644
--- a/dwio/nimble/velox/StreamData.h
+++ b/dwio/nimble/velox/StreamData.h
@@ -77,6 +77,62 @@ class MutableStreamData : public StreamData {
   const InputBufferGrowthPolicy& growthPolicy_;
 };
 
+// Provides a lightweight, non-owning view into a portion of stream data,
+// containing references to the data content and null indicators for efficient
+// processing of large streams without copying data.
+class StreamDataView final : public StreamData {
+ public:
+  StreamDataView(
+      const StreamDescriptorBuilder& descriptor,
+      std::string_view data,
+      std::optional<std::span<const bool>> nonNulls = std::nullopt)
+      : StreamData(descriptor), data_{data}, nonNulls_{nonNulls} {}
+
+  StreamDataView(StreamDataView&& other) noexcept
+      : StreamData(other.descriptor()),
+        data_{other.data_},
+        nonNulls_{other.nonNulls_} {}
+
+  StreamDataView(const StreamDataView&) = delete;
+
+  StreamDataView& operator=(const StreamDataView&) = delete;
+
+  StreamDataView& operator=(StreamDataView&& other) noexcept = delete;
+
+  ~StreamDataView() override = default;
+
+  std::string_view data() const override {
+    return data_;
+  }
+
+  std::span<const bool> nonNulls() const override {
+    return nonNulls_.value_or(std::span<const bool>{});
+  }
+
+  bool hasNulls() const override {
+    return nonNulls_.has_value();
+  }
+
+  uint64_t memoryUsed() const override {
+    NIMBLE_UNREACHABLE("StreamDataView is non-owning");
+  }
+
+  bool empty() const override {
+    if (!nonNulls_.has_value()) {
+      return data_.empty();
+    }
+    return data_.empty() && nonNulls_->empty();
+  }
+
+  void reset() override {
+    NIMBLE_UNREACHABLE("StreamDataView is non-owning");
+  }
+
+ private:
+  const std::string_view data_;
+  const std::optional<std::span<const bool>> nonNulls_;
+};
+
 // Content only data stream.
 // Used when a stream doesn't contain nulls.
 template <typename T>
diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp
index a5ad2ea..3a24173 100644
--- a/dwio/nimble/velox/VeloxWriter.cpp
+++ b/dwio/nimble/velox/VeloxWriter.cpp
@@ -33,7 +33,6 @@
 #include "dwio/nimble/velox/SchemaSerialization.h"
 #include "dwio/nimble/velox/SchemaTypes.h"
 #include "dwio/nimble/velox/StatsGenerated.h"
-#include "folly/ScopeGuard.h"
 #include "velox/common/time/CpuWallTimer.h"
 #include "velox/dwio/common/ExecutorBarrier.h"
 #include "velox/type/Type.h"
@@ -45,7 +44,6 @@ namespace detail {
 class WriterContext : public FieldWriterContext {
  public:
   const VeloxWriterOptions options;
-  std::unique_ptr<FlushPolicy> flushPolicy;
   velox::CpuWallTiming totalFlushTiming;
   velox::CpuWallTiming stripeFlushTiming;
   velox::CpuWallTiming encodingSelectionTiming;
@@ -56,9 +54,12 @@ class WriterContext : public FieldWriterContext {
   uint64_t memoryUsed{0};
   uint64_t bytesWritten{0};
   uint64_t rowsInFile{0};
-  uint64_t rowsInStripe{0};
-  uint64_t stripeSize{0};
-  uint64_t rawSize{0};
+  uint32_t rowsInStripe{0};
+  // Physical size of the encoded stripe data.
+  uint64_t stripeEncodedPhysicalSize{0};
+  // Logical size of the encoded stripe data.
+  uint64_t stripeEncodedLogicalSize{0};
+  uint64_t fileRawSize{0};
   std::vector<uint64_t> rowsPerStripe;
 
   WriterContext(
@@ -67,7 +68,6 @@ class WriterContext : public FieldWriterContext {
       : FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
         options{std::move(options)},
         logger{this->options.metricsLogger} {
-    flushPolicy = this->options.flushPolicyFactory();
     inputBufferGrowthPolicy = this->options.lowMemoryMode
         ? std::make_unique<ExactGrowthPolicy>()
         : this->options.inputGrowthPolicyFactory();
@@ -82,7 +82,8 @@ class WriterContext : public FieldWriterContext {
     rowsPerStripe.push_back(rowsInStripe);
     memoryUsed = 0;
     rowsInStripe = 0;
-    stripeSize = 0;
+    stripeEncodedPhysicalSize = 0;
+    stripeEncodedLogicalSize = 0;
     ++stripeIndex_;
   }
 
@@ -100,6 +101,45 @@ namespace {
 
 constexpr uint32_t kInitialSchemaSectionSize = 1 << 20; // 1MB
 
+// When writing null streams, we write the nulls as data, and the stream itself
+// is non-nullable. This adapter class is how we expose the nulls as values.
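+// The adapter holds the underlying StreamData by reference, so it must not
+// outlive the stream it wraps.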
+class NullsAsDataStreamData : public StreamData {
+ public:
+  explicit NullsAsDataStreamData(StreamData& streamData)
+      : StreamData(streamData.descriptor()), streamData_{streamData} {
+    streamData_.materialize();
+  }
+
+  inline virtual std::string_view data() const override {
+    return {
+        reinterpret_cast<const char*>(streamData_.nonNulls().data()),
+        streamData_.nonNulls().size()};
+  }
+
+  inline virtual std::span<const bool> nonNulls() const override {
+    return {};
+  }
+
+  inline virtual bool hasNulls() const override {
+    return false;
+  }
+
+  inline virtual bool empty() const override {
+    return streamData_.empty();
+  }
+
+  inline virtual uint64_t memoryUsed() const override {
+    return streamData_.memoryUsed();
+  }
+
+  inline virtual void reset() override {
+    streamData_.reset();
+  }
+
+ private:
+  StreamData& streamData_;
+};
+
 class WriterStreamContext : public StreamContext {
  public:
   bool isNullStream = false;
@@ -133,7 +173,7 @@ std::string_view encode(
   std::unique_ptr<EncodingSelectionPolicy<T>> policy;
   if (encodingLayout.has_value()) {
     policy = std::make_unique<ReplayedEncodingSelectionPolicy<T>>(
-        encodingLayout.value(),
+        std::move(encodingLayout.value()),
         context.options.compressionOptions,
         context.options.encodingSelectionPolicyFactory);
@@ -168,7 +208,7 @@ std::string_view encodeStreamTyped(
   }
 
   try {
-    return encode<T>(encodingLayout, context, buffer, streamData);
+    return encode<T>(std::move(encodingLayout), context, buffer, streamData);
   } catch (const NimbleUserError& e) {
     if (e.errorCode() != error_code::IncompatibleEncoding ||
         !encodingLayout.has_value()) {
@@ -215,7 +255,8 @@ template <typename Set>
 void findNodeIds(
     const velox::dwio::common::TypeWithId& typeWithId,
     Set& output,
-    std::function<bool(const velox::dwio::common::TypeWithId&)> predicate) {
+    const std::function<bool(const velox::dwio::common::TypeWithId&)>&
+        predicate) {
   if (predicate(typeWithId)) {
     output.insert(typeWithId.id());
   }
@@ -517,7 +558,7 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
     auto rawSize = nimble::getRawSizeFromVector(
         vector, velox::common::Ranges::of(0, size));
     DWIO_ENSURE_GE(rawSize, 0, "Invalid raw size");
-    context_->rawSize += rawSize;
+    context_->fileRawSize += rawSize;
 
     if (context_->options.writeExecutor) {
       velox::dwio::common::ExecutorBarrier barrier{
@@ -538,9 +579,16 @@ bool VeloxWriter::write(const velox::VectorPtr& vector) {
     context_->rowsInStripe += size;
     context_->bytesWritten = file_->size();
 
-    return tryWriteStripe();
+    return evaluateFlushPolicy();
+  } catch (const std::exception& e) {
+    lastException_ = std::current_exception();
+    context_->logger->logException(LogOperation::Write, e.what());
+    throw;
   } catch (...) {
     lastException_ = std::current_exception();
+    context_->logger->logException(
+        LogOperation::Write,
+        folly::to<std::string>(folly::exceptionStr(std::current_exception())));
     throw;
   }
 }
@@ -552,9 +600,7 @@ void VeloxWriter::close() {
 
   if (file_) {
     try {
-      auto exitGuard =
-          folly::makeGuard([this]() { context_->flushPolicy->onClose(); });
-      flush();
+      writeStripe();
       root_->close();
 
       if (!context_->options.metadata.empty()) {
@@ -584,7 +630,8 @@ void VeloxWriter::close() {
             *context_->schemaBuilder.getRoot(), context_->columnStats);
         // TODO(T228118622): Write column stats to file.
         flatbuffers::FlatBufferBuilder builder;
-        builder.Finish(serialization::CreateStats(builder, context_->rawSize));
+        builder.Finish(
+            serialization::CreateStats(builder, context_->fileRawSize));
         writer_.writeOptionalSection(
             std::string(kStatsSection),
             {reinterpret_cast<const char*>(builder.GetBufferPointer()),
@@ -614,13 +661,13 @@ void VeloxWriter::close() {
       file_ = nullptr;
     } catch (const std::exception& e) {
       lastException_ = std::current_exception();
-      context_->logger->logException(LogOperation::FileClose, e.what());
+      context_->logger->logException(LogOperation::Close, e.what());
       file_ = nullptr;
       throw;
     } catch (...) {
       lastException_ = std::current_exception();
       context_->logger->logException(
-          LogOperation::FileClose,
+          LogOperation::Close,
           folly::to<std::string>(
               folly::exceptionStr(std::current_exception())));
       file_ = nullptr;
@@ -635,9 +682,16 @@ void VeloxWriter::flush() {
   }
 
   try {
-    tryWriteStripe(true);
+    writeStripe();
+  } catch (const std::exception& e) {
+    lastException_ = std::current_exception();
+    context_->logger->logException(LogOperation::Flush, e.what());
+    throw;
   } catch (...) {
     lastException_ = std::current_exception();
+    context_->logger->logException(
+        LogOperation::Flush,
+        folly::to<std::string>(folly::exceptionStr(std::current_exception())));
     throw;
   }
 }
@@ -654,45 +708,6 @@ void VeloxWriter::writeChunk(bool lastChunk) {
   }
 
   streams_.resize(context_->schemaBuilder.nodeCount());
-  // When writing null streams, we write the nulls as data, and the stream
-  // itself is non-nullable. This adapter class is how we expose the nulls as
-  // values.
-  class NullsAsDataStreamData : public StreamData {
-   public:
-    explicit NullsAsDataStreamData(StreamData& streamData)
-        : StreamData(streamData.descriptor()), streamData_{streamData} {
-      streamData_.materialize();
-    }
-
-    inline virtual std::string_view data() const override {
-      return {
-          reinterpret_cast<const char*>(streamData_.nonNulls().data()),
-          streamData_.nonNulls().size()};
-    }
-
-    inline virtual std::span<const bool> nonNulls() const override {
-      return {};
-    }
-
-    inline virtual bool hasNulls() const override {
-      return false;
-    }
-
-    inline virtual bool empty() const override {
-      return streamData_.empty();
-    }
-
-    inline virtual uint64_t memoryUsed() const override {
-      return streamData_.memoryUsed();
-    }
-
-    inline virtual void reset() override {
-      streamData_.reset();
-    }
-
-   private:
-    StreamData& streamData_;
-  };
-
   auto encode = [&](StreamData& streamData, uint64_t& streamSize) {
     const auto offset = streamData.descriptor().offset();
     auto encoded = encodeStream(*context_, *encodingBuffer_, streamData);
@@ -781,8 +796,108 @@ void VeloxWriter::writeChunk(bool lastChunk) {
     if (lastChunk) {
       root_->reset();
     }
+  }
+
+  // Consider getting this from flush timing.
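+  // For now, derive the wall time as a delta of the cumulative stripe flush
+  // timing.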
+  auto flushWallTimeMs =
+      (context_->stripeFlushTiming.wallNanos - previousFlushWallTime) /
+      1'000'000;
+  VLOG(1) << "writeChunk milliseconds: " << flushWallTimeMs
+          << ", chunk bytes: " << chunkSize;
+}
+
+bool VeloxWriter::writeChunks(
+    std::span<const uint32_t> streamIndices,
+    bool lastChunk) {
+  uint64_t previousFlushWallTime = context_->stripeFlushTiming.wallNanos;
+  std::atomic<uint64_t> chunkSize = 0;
+  std::atomic<uint64_t> logicalSizeBeforeEncoding = 0;
+  std::atomic<bool> wroteChunk = false;
+  {
+    LoggingScope scope{*context_->logger};
+    velox::CpuWallTimer veloxTimer{context_->stripeFlushTiming};
+
+    if (!encodingBuffer_) {
+      encodingBuffer_ = std::make_unique<Buffer>(*encodingMemoryPool_);
+    }
+    streams_.resize(context_->schemaBuilder.nodeCount());
+
+    auto processStream = [&](StreamData& streamData) {
+      // TODO: Break down large streams above a threshold into smaller chunks.
+      const auto minStreamSize =
+          lastChunk ? 0 : context_->options.minStreamChunkRawSize;
+      const auto* context =
+          streamData.descriptor().context<WriterStreamContext>();
+      bool isNullStream = context && context->isNullStream;
+      bool shouldChunkStream = false;
+      if (isNullStream) {
+        // We apply the same nulls logic, where if all values are non-null,
+        // we omit the entire stream.
+        shouldChunkStream = streamData.hasNulls() &&
+            streamData.nonNulls().size() > minStreamSize;
+      } else {
+        shouldChunkStream = streamData.data().size() > minStreamSize;
+      }
+
+      // If we have previously written chunks for this stream, always write
+      // any remaining data during the final chunk.
+      const auto offset = streamData.descriptor().offset();
+      NIMBLE_DASSERT(offset < streams_.size(), "Stream offset out of range.");
+      auto& stream = streams_[offset];
+      if (lastChunk && !shouldChunkStream && !stream.content.empty()) {
+        shouldChunkStream =
+            !streamData.empty() || !streamData.nonNulls().empty();
+      }
+
+      if (shouldChunkStream) {
+        std::string_view encoded;
+        if (isNullStream) {
+          // For null streams we promote the null values to be written as
+          // boolean data.
+          encoded = encodeStream(
+              *context_, *encodingBuffer_, NullsAsDataStreamData(streamData));
+        } else {
+          encoded = encodeStream(*context_, *encodingBuffer_, streamData);
+        }
+
+        if (!encoded.empty()) {
+          auto& streamSize = context_->columnStats[offset].physicalSize;
+          ChunkedStreamWriter chunkWriter{*encodingBuffer_};
+          for (auto& buffer : chunkWriter.encode(encoded)) {
+            streamSize += buffer.size();
+            chunkSize += buffer.size();
+            stream.content.push_back(std::move(buffer));
+          }
+        }
+        wroteChunk = true;
+        logicalSizeBeforeEncoding += streamData.memoryUsed();
+        streamData.reset();
+      }
+    };
+
+    const auto& streams = context_->streams();
+    if (context_->options.encodingExecutor) {
+      velox::dwio::common::ExecutorBarrier barrier{
+          context_->options.encodingExecutor};
+      for (auto streamIndex : streamIndices) {
+        auto& streamData = streams[streamIndex];
+        barrier.add([&] { processStream(*streamData); });
+      }
+      barrier.waitAll();
+    } else {
+      for (auto streamIndex : streamIndices) {
+        auto& streamData = streams[streamIndex];
+        processStream(*streamData);
+      }
+    }
+
+    if (lastChunk) {
+      root_->reset();
+    }
 
-  context_->stripeSize += chunkSize;
+    context_->stripeEncodedPhysicalSize += chunkSize;
+    context_->stripeEncodedLogicalSize += logicalSizeBeforeEncoding;
+    context_->memoryUsed -= logicalSizeBeforeEncoding;
+  }
 
   // Consider getting this from flush timing.
@@ -791,10 +906,22 @@ void VeloxWriter::writeChunk(bool lastChunk) {
       1'000'000;
   VLOG(1) << "writeChunk milliseconds: " << flushWallTimeMs
           << ", chunk bytes: " << chunkSize;
+  return wroteChunk;
 }
 
-uint32_t VeloxWriter::writeStripe() {
-  writeChunk(true);
+bool VeloxWriter::writeStripe() {
+  if (context_->rowsInStripe == 0) {
+    return false;
+  }
+
+  if (context_->options.enableChunking) {
+    // Chunk all streams.
+    std::vector<uint32_t> streamIndices(context_->streams().size());
+    std::iota(streamIndices.begin(), streamIndices.end(), 0);
+    writeChunks(streamIndices, true);
+  } else {
+    writeChunk(true);
+  }
 
   uint64_t previousFlushWallTime = context_->stripeFlushTiming.wallNanos;
   uint64_t stripeSize = 0;
@@ -836,67 +963,79 @@ uint32_t VeloxWriter::writeStripe() {
   VLOG(1) << "writeStripe milliseconds: " << flushWallTimeMs
           << ", on disk stripe bytes: " << stripeSize;
 
-  return static_cast<uint32_t>(stripeSize);
+  StripeFlushMetrics metrics{
+      .inputSize = context_->stripeEncodedPhysicalSize,
+      .rowCount = context_->rowsInStripe,
+      .stripeSize = stripeSize,
+      .trackedMemory = context_->memoryUsed,
+  };
+  context_->logger->logStripeFlush(metrics);
+
+  context_->nextStripe();
+  return true;
 }
 
-bool VeloxWriter::tryWriteStripe(bool force) {
-  if (context_->rowsInStripe == 0) {
-    return false;
-  }
+bool VeloxWriter::evaluateFlushPolicy() {
+  auto flushPolicy = context_->options.flushPolicyFactory();
+  NIMBLE_DASSERT(flushPolicy != nullptr, "Flush policy must not be null");
 
   auto shouldFlush = [&]() {
-    return context_->flushPolicy->shouldFlush(StripeProgress{
-        .rawStripeSize = context_->memoryUsed,
-        .stripeSize = context_->stripeSize,
-        .bufferSize =
-            static_cast<uint64_t>(context_->bufferMemoryPool->usedBytes()),
-    });
+    return flushPolicy->shouldFlush(StripeProgress{
+        .stripeRawSize = context_->memoryUsed,
+        .stripeEncodedSize = context_->stripeEncodedPhysicalSize,
+        .stripeEncodedLogicalSize = context_->stripeEncodedLogicalSize});
   };
 
-  auto decision = force ? FlushDecision::Stripe : shouldFlush();
-  if (decision == FlushDecision::None) {
-    return false;
-  }
+  auto shouldChunk = [&]() {
+    return flushPolicy->shouldChunk(StripeProgress{
+        .stripeRawSize = context_->memoryUsed,
+        .stripeEncodedSize = context_->stripeEncodedPhysicalSize,
+        .stripeEncodedLogicalSize = context_->stripeEncodedLogicalSize,
+    });
+  };
 
-  try {
-    // TODO: we can improve merge the last chunk write with stripe
-    if (decision == FlushDecision::Chunk && context_->options.enableChunking) {
-      writeChunk(false);
-      decision = shouldFlush();
-    }
+  if (context_->options.enableChunking && shouldChunk()) {
+    const auto& streams = context_->streams();
+    const size_t streamCount = streams.size();
+    // Sort streams for chunking based on raw memory usage.
+    // TODO(T240072104): Improve performance by bucketing the streams by size
+    // (most significant bit) instead of sorting.
+    std::vector<uint32_t> streamIndices(streamCount);
+    std::iota(streamIndices.begin(), streamIndices.end(), 0);
+    std::sort(
+        streamIndices.begin(),
+        streamIndices.end(),
+        [&](const uint32_t& a, const uint32_t& b) {
+          return streams[a]->memoryUsed() > streams[b]->memoryUsed();
+        });
 
-    if (decision != FlushDecision::Stripe) {
-      return false;
+    // Chunk streams in batches.
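+    // Batching bounds how much encoding work happens between successive
+    // shouldChunk() evaluations, so chunking can stop as soon as memory
+    // pressure is relieved.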
+    const auto batchSize = context_->options.chunkedStreamBatchSize;
+    for (size_t index = 0; index < streamCount; index += batchSize) {
+      const size_t currentBatchSize = std::min(batchSize, streamCount - index);
+      std::span<const uint32_t> batchIndices(
+          streamIndices.begin() + index, currentBatchSize);
+      // Stop attempting chunking once streams are too small to chunk or
+      // memory pressure is relieved.
+      if (!(writeChunks(batchIndices, false) && shouldChunk())) {
+        break;
+      }
     }
+  }
 
-    StripeFlushMetrics metrics{
-        .inputSize = context_->stripeSize,
-        .rowCount = context_->rowsInStripe,
-        .trackedMemory = context_->memoryUsed,
-    };
-
-    metrics.stripeSize = writeStripe();
-    context_->logger->logStripeFlush(metrics);
-
-    context_->nextStripe();
-    return true;
-  } catch (const std::exception& e) {
-    context_->logger->logException(LogOperation::StripeFlush, e.what());
-    throw;
-  } catch (...) {
-    context_->logger->logException(
-        LogOperation::StripeFlush,
-        folly::to<std::string>(folly::exceptionStr(std::current_exception())));
-    throw;
+  if (shouldFlush()) {
+    return writeStripe();
   }
+  return false;
 }
 
 VeloxWriter::RunStats VeloxWriter::getRunStats() const {
   return RunStats{
       .bytesWritten = context_->bytesWritten,
       .stripeCount = folly::to<uint32_t>(context_->getStripeIndex()),
-      .rawSize = context_->rawSize,
+      .rawSize = context_->fileRawSize,
       .rowsPerStripe = context_->rowsPerStripe,
       .flushCpuTimeUsec = context_->totalFlushTiming.cpuNanos / 1000,
       .flushWallTimeUsec = context_->totalFlushTiming.wallNanos / 1000,
diff --git a/dwio/nimble/velox/VeloxWriter.h b/dwio/nimble/velox/VeloxWriter.h
index 2d90fa0..aa49f52 100644
--- a/dwio/nimble/velox/VeloxWriter.h
+++ b/dwio/nimble/velox/VeloxWriter.h
@@ -84,10 +84,15 @@ class VeloxWriter {
   std::exception_ptr lastException_;
   const velox::common::SpillConfig* const spillConfig_;
 
+  // Returns 'true' if the stripe was flushed.
+  bool evaluateFlushPolicy();
   // Returning 'true' if stripe was written.
-  bool tryWriteStripe(bool force = false);
+  bool writeStripe();
 
   void writeChunk(bool lastChunk = true);
-  uint32_t writeStripe();
+  // Returns 'true' if chunks were written.
+  bool writeChunks(
+      std::span<const uint32_t> streamIndices,
+      bool lastChunk = true);
 };
 
 } // namespace facebook::nimble
diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h
index 4fbc842..f4fbbc0 100644
--- a/dwio/nimble/velox/VeloxWriterOptions.h
+++ b/dwio/nimble/velox/VeloxWriterOptions.h
@@ -96,6 +96,10 @@ struct VeloxWriterOptions {
   // Note: this threshold is ignored when it is time to flush a stripe.
   uint64_t minStreamChunkRawSize = 1024;
 
+  // Number of streams to try chunking between memory pressure evaluations.
+  // Note: this is ignored when it is time to flush a stripe.
+  size_t chunkedStreamBatchSize = 1024;
+
   // The factory function that produces the root encoding selection policy.
   // Encoding selection policy is the way to balance the tradeoffs of
   // different performance factors (at both read and write times). Heuristics
@@ -109,7 +113,7 @@ struct VeloxWriterOptions {
 
   // Provides policy that controls stripe sizes and memory footprint.
   std::function<std::unique_ptr<FlushPolicy>()> flushPolicyFactory = []() {
     // Buffering 256MB data before encoding stripes.
-    return std::make_unique<RawStripeSizeFlushPolicy>(256 << 20);
+    return std::make_unique<StripeRawSizeFlushPolicy>(256 << 20);
   };
 
   // When the writer needs to buffer data, and internal buffers don't have
diff --git a/dwio/nimble/velox/selective/tests/E2EFilterTest.cpp b/dwio/nimble/velox/selective/tests/E2EFilterTest.cpp
index 11adf53..164b6ff 100644
--- a/dwio/nimble/velox/selective/tests/E2EFilterTest.cpp
+++ b/dwio/nimble/velox/selective/tests/E2EFilterTest.cpp
@@ -128,14 +128,12 @@ class E2EFilterTest : public dwio::common::E2EFilterTestBase {
     writeSchema_ = rowType_;
     VeloxWriterOptions options;
     options.enableChunking = true;
-    options.flushPolicyFactory = [] {
+    // Shared counter: the factory outlives this scope, so avoid capturing a
+    // stack-local by reference.
+    auto i = std::make_shared<int>(0);
+    options.flushPolicyFactory = [i] {
       return std::make_unique<LambdaFlushPolicy>(
-          [i = 0](const StripeProgress&) mutable {
-            if (i++ % 3 == 2) {
-              return FlushDecision::Stripe;
-            }
-            return FlushDecision::Chunk;
-          });
+          /*flushLambda=*/
+          [i](const StripeProgress&) { return ((*i)++ % 3 == 2); },
+          /*chunkLambda=*/
+          [i](const StripeProgress&) { return ((*i)++ % 3 == 2); });
     };
     if (!flatMapColumns_.empty()) {
       setUpFlatMapColumns();
diff --git a/dwio/nimble/velox/selective/tests/SelectiveNimbleReaderTest.cpp b/dwio/nimble/velox/selective/tests/SelectiveNimbleReaderTest.cpp
index d88d461..527d5b6 100644
--- a/dwio/nimble/velox/selective/tests/SelectiveNimbleReaderTest.cpp
+++ b/dwio/nimble/velox/selective/tests/SelectiveNimbleReaderTest.cpp
@@ -566,7 +566,8 @@ TEST_F(SelectiveNimbleReaderTest, multiChunkNulls) {
   options.minStreamChunkRawSize = 0;
   options.flushPolicyFactory = [] {
     return std::make_unique<LambdaFlushPolicy>(
-        [](const StripeProgress&) { return FlushDecision::Chunk; });
+        /*flushLambda=*/[](const StripeProgress&) { return false; },
+        /*chunkLambda=*/[](const StripeProgress&) { return true; });
   };
   auto file = test::createNimbleFile(
       *rootPool(), {chunk1, chunk2, chunk3}, options, false);
@@ -612,7 +613,8 @@ TEST_F(SelectiveNimbleReaderTest, multiChunkInt16RowSetOverBoundary) {
   options.minStreamChunkRawSize = 0;
   options.flushPolicyFactory = [] {
     return std::make_unique<LambdaFlushPolicy>(
-        [](const StripeProgress&) { return FlushDecision::Chunk; });
+        /*flushLambda=*/[](const StripeProgress&) { return false; },
+        /*chunkLambda=*/[](const StripeProgress&) { return true; });
   };
   auto file =
       test::createNimbleFile(*rootPool(), {chunk1, chunk2}, options, false);
diff --git a/dwio/nimble/velox/tests/FlushPolicyTests.cpp b/dwio/nimble/velox/tests/FlushPolicyTests.cpp
new file mode 100644
index 0000000..792d5f9
--- /dev/null
+++ b/dwio/nimble/velox/tests/FlushPolicyTests.cpp
@@ -0,0 +1,232 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "dwio/nimble/velox/FlushPolicy.h"
+
+namespace facebook::nimble {
+
+class ChunkFlushPolicyTest : public ::testing::Test {};
+
+TEST_F(ChunkFlushPolicyTest, InitialNoMemoryPressure) {
+  ChunkFlushPolicy policy(ChunkFlushPolicyConfig{
+      .writerMemoryHighThresholdBytes = 1000,
+      .writerMemoryLowThresholdBytes = 500,
+      .targetStripeSizeBytes = 800,
+      .estimatedCompressionFactor = 1.5});
+
+  StripeProgress progress{
+      .stripeRawSize = 100,
+      .stripeEncodedSize = 50,
+      .stripeEncodedLogicalSize = 80};
+
+  // Total memory: 100 + 50 = 150 < 1000 (writerMemoryHighThresholdBytes)
+  EXPECT_FALSE(policy.shouldChunk(progress));
+
+  // shouldFlush checks two conditions:
+  // 1. Memory pressure check: 100 + 50 = 150 < 1000 (passes, no flush)
+  // 2. Stripe size check (only if condition 1 passes):
+  //    compressionFactor = stripeEncodedLogicalSize / stripeEncodedSize
+  //                      = 80 / 50 = 1.6
+  //    expectedEncodedStripeSize = stripeEncodedSize + stripeRawSize /
+  //                                max(compressionFactor, 1.0)
+  //                              = 50 + 100 / 1.6
+  //                              = 50 + 62.5 = 112.5 < 800
+  //                                (targetStripeSizeBytes)
+  EXPECT_FALSE(policy.shouldFlush(progress));
+}
+
+TEST_F(ChunkFlushPolicyTest, MemoryPressureChunkingLifecycle) {
+  ChunkFlushPolicy policy(ChunkFlushPolicyConfig{
+      .writerMemoryHighThresholdBytes = 1000,
+      .writerMemoryLowThresholdBytes = 500,
+      .targetStripeSizeBytes = 800,
+      .estimatedCompressionFactor = 1.5});
+
+  // Start chunking when memory exceeds the high threshold.
+  StripeProgress progress1{
+      .stripeRawSize = 600,
+      .stripeEncodedSize = 500,
+      .stripeEncodedLogicalSize = 400};
+  // Total memory: 600 + 500 = 1100 > 1000 (writerMemoryHighThresholdBytes)
+  EXPECT_TRUE(policy.shouldChunk(progress1));
+
+  // Continue chunking with decreasing raw size but still above low threshold.
+  // Raw size decreased (600 -> 550) and memory is still above the low
+  // threshold.
+  StripeProgress progress2{
+      .stripeRawSize = 550,
+      .stripeEncodedSize = 520,
+      .stripeEncodedLogicalSize = 400};
+  // Total memory: 550 + 520 = 1070 > 500 (writerMemoryLowThresholdBytes)
+  EXPECT_TRUE(policy.shouldChunk(progress2));
+
+  // Continue chunking as raw size decreases further.
+  StripeProgress progress3{
+      .stripeRawSize = 300,
+      .stripeEncodedSize = 300,
+      .stripeEncodedLogicalSize = 400};
+  // Total memory: 300 + 300 = 600 > 500
+  EXPECT_TRUE(policy.shouldChunk(progress3));
+
+  // Memory drops below the low threshold - stop chunking.
+  StripeProgress progress4{
+      .stripeRawSize = 200,
+      .stripeEncodedSize = 200,
+      .stripeEncodedLogicalSize = 400};
+  // Total memory: 200 + 200 = 400 < 500 (writerMemoryLowThresholdBytes)
+  EXPECT_FALSE(policy.shouldChunk(progress4));
+}
+
+TEST_F(ChunkFlushPolicyTest, MemoryPressureTriggersFlush) {
+  ChunkFlushPolicy policy(ChunkFlushPolicyConfig{
+      .writerMemoryHighThresholdBytes = 1000,
+      .writerMemoryLowThresholdBytes = 500,
+      .targetStripeSizeBytes = 800,
+      .estimatedCompressionFactor = 1.5});
+
+  StripeProgress progress{
+      .stripeRawSize = 600,
+      .stripeEncodedSize = 500,
+      .stripeEncodedLogicalSize = 400};
+  // Total memory: 600 + 500 = 1100 > 1000 (writerMemoryHighThresholdBytes)
+  EXPECT_TRUE(policy.shouldChunk(progress));
+
+  // Total memory: 600 + 500 = 1100 > 1000 (writerMemoryHighThresholdBytes)
+  EXPECT_TRUE(policy.shouldFlush(progress));
+}
+
+TEST_F(ChunkFlushPolicyTest, StripeSizeTriggersFlush) {
+  ChunkFlushPolicy policy(ChunkFlushPolicyConfig{
+      .writerMemoryHighThresholdBytes = 2000,
+      .writerMemoryLowThresholdBytes = 500,
+      .targetStripeSizeBytes = 800,
+      .estimatedCompressionFactor = 1.5});
+
+  StripeProgress progress{
+      .stripeRawSize = 1000,
+      .stripeEncodedSize = 400,
+      .stripeEncodedLogicalSize = 600};
+
+  // Expected encoded stripe size calculation:
+  // compressionFactor = stripeEncodedLogicalSize / stripeEncodedSize
+  //                   = 600 / 400 = 1.5
+  // expectedEncodedStripeSize = stripeEncodedSize + stripeRawSize /
+  //                             max(compressionFactor, 1.0)
+  //                           = 400 + 1000 / 1.5
+  //                           = 400 + 666.67 = 1066.67 > 800
+  EXPECT_TRUE(policy.shouldFlush(progress));
+}
+
+TEST_F(ChunkFlushPolicyTest, ZeroEncodedSize) {
+  ChunkFlushPolicy policy(ChunkFlushPolicyConfig{
+      .writerMemoryHighThresholdBytes = 1000,
+      .writerMemoryLowThresholdBytes = 500,
+      .targetStripeSizeBytes = 800,
+      .estimatedCompressionFactor = 2.0});
+
+  StripeProgress progress{
+      .stripeRawSize = 1200,
+      .stripeEncodedSize = 0,
+      .stripeEncodedLogicalSize = 0};
+
+  // Memory pressure check:
+  // stripeRawSize + stripeEncodedSize = 1200 + 0 = 1200 > 1000
+  // This triggers chunking due to memory pressure.
+  EXPECT_TRUE(policy.shouldChunk(progress));
+
+  // Flush check: memory pressure (1200 > 1000) triggers a stripe flush.
+  EXPECT_TRUE(policy.shouldFlush(progress));
+}
+
+TEST(FlushPolicyTest, StripeRawSizeFlushPolicyTest) {
+  StripeRawSizeFlushPolicy policy(/*stripeRawSize=*/1000);
+
+  StripeProgress progress{
+      .stripeRawSize = 1200, // Exceeds threshold
+      .stripeEncodedSize = 600,
+      .stripeEncodedLogicalSize = 800};
+
+  EXPECT_TRUE(policy.shouldFlush(progress));
+
+  StripeProgress progress2{
+      .stripeRawSize = 800, // Below threshold
+      .stripeEncodedSize = 600,
+      .stripeEncodedLogicalSize = 700};
+
+  EXPECT_FALSE(policy.shouldFlush(progress2));
+}
+
+TEST(FlushPolicyTest, LambdaFlushPolicyTest) {
+  LambdaFlushPolicy policy(
+      /*flushLambda=*/
+      [](const StripeProgress& progress) {
+        return progress.stripeRawSize > 1000;
+      },
+      /*chunkLambda=*/
+      [](const StripeProgress& progress) {
+        return progress.stripeEncodedSize > 500;
+      });
+
+  // Neither flush nor chunk.
+  StripeProgress noneProgress{
+      .stripeRawSize = 300,
+      .stripeEncodedSize = 200,
+      .stripeEncodedLogicalSize = 250};
+  EXPECT_FALSE(policy.shouldFlush(noneProgress));
+  EXPECT_FALSE(policy.shouldChunk(noneProgress));
+
+  // Chunk only.
+  StripeProgress chunkProgress{
+      .stripeRawSize = 500,
+      .stripeEncodedSize = 600,
+      .stripeEncodedLogicalSize = 450};
+  EXPECT_TRUE(policy.shouldChunk(chunkProgress));
+
+  // Flush only.
+  StripeProgress stripeProgress{
+      .stripeRawSize = 1200,
+      .stripeEncodedSize = 400,
+      .stripeEncodedLogicalSize = 800};
+  EXPECT_TRUE(policy.shouldFlush(stripeProgress));
+}
+
+TEST(FlushPolicyTest, LambdaFlushPolicyDefaultLambdas) {
+  LambdaFlushPolicy policy;
+
+  StripeProgress progress{
+      .stripeRawSize = 1200,
+      .stripeEncodedSize = 600,
+      .stripeEncodedLogicalSize = 800};
+
+  EXPECT_FALSE(policy.shouldFlush(progress));
+  EXPECT_FALSE(policy.shouldChunk(progress));
+}
+
+} // namespace facebook::nimble
diff --git a/dwio/nimble/velox/tests/StreamChunkerTests.cpp b/dwio/nimble/velox/tests/StreamChunkerTests.cpp
new file mode 100644
index 0000000..1ed29b8
--- /dev/null
+++ b/dwio/nimble/velox/tests/StreamChunkerTests.cpp
@@ -0,0 +1,988 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <folly/Random.h>
+
+#include "dwio/nimble/velox/SchemaBuilder.h"
+#include "dwio/nimble/velox/StreamChunker.h"
+#include "dwio/nimble/velox/StreamData.h"
+#include "velox/common/memory/Memory.h"
+#include "velox/common/memory/MemoryArbitrator.h"
+#include "velox/common/memory/SharedArbitrator.h"
+
+namespace facebook::nimble {
+template <typename Vector, typename T>
+void populateData(Vector& vec, const std::vector<T>& data) {
+  vec.reserve(data.size());
+  for (const auto& item : data) {
+    vec.push_back(item);
+  }
+}
+
+template <typename T>
+std::vector<T> toVector(std::string_view data) {
+  const T* chunkData = reinterpret_cast<const T*>(data.data());
+  return std::vector<T>(chunkData, chunkData + data.size() / sizeof(T));
+}
+
+// Expected chunk result structure
+template <typename T>
+struct ExpectedChunk {
+  std::vector<T> chunkData;
+  std::vector<bool> chunkNonNulls = {};
+  bool hasNulls = false;
+  size_t extraMemory = 0;
+};
+
+class StreamChunkerTestsBase : public ::testing::Test {
+ protected:
+  // Helper method to validate chunk results against expected data.
+  template <typename T>
+  void validateChunk(
+      const StreamData& stream,
+      std::unique_ptr<StreamChunker> chunker,
+      const std::vector<ExpectedChunk<T>>& expectedChunks,
+      const ExpectedChunk<T>& expectedRetainedData) {
+    // Compaction before chunking should have no effect.
+    if (folly::Random::rand32() % 2 == 0) {
+      chunker->compact();
+    }
+
+    int chunkIndex = 0;
+    while (const auto chunkView = chunker->next()) {
+      ASSERT_LT(chunkIndex, expectedChunks.size());
+      ASSERT_TRUE(chunkView.has_value());
+      const auto chunkData = toVector<T>(chunkView->data());
+      const auto& expectedChunk = expectedChunks[chunkIndex];
+      const auto& expectedChunkData = expectedChunk.chunkData;
+      const auto& expectedChunkNonNulls = expectedChunk.chunkNonNulls;
+      EXPECT_THAT(chunkData, ::testing::ElementsAreArray(expectedChunkData));
+      EXPECT_THAT(
+          chunkView->nonNulls(),
+          ::testing::ElementsAreArray(expectedChunkNonNulls));
+      EXPECT_EQ(chunkView->hasNulls(), expectedChunk.hasNulls);
+      ++chunkIndex;
+    }
+    ASSERT_EQ(chunkIndex, expectedChunks.size());
+
+    // Erases processed data to reclaim memory.
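+    // All expected chunks have been consumed; compaction should now shrink
+    // the buffers down to the retained tail validated below.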
+    chunker->compact();
+
+    // Validate buffer is properly compacted to only retain expected data
+    EXPECT_THAT(
+        toVector<T>(stream.data()),
+        ::testing::ElementsAreArray(expectedRetainedData.chunkData));
+    EXPECT_THAT(
+        stream.nonNulls(),
+        ::testing::ElementsAreArray(expectedRetainedData.chunkNonNulls));
+    EXPECT_EQ(stream.hasNulls(), expectedRetainedData.hasNulls);
+    const uint64_t expectedRetainedMemoryUsed =
+        (sizeof(T) * expectedRetainedData.chunkData.size()) +
+        expectedRetainedData.chunkNonNulls.size() +
+        expectedRetainedData.extraMemory;
+    EXPECT_EQ(stream.memoryUsed(), expectedRetainedMemoryUsed);
+  }
+
+  static void SetUpTestCase() {
+    velox::memory::SharedArbitrator::registerFactory();
+    velox::memory::MemoryManager::Options options;
+    options.arbitratorKind = "SHARED";
+    velox::memory::MemoryManager::testingSetInstance(options);
+  }
+
+  void SetUp() override {
+    rootPool_ = velox::memory::memoryManager()->addRootPool("default_root");
+    leafPool_ = rootPool_->addLeafChild("default_leaf");
+    schemaBuilder_ = std::make_unique<SchemaBuilder>();
+    inputBufferGrowthPolicy_ =
+        DefaultInputBufferGrowthPolicy::withDefaultRanges();
+  }
+
+  std::shared_ptr<velox::memory::MemoryPool> rootPool_;
+  std::shared_ptr<velox::memory::MemoryPool> leafPool_;
+  std::unique_ptr<SchemaBuilder> schemaBuilder_;
+  std::unique_ptr<InputBufferGrowthPolicy> inputBufferGrowthPolicy_;
+};
+
+TEST_F(StreamChunkerTestsBase, getNewBufferCapacityTest) {
+  // currentCapacityCount < maxChunkElementCount
+  uint64_t maxChunkElementCount = 8;
+  uint64_t currentCapacityCount = 4;
+  const uint64_t requiredCapacityCount = 2;
+  EXPECT_EQ(
+      detail::getNewBufferCapacity(
+          /*maxChunkSize=*/maxChunkElementCount * sizeof(int32_t),
+          /*currentCapacityCount=*/currentCapacityCount,
+          /*requiredCapacityCount=*/requiredCapacityCount),
+      currentCapacityCount);
+
+  currentCapacityCount = 40;
+  // currentCapacityCount > maxChunkElementCount:
+  // new capacity = 8 + (40 - 8) * 0.5 = 24
+  EXPECT_EQ(
+      detail::getNewBufferCapacity(
+          /*maxChunkSize=*/maxChunkElementCount * sizeof(int32_t),
+          /*currentCapacityCount=*/currentCapacityCount,
+          /*requiredCapacityCount=*/requiredCapacityCount),
+      24);
+}
+
+TEST_F(StreamChunkerTestsBase, getStreamChunkerTest) {
+  // Ensure a chunker can be created for all types.
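+  // The macro below builds a scalar stream of each kind and checks that
+  // getStreamChunker can construct a chunker for it; UInt64 is the one
+  // scalar kind expected to be rejected with NimbleInternalError.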
+#define TEST_STREAM_CHUNKER_FOR_TYPE(scalarKind, T)           \
+  {                                                           \
+    const auto scalarTypeBuilder =                            \
+        schemaBuilder_->createScalarTypeBuilder(scalarKind);  \
+    auto descriptor = &scalarTypeBuilder->scalarDescriptor(); \
+    ContentStreamData<T> stream(                              \
+        *leafPool_, *descriptor, *inputBufferGrowthPolicy_);  \
+    getStreamChunker(stream, {});                             \
+  }
+
+  EXPECT_NO_THROW(
+      TEST_STREAM_CHUNKER_FOR_TYPE(facebook::nimble::ScalarKind::Bool, bool));
+  EXPECT_NO_THROW(
+      TEST_STREAM_CHUNKER_FOR_TYPE(facebook::nimble::ScalarKind::Int8, int8_t));
+  EXPECT_NO_THROW(TEST_STREAM_CHUNKER_FOR_TYPE(
+      facebook::nimble::ScalarKind::Int16, int16_t));
+  EXPECT_NO_THROW(TEST_STREAM_CHUNKER_FOR_TYPE(
+      facebook::nimble::ScalarKind::Int32, int32_t));
+  EXPECT_NO_THROW(TEST_STREAM_CHUNKER_FOR_TYPE(
+      facebook::nimble::ScalarKind::UInt32, uint32_t));
+  EXPECT_NO_THROW(TEST_STREAM_CHUNKER_FOR_TYPE(
+      facebook::nimble::ScalarKind::Int64, int64_t));
+  EXPECT_NO_THROW(
+      TEST_STREAM_CHUNKER_FOR_TYPE(facebook::nimble::ScalarKind::Float, float));
+  EXPECT_NO_THROW(TEST_STREAM_CHUNKER_FOR_TYPE(
+      facebook::nimble::ScalarKind::Double, double));
+  EXPECT_NO_THROW(TEST_STREAM_CHUNKER_FOR_TYPE(
+      facebook::nimble::ScalarKind::String, std::string_view));
+  EXPECT_NO_THROW(TEST_STREAM_CHUNKER_FOR_TYPE(
+      facebook::nimble::ScalarKind::Binary, std::string_view));
+  EXPECT_THROW(
+      TEST_STREAM_CHUNKER_FOR_TYPE(
+          facebook::nimble::ScalarKind::UInt64, uint64_t),
+      facebook::nimble::NimbleInternalError);
+#undef TEST_STREAM_CHUNKER_FOR_TYPE
+}
+
+TEST_F(StreamChunkerTestsBase, shouldOmitNullStreamTest) {
+  const auto scalarTypeBuilder =
+      schemaBuilder_->createScalarTypeBuilder(ScalarKind::Bool);
+  const auto descriptor = &scalarTypeBuilder->scalarDescriptor();
+
+  // No Omit Case: Has nulls and non nulls size greater than minChunkSize.
+  {
+    NullsStreamData nullsStream(
+        *leafPool_, *descriptor, *inputBufferGrowthPolicy_);
+    std::vector<bool> nonNullsTestData = {true, false};
+    nullsStream.ensureAdditionalNullsCapacity(
+        /*mayHaveNulls=*/true, static_cast<uint32_t>(nonNullsTestData.size()));
+    auto& nonNulls = nullsStream.mutableNonNulls();
+    populateData(nonNulls, nonNullsTestData);
+    EXPECT_FALSE(
+        detail::shouldOmitNullStream(
+            nullsStream,
+            /*minChunkSize=*/1,
+            /*isFirstChunk=*/false));
+  }
+
+  // No nulls.
+  {
+    NullsStreamData nullsStream(
+        *leafPool_, *descriptor, *inputBufferGrowthPolicy_);
+    std::vector<bool> nonNullsTestData = {true, false};
+    nullsStream.ensureAdditionalNullsCapacity(
+        /*mayHaveNulls=*/false, static_cast<uint32_t>(nonNullsTestData.size()));
+
+    // Omit stream when first chunk and empty.
+    EXPECT_TRUE(
+        detail::shouldOmitNullStream(
+            nullsStream,
+            /*minChunkSize=*/1,
+            /*isFirstChunk=*/true));
+
+    auto& nonNulls = nullsStream.mutableNonNulls();
+    populateData(nonNulls, nonNullsTestData);
+    // No omit stream when not first chunk and not empty.
+    EXPECT_FALSE(
+        detail::shouldOmitNullStream(
+            nullsStream,
+            /*minChunkSize=*/1,
+            /*isFirstChunk=*/false));
+  }
+
+  // Has nulls.
+  {
+    NullsStreamData nullsStream(
+        *leafPool_, *descriptor, *inputBufferGrowthPolicy_);
+    std::vector<bool> nonNullsTestData = {true, false};
+    nullsStream.ensureAdditionalNullsCapacity(
+        /*mayHaveNulls=*/true, static_cast<uint32_t>(nonNullsTestData.size()));
+    auto& nonNulls = nullsStream.mutableNonNulls();
+    populateData(nonNulls, nonNullsTestData);
+
+    // No omit stream when size above minChunkSize.
+    EXPECT_FALSE(
+        detail::shouldOmitNullStream(
+            nullsStream,
+            /*minChunkSize=*/1,
+            /*isFirstChunk=*/true));
+
+    // Omit stream when size below minChunkSize.
+    EXPECT_TRUE(
+        detail::shouldOmitNullStream(
+            nullsStream,
+            /*minChunkSize=*/10,
+            /*isFirstChunk=*/true));
+  }
+}
+
+TEST_F(StreamChunkerTestsBase, shouldOmitDataStreamTest) {
+  auto scalarTypeBuilder =
+      schemaBuilder_->createScalarTypeBuilder(ScalarKind::Bool);
+  auto descriptor = &scalarTypeBuilder->scalarDescriptor();
+  NullsStreamData nullsStream(
+      *leafPool_, *descriptor, *inputBufferGrowthPolicy_);
+
+  // Setup test data with some nulls
+  std::vector<bool> nonNullsTestData = {
+      true, false, true, true, false, true, false, true, false, false};
+  nullsStream.ensureAdditionalNullsCapacity(
+      /*mayHaveNulls=*/true, static_cast<uint32_t>(nonNullsTestData.size()));
+  auto& nonNulls = nullsStream.mutableNonNulls();
+  populateData(nonNulls, nonNullsTestData);
+
+  scalarTypeBuilder =
+      schemaBuilder_->createScalarTypeBuilder(ScalarKind::Int32);
+  descriptor = &scalarTypeBuilder->scalarDescriptor();
+  NullableContentStreamData<int32_t> nullableContentStream(
+      *leafPool_, *descriptor, *inputBufferGrowthPolicy_);
+  std::vector<int32_t> testData = {1, 2, 3, 4, 5};
+  // Populate data using ensureAdditionalNullsCapacity and manual data insertion
+  nullableContentStream.ensureAdditionalNullsCapacity(
+      /*mayHaveNulls=*/true, static_cast<uint32_t>(nonNullsTestData.size()));
+  populateData(nullableContentStream.mutableData(), testData);
+  populateData(nullableContentStream.mutableNonNulls(), nonNullsTestData);
+
+  // Nullable Content Stream Basic Test
+  {
+    // Non-empty stream with size above minChunkSize
+    EXPECT_FALSE(
+        detail::shouldOmitDataStream(
+            nullableContentStream,
+            /*minChunkSize=*/4,
+            /*isFirstChunk=*/false));
+
+    // Non-empty stream with size below minChunkSize
+    EXPECT_TRUE(
+        detail::shouldOmitDataStream(
+            nullableContentStream,
+            /*minChunkSize=*/100,
+            /*isFirstChunk=*/true));
+
+    // Non-empty stream with size below minChunkSize
+    // and non-empty written stream content
+    EXPECT_FALSE(
+        detail::shouldOmitDataStream(
+            nullableContentStream,
+            /*minChunkSize=*/100,
+            /*isFirstChunk=*/false));
+  }
+}
+
+TEST_F(StreamChunkerTestsBase, ensureShouldOmitStreamIsRespectedTest) {
+  auto scalarTypeBuilder =
+      schemaBuilder_->createScalarTypeBuilder(ScalarKind::Bool);
+  auto descriptor = &scalarTypeBuilder->scalarDescriptor();
+  NullsStreamData nullsStream(
+      *leafPool_, *descriptor, *inputBufferGrowthPolicy_);
+
+  // Setup test data with some nulls
+  std::vector<bool> nonNullsTestData = {
+      true, false, true, true, false, true, false, true, false, false};
+  nullsStream.ensureAdditionalNullsCapacity(
+      /*mayHaveNulls=*/true, static_cast<uint32_t>(nonNullsTestData.size()));
+  auto& nonNulls = nullsStream.mutableNonNulls();
+  populateData(nonNulls, nonNullsTestData);
+  // NullsStreamChunker respects omitStream
+  {
+    // Test case where omitStream returns true - chunker should return nullopt
+    auto chunker = getStreamChunker(
+        nullsStream,
+        {
+            .minChunkSize = 11, // Set high minChunkSize to trigger omitStream
+            .maxChunkSize = 4,
+            .ensureFullChunks = false,
+            .isFirstChunk = true,
+        });
+
+    // Validate that the correct chunker type was created
+    ASSERT_NE(dynamic_cast<NullsStreamChunker*>(chunker.get()), nullptr);
+
+    // Should return nullopt because omitStream should return true
+    auto result = chunker->next();
+    EXPECT_FALSE(result.has_value());
+
+    // Test case where omitStream returns false
+    auto chunker2 = getStreamChunker(
+        nullsStream,
+        {
+            .minChunkSize = 4, // Set reasonable minChunkSize
+            .maxChunkSize = 4,
+            .ensureFullChunks = false,
+        });
+
+    // Should return valid chunks because omitStream should return false
+    auto result2 = chunker2->next();
+    EXPECT_TRUE(result2.has_value());
+  }
+
+  scalarTypeBuilder =
+      schemaBuilder_->createScalarTypeBuilder(ScalarKind::Int32);
+  descriptor = &scalarTypeBuilder->scalarDescriptor();
+  NullableContentStreamData<int32_t> nullableContentStream(
+      *leafPool_, *descriptor, *inputBufferGrowthPolicy_);
+  std::vector<int32_t> testData = {1, 2, 3, 4, 5};
+  // Populate data using ensureAdditionalNullsCapacity and manual data
+  // insertion
+  nullableContentStream.ensureAdditionalNullsCapacity(
+      /*mayHaveNulls=*/true, static_cast<uint32_t>(nonNullsTestData.size()));
+  populateData(nullableContentStream.mutableData(), testData);
+  populateData(nullableContentStream.mutableNonNulls(), nonNullsTestData);
+  // NullableContentStreamChunker respects omitStream
+  {
+    // Test case where omitStream returns true - chunker should return nullopt
+    auto chunker = getStreamChunker(
+        nullableContentStream,
+        {
+            .minChunkSize = 100, // Set high minChunkSize to trigger omitStream
+            .maxChunkSize = 20,
+            .ensureFullChunks = false,
+            .isFirstChunk = true,
+        });
+
+    // Validate that the correct chunker type was created
+    ASSERT_NE(
+        dynamic_cast<NullableContentStreamChunker<int32_t>*>(chunker.get()),
+        nullptr);
+
+    // Should return nullopt because omitStream should return true
+    auto result = chunker->next();
+    EXPECT_FALSE(result.has_value());
+
+    // Test case where omitStream returns false
+    auto chunker2 = getStreamChunker(
+        nullableContentStream,
+        {
+            .minChunkSize = 4, // Set reasonable minChunkSize
+            .maxChunkSize = 20,
+            .ensureFullChunks = false,
+        });
+
+    // Should return valid chunks because omitStream should return false
+    auto result2 = chunker2->next();
+    EXPECT_TRUE(result2.has_value());
+  }
+}
+
+TEST_F(StreamChunkerTestsBase, ContentStreamChunkerTest) {
+  auto scalarTypeBuilder =
+      schemaBuilder_->createScalarTypeBuilder(ScalarKind::Int32);
+  auto descriptor = &scalarTypeBuilder->scalarDescriptor();
+  StreamChunkerOptions options{};
+
+  // ContentStreamData can be used to create a ContentStreamChunker.
+  ContentStreamData<int32_t> stream(
+      *leafPool_, *descriptor, *inputBufferGrowthPolicy_);
+  ASSERT_NE(
+      dynamic_cast<ContentStreamChunker<int32_t>*>(
+          getStreamChunker(stream, options).get()),
+      nullptr);
+
+  // NullableContentStreamData can be used to create a ContentStreamChunker.
+  NullableContentStreamData<int32_t> nullableStream(
+      *leafPool_, *descriptor, *inputBufferGrowthPolicy_);
+  auto streamChunker = getStreamChunker(nullableStream, options);
+  auto contentStreamChunker = dynamic_cast<
+      ContentStreamChunker<int32_t, NullableContentStreamData<int32_t>>*>(
+      streamChunker.get());
+  ASSERT_NE(contentStreamChunker, nullptr);
+}
+
+TEST_F(StreamChunkerTestsBase, ContentStreamIntChunking) {
+  auto scalarTypeBuilder =
+      schemaBuilder_->createScalarTypeBuilder(ScalarKind::Int32);
+  auto descriptor = &scalarTypeBuilder->scalarDescriptor();
+  ContentStreamData<int32_t> stream(
+      *leafPool_, *descriptor, *inputBufferGrowthPolicy_);
+
+  uint64_t maxChunkSize = 18; // 4.5 elements
+  uint64_t minChunkSize = 2; // 0 elements
+  // Test 1: Ensure Full Chunks. Min Size is Ignored.
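+  // With maxChunkSize = 18 bytes a chunk holds floor(18 / sizeof(int32_t)) = 4
+  // elements, so ten elements yield two full chunks plus a two-element tail.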
+  {
+    std::vector<int32_t> testData = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+    auto& data = stream.mutableData();
+    populateData(data, testData);
+    auto chunker = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = minChunkSize,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = true,
+        });
+    // Validate that the correct chunker type was created
+    ASSERT_NE(
+        dynamic_cast<ContentStreamChunker<int32_t>*>(chunker.get()), nullptr);
+    std::vector<ExpectedChunk<int32_t>> expectedChunks = {
+        {.chunkData = {1, 2, 3, 4}}, // First chunk: 4 elements
+        {.chunkData = {5, 6, 7, 8}} // Second chunk: 4 elements
+        // The remaining 2 elements stay buffered: they don't fill a full chunk
+    };
+    // Expected retained data after compaction
+    ExpectedChunk<int32_t> expectedRetainedData{.chunkData = {9, 10}};
+    validateChunk(
+        stream, std::move(chunker), expectedChunks, expectedRetainedData);
+  }
+
+  // When ensureFullChunks = false.
+  {
+    // Test 2: Nothing is returned when chunk is below minChunkSize
+    minChunkSize = 12; // 3 elements
+    auto chunker = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = minChunkSize,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = false,
+        });
+    ExpectedChunk<int32_t> expectedRetainedData{.chunkData = {9, 10}};
+    validateChunk(stream, std::move(chunker), {}, expectedRetainedData);
+
+    // Test 3: Everything is returned when chunk is above minChunkSize
+    minChunkSize = 2;
+    auto chunker2 = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = minChunkSize,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = false,
+        });
+    std::vector<ExpectedChunk<int32_t>> expectedChunks = {
+        {.chunkData = {9, 10}}};
+    validateChunk(stream, std::move(chunker2), expectedChunks, {});
+  }
+
+  // Test 4: We can reuse a stream post compaction.
+  {
+    auto& data = stream.mutableData();
+    std::vector<int32_t> testData = {7, 8, 9, 10, 11};
+    populateData(data, testData);
+
+    maxChunkSize = 12; // 3 elements
+    auto chunker = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = minChunkSize,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = false,
+        });
+    // Validate that the correct chunker type was created
+    ASSERT_NE(
+        dynamic_cast<ContentStreamChunker<int32_t>*>(chunker.get()), nullptr);
+
+    std::vector<ExpectedChunk<int32_t>> expectedChunks = {
+        {.chunkData = {7, 8, 9}}, // First chunk: 3 elements
+        {.chunkData = {10, 11}}, // Second chunk: 2 elements
+    };
+    // No data retained after processing all chunks
+    validateChunk(stream, std::move(chunker), expectedChunks, {});
+  }
+}
+
+TEST_F(StreamChunkerTestsBase, ContentStreamStringChunking) {
+  auto scalarTypeBuilder =
+      schemaBuilder_->createScalarTypeBuilder(ScalarKind::String);
+  auto descriptor = &scalarTypeBuilder->scalarDescriptor();
+  ContentStreamData<std::string_view> stream(
+      *leafPool_, *descriptor, *inputBufferGrowthPolicy_);
+  uint64_t maxChunkSize = 55;
+  uint64_t minChunkSize = 1;
+  // Test 1: Ensure Full Chunks. Min Size is Ignored.
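+  // Each std::string_view entry occupies 16 bytes in the buffer, so a 55-byte
+  // maxChunkSize fits two entries plus their character payload per chunk.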
+  {
+    std::vector<std::string_view> testData = {
+        "short", "a_longer_string", "x", "medium_size", "tiny"};
+    auto& data = stream.mutableData();
+    populateData(data, testData);
+
+    // Calculate extra memory for string content
+    for (const auto& str : testData) {
+      stream.extraMemory() += str.size();
+    }
+
+    // Total string content sizes:
+    // "short"(5) + "a_longer_string"(15) + "x"(1) + "medium_size"(11) +
+    // "tiny"(4) = 36 bytes
+    ASSERT_EQ(stream.extraMemory(), 36);
+    auto chunker = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = minChunkSize,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = true,
+        });
+    std::vector<ExpectedChunk<std::string_view>> expectedChunks = {
+        {.chunkData = {"short", "a_longer_string"},
+         .chunkNonNulls = {},
+         .hasNulls = false,
+         .extraMemory =
+             20}, // "short"(5) + "a_longer_string"(15) = 20 bytes extra
+        {.chunkData = {"x", "medium_size"},
+         .chunkNonNulls = {},
+         .hasNulls = false,
+         .extraMemory = 12} // "x"(1) + "medium_size"(11) = 12 bytes extra
+        // "tiny"(4) stays buffered: it doesn't fill a full chunk
+    };
+
+    ExpectedChunk<std::string_view> expectedRetainedData{
+        .chunkData = {"tiny"},
+        .chunkNonNulls = {},
+        .hasNulls = false,
+        .extraMemory = 4}; // "tiny"(4) = 4 bytes extra
+    validateChunk(
+        stream, std::move(chunker), expectedChunks, expectedRetainedData);
+  }
+
+  // When ensureFullChunks = false
+  {
+    // Test 2: Nothing is returned when chunk is below minChunkSize
+    auto chunker = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = 30,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = false,
+        });
+
+    ExpectedChunk<std::string_view> lastChunk = {
+        .chunkData = {"tiny"},
+        .chunkNonNulls = {},
+        .hasNulls = false,
+        .extraMemory = 4 // "tiny"(4) = 4 bytes extra
+    };
+    validateChunk(stream, std::move(chunker), {}, lastChunk);
+
+    // Test 3: Everything is returned when chunk is above minChunkSize
+    auto chunker2 = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = 0,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = false,
+        });
+    validateChunk<std::string_view>(
+        stream, std::move(chunker2), {lastChunk}, {});
+  }
+
+  // Test 4: We can reuse a stream post compaction.
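+  // extraMemory tracks the out-of-line character payload, so it is reset to
+  // zero and re-accumulated for the strings written after compaction.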
+  {
+    auto& data = stream.mutableData();
+    stream.extraMemory() = 0; // Reset extra memory
+    std::vector<std::string_view> testData = {
+        "hello", "world", "hello", "world"};
+    populateData(data, testData);
+
+    // Calculate extra memory for string content
+    for (const auto& str : testData) {
+      stream.extraMemory() += str.size();
+    }
+
+    auto chunker = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = 0,
+            .maxChunkSize = 42,
+            .ensureFullChunks = true,
+        });
+
+    std::vector<ExpectedChunk<std::string_view>> expectedChunks = {
+        {.chunkData = {"hello", "world"},
+         .chunkNonNulls = {},
+         .hasNulls = false,
+         .extraMemory = 10}, // "hello"(5) + "world"(5) = 10 bytes extra
+        {.chunkData = {"hello", "world"},
+         .chunkNonNulls = {},
+         .hasNulls = false,
+         .extraMemory = 10}, // "hello"(5) + "world"(5) = 10 bytes extra
+    };
+    validateChunk(stream, std::move(chunker), expectedChunks, {});
+  }
+}
+
+TEST_F(StreamChunkerTestsBase, NullsStreamChunking) {
+  auto scalarTypeBuilder =
+      schemaBuilder_->createScalarTypeBuilder(ScalarKind::Bool);
+  auto descriptor = &scalarTypeBuilder->scalarDescriptor();
+  NullsStreamData stream(*leafPool_, *descriptor, *inputBufferGrowthPolicy_);
+
+  // Setup test data with some nulls
+  std::vector<bool> testData = {
+      true, false, true, true, false, true, false, true, false, false};
+  stream.ensureAdditionalNullsCapacity(
+      /*mayHaveNulls=*/true, static_cast<uint32_t>(testData.size()));
+  auto& nonNulls = stream.mutableNonNulls();
+  populateData(nonNulls, testData);
+
+  // size of data is 10 * sizeof(bool) = 10 bytes
+  ASSERT_EQ(stream.memoryUsed(), 10);
+  EXPECT_EQ(stream.data().size(), 0);
+
+  // 4 bytes (arbitrary chunk size)
+  uint32_t maxChunkSize = 4;
+  uint32_t minChunkSize = 1;
+
+  // Test 1: Ensure Full Chunks. Min Size is Ignored.
+  {
+    auto chunker = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = minChunkSize,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = true,
+        });
+    // Validate that the correct chunker type was created
+    ASSERT_NE(dynamic_cast<NullsStreamChunker*>(chunker.get()), nullptr);
+
+    std::vector<ExpectedChunk<bool>> expectedChunks = {
+        {.chunkData = {true, false, true, true},
+         .chunkNonNulls = {},
+         .hasNulls = false},
+        {.chunkData = {false, true, false, true},
+         .chunkNonNulls = {},
+         .hasNulls = false}};
+    // Expected retained data after compaction (no data, but non-nulls remain)
+    ExpectedChunk<bool> expectedRetainedData{
+        .chunkData = {}, .chunkNonNulls = {false, false}, .hasNulls = true};
+    validateChunk(
+        stream, std::move(chunker), expectedChunks, expectedRetainedData);
+  }
+
+  // When ensureFullChunks = false
+  {
+    // Test 2: Nothing is returned when chunk is below minChunkSize
+    auto chunker = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = 3,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = false,
+        });
+
+    validateChunk<bool>(
+        stream,
+        std::move(chunker),
+        {},
+        {.chunkNonNulls = {false, false}, .hasNulls = true});
+
+    // Test 3: Everything is returned when chunk is above minChunkSize
+    auto chunker2 = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = 1,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = false,
+        });
+    // No data retained after processing all chunks
+    validateChunk<bool>(
+        stream, std::move(chunker2), {{.chunkData = {false, false}}}, {});
+  }
+
+  // Test 4: We can reuse a stream post compaction.
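+  // ensureAdditionalNullsCapacity reopens the nulls buffer after the previous
+  // compaction, letting the same stream accumulate a fresh batch of values.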
+  {
+    testData = {
+        true, false, true, true, false, true, false, true, false, false};
+    stream.ensureAdditionalNullsCapacity(
+        /*mayHaveNulls=*/true, static_cast<uint32_t>(testData.size()));
+    nonNulls = stream.mutableNonNulls();
+    populateData(nonNulls, testData);
+    auto memoryUsed = stream.memoryUsed();
+    // When maxChunkSize is equal to available memory, chunk is returned.
+    auto chunker2 = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = minChunkSize,
+            .maxChunkSize = memoryUsed,
+            .ensureFullChunks = true,
+        });
+    auto expectedChunks = {ExpectedChunk<bool>{
+        .chunkData = testData, .chunkNonNulls = {}, .hasNulls = false}};
+    // No data retained after processing all chunks
+    validateChunk<bool>(stream, std::move(chunker2), expectedChunks, {});
+  }
+}
+
+TEST_F(StreamChunkerTestsBase, NullableContentStreamIntChunking) {
+  auto scalarTypeBuilder =
+      schemaBuilder_->createScalarTypeBuilder(ScalarKind::Int32);
+  auto descriptor = &scalarTypeBuilder->scalarDescriptor();
+  NullableContentStreamData<int32_t> stream(
+      *leafPool_, *descriptor, *inputBufferGrowthPolicy_);
+
+  // Setup test data with some nulls
+  std::vector<int32_t> testData = {1, 2, 3, 4, 5, 6, 7};
+  std::vector<bool> nonNullsData = {
+      true, false, true, true, false, true, false, true, true, true};
+
+  // Populate data using ensureAdditionalNullsCapacity and manual data
+  // insertion
+  stream.ensureAdditionalNullsCapacity(
+      /*mayHaveNulls=*/true, static_cast<uint32_t>(nonNullsData.size()));
+  populateData(stream.mutableData(), testData);
+  populateData(stream.mutableNonNulls(), nonNullsData);
+
+  // 7 * sizeof(int32_t) + 10 * sizeof(bool) = 38
+  ASSERT_EQ(stream.memoryUsed(), 38);
+
+  // Test 1: Ensure Full Chunks. Min Size is Ignored
+  uint64_t maxChunkSize = 17;
+  uint64_t minChunkSize = 1;
+  {
+    auto chunker = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = minChunkSize,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = true,
+        });
+
+    std::vector<ExpectedChunk<int32_t>> expectedChunks = {
+        {.chunkData = {1, 2, 3},
+         .chunkNonNulls = {true, false, true, true, false},
+         .hasNulls = true},
+        {.chunkData = {4, 5, 6},
+         .chunkNonNulls = {true, false, true, true},
+         .hasNulls = true}};
+    // Expected retained data after compaction
+    ExpectedChunk<int32_t> expectedRetainedData{
+        .chunkData = {7}, .chunkNonNulls = {true}, .hasNulls = true};
+    validateChunk(
+        stream, std::move(chunker), expectedChunks, expectedRetainedData);
+  }
+
+  // When ensureFullChunks = false
+  {
+    // Test 2: Nothing is returned when chunk is below minChunkSize
+    auto chunker = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = 80,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = false,
+        });
+    // Validate that the correct chunker type was created
+    ASSERT_NE(
+        dynamic_cast<NullableContentStreamChunker<int32_t>*>(chunker.get()),
+        nullptr);
+    // Nothing is returned
+    validateChunk<int32_t>(
+        stream,
+        std::move(chunker),
+        {},
+        {.chunkData = {7}, .chunkNonNulls = {true}, .hasNulls = true});
+    // Test 3: Everything is returned when chunk is above minChunkSize
+    auto chunker2 = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = 1,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = false,
+        });
+    // Everything is returned.
+    validateChunk<int32_t>(
+        stream,
+        std::move(chunker2),
+        {{.chunkData = {7}, .hasNulls = false}},
+        {});
+  }
+
+  // Test 4: We can reuse a stream post compaction
+  {
+    std::vector<int32_t> newTestData = {10, 11};
+    std::vector<bool> test3NonNullsData = {
+        true, false, false, true, false, false};
+    stream.ensureAdditionalNullsCapacity(
+        /*mayHaveNulls=*/true, static_cast<uint32_t>(test3NonNullsData.size()));
+    auto& data = stream.mutableData();
+    auto& nonNulls = stream.mutableNonNulls();
+    populateData(data, newTestData);
+    populateData(nonNulls, test3NonNullsData);
+
+    auto chunker = getStreamChunker(
+        stream,
+        {.maxChunkSize = stream.memoryUsed(), .ensureFullChunks = true});
+    std::vector<ExpectedChunk<int32_t>> expectedChunks = {
+        {.chunkData = newTestData,
+         .chunkNonNulls = test3NonNullsData,
+         .hasNulls = true}};
+    validateChunk(stream, std::move(chunker), expectedChunks, {});
+  }
+}
+
+TEST_F(StreamChunkerTestsBase, NullableContentStreamStringChunking) {
+  auto scalarTypeBuilder =
+      schemaBuilder_->createScalarTypeBuilder(ScalarKind::String);
+  auto descriptor = &scalarTypeBuilder->scalarDescriptor();
+  NullableContentStreamData<std::string_view> stream(
+      *leafPool_, *descriptor, *inputBufferGrowthPolicy_);
+
+  // Setup test data with some nulls
+  std::vector<std::string_view> testData = {
+      "hello", "world", "test", "string", "chunk", "data", "example"};
+  std::vector<bool> nonNullsData = {
+      true, false, true, true, false, true, false, true, true, true, false};
+
+  // Populate data using ensureAdditionalNullsCapacity and manual data
+  // insertion
+  stream.ensureAdditionalNullsCapacity(
+      /*mayHaveNulls=*/true, static_cast<uint32_t>(nonNullsData.size()));
+  auto& data = stream.mutableData();
+  auto& nonNulls = stream.mutableNonNulls();
+  populateData(data, testData);
+  populateData(nonNulls, nonNullsData);
+
+  // Set extra memory for string overhead
+  // number of strings: 7
+  // size of string view data pointers = 7 * 8 = 56 bytes
+  // size of string view data size = 7 * 8 = 56 bytes
+  // size of nulls = 11 * 1 = 11 bytes
+  // total size of stored string data = 36 bytes
+  // total size of stream data = 56 + 56 + 11 + 36 = 159 bytes
+  for (const auto& entry : testData) {
+    stream.extraMemory() += entry.size();
+  }
+  ASSERT_EQ(stream.memoryUsed(), 159);
+
+  // Test 1: Not last chunk
+  {
+    uint64_t maxChunkSize = 72;
+    uint64_t minChunkSize = 50;
+    auto chunker = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = minChunkSize,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = true,
+        });
+
+    std::vector<ExpectedChunk<std::string_view>> expectedChunks = {
+        {.chunkData = {"hello", "world", "test"},
+         .chunkNonNulls = {true, false, true, true, false},
+         .hasNulls = true,
+         .extraMemory =
+             14}, // "hello"(5) + "world"(5) + "test"(4) = 14 bytes extra
+        {.chunkData = {"string", "chunk", "data"},
+         .chunkNonNulls = {true, false, true, true},
+         .hasNulls = true,
+         .extraMemory = 15}
+        // "string"(6) + "chunk"(5) + "data"(4) = 15 bytes extra
+    };
+    // Expected retained data after compaction
+    ExpectedChunk<std::string_view> expectedRetainedData{
+        .chunkData = {"example"},
+        .chunkNonNulls = {true, false},
+        .hasNulls = true,
+        .extraMemory = 7};
+    validateChunk(
+        stream, std::move(chunker), expectedChunks, expectedRetainedData);
+  }
+
+  // When ensureFullChunks = false
+  {
+    // Test 2: Nothing is returned when chunk is below minChunkSize
+    uint64_t maxChunkSize = 35;
+    auto chunker = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = stream.memoryUsed() + 1,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = false,
+        });
+
+    ExpectedChunk<std::string_view> lastChunk{
+        .chunkData = {"example"},
+        .chunkNonNulls = {true, false},
+        .hasNulls = true,
+        .extraMemory = 7 // "example"(7) = 7 bytes extra
+    };
+
+    // Nothing is returned
+    validateChunk<std::string_view>(stream, std::move(chunker), {}, lastChunk);
+
+    // Test 3: Everything is returned when chunk is above minChunkSize
+    auto chunker2 = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = stream.memoryUsed(),
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = false,
+        });
+    validateChunk<std::string_view>(
+        stream, std::move(chunker2), {lastChunk}, {});
+  }
+
+  // Test 4: We can reuse a stream post compaction
+  {
+    std::vector<std::string_view> smallTestData = {"test", "data"};
+    std::vector<bool> smallNonNullsData = {true, false, true};
+    stream.ensureAdditionalNullsCapacity(
+        /*mayHaveNulls=*/true, static_cast<uint32_t>(smallNonNullsData.size()));
+    auto& smallData = stream.mutableData();
+    auto& smallNonNulls = stream.mutableNonNulls();
+    populateData(smallData, smallTestData);
+    populateData(smallNonNulls, smallNonNullsData);
+
+    // Reset extra memory for new test data
+    stream.extraMemory() = 0;
+    for (const auto& entry : smallTestData) {
+      stream.extraMemory() += entry.size();
+    }
+
+    uint64_t maxChunkSize = stream.memoryUsed() + 1; // Larger than data size
+    uint64_t minChunkSize = stream.memoryUsed();
+    auto chunker = getStreamChunker(
+        stream,
+        {
+            .minChunkSize = minChunkSize,
+            .maxChunkSize = maxChunkSize,
+            .ensureFullChunks = false,
+        });
+    validateChunk<std::string_view>(
+        stream,
+        std::move(chunker),
+        {{.chunkData = {"test", "data"},
+          .chunkNonNulls = {true, false, true},
+          .hasNulls = true}},
+        {});
+  }
+}
+} // namespace facebook::nimble
diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp
index e982617..b77bfe5 100644
--- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp
+++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp
@@ -563,12 +563,14 @@ class VeloxReaderTests : public ::testing::Test {
       std::string& file,
       nimble::VeloxWriterOptions writerOptions) {
     auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file);
-    nimble::FlushDecision decision;
+    bool flushDecision;
+    bool chunkDecision;
     writerOptions.enableChunking = true;
     writerOptions.minStreamChunkRawSize = folly::Random::rand32(30, rng);
     writerOptions.flushPolicyFactory = [&]() {
       return std::make_unique<nimble::LambdaFlushPolicy>(
-          [&](auto&) { return decision; });
+          /*flushLambda=*/[&](auto&) { return flushDecision; },
+          /*chunkLambda=*/[&](auto&) { return chunkDecision; });
     };
 
     std::vector<velox::VectorPtr> expected;
@@ -579,16 +581,17 @@ class VeloxReaderTests : public ::testing::Test {
       auto vector = generator(type, i);
       int32_t rowIndex = 0;
       while (rowIndex < vector->size()) {
-        decision = nimble::FlushDecision::None;
+        flushDecision = false;
+        chunkDecision = false;
         auto batchSize = vector->size() - rowIndex;
         // Randomly produce chunks
         if (folly::Random::oneIn(2, rng)) {
           batchSize = folly::Random::rand32(0, batchSize, rng) + 1;
-          decision = nimble::FlushDecision::Chunk;
+          chunkDecision = true;
         }
         if ((perBatchFlush || folly::Random::oneIn(5, rng)) &&
             (rowIndex + batchSize == vector->size())) {
-          decision = nimble::FlushDecision::Stripe;
+          flushDecision = true;
         }
         writer.write(vector->slice(rowIndex, batchSize));
         rowIndex += batchSize;
@@ -773,7 +776,7 @@ class VeloxReaderTests : public ::testing::Test {
     writerOptions.enableChunking = true;
     writerOptions.flushPolicyFactory = [&]() {
       return std::make_unique<nimble::LambdaFlushPolicy>(
-          [&](auto&) { return nimble::FlushDecision::None; });
+          /*flushLambda=*/[&](auto&) { return false; });
     };
     writerOptions.dictionaryArrayColumns.insert("dictionaryArray");
     writerOptions.vectorDecoderVisitor = [&decodeCounter]() {
@@ -5506,7 +5509,8 @@ TEST_F(VeloxReaderTests, ChunkStreamsWithNulls) {
       .flushPolicyFactory =
           [&]() {
             return std::make_unique<nimble::LambdaFlushPolicy>(
-                [&](auto&) { return nimble::FlushDecision::Chunk; });
+                /*flushLambda=*/[&](auto&) { return false; },
+                /*chunkLambda=*/[&](auto&) { return true; });
           },
       .enableChunking = enableChunking};
   auto file = nimble::test::createNimbleFile(
diff --git a/dwio/nimble/velox/tests/VeloxWriterTests.cpp b/dwio/nimble/velox/tests/VeloxWriterTests.cpp
index a4d1141..9f6024a 100644
--- a/dwio/nimble/velox/tests/VeloxWriterTests.cpp
+++ b/dwio/nimble/velox/tests/VeloxWriterTests.cpp
@@ -18,10 +18,13 @@
 
 #include "dwio/nimble/common/Exceptions.h"
 #include "dwio/nimble/common/tests/TestUtils.h"
+#include "dwio/nimble/encodings/EncodingFactory.h"
 #include "dwio/nimble/encodings/EncodingLayoutCapture.h"
+#include "dwio/nimble/encodings/EncodingUtils.h"
 #include "dwio/nimble/tablet/Constants.h"
 #include "dwio/nimble/velox/ChunkedStream.h"
 #include "dwio/nimble/velox/EncodingLayoutTree.h"
+#include "dwio/nimble/velox/FlushPolicy.h"
 #include "dwio/nimble/velox/SchemaSerialization.h"
 #include "dwio/nimble/velox/StatsGenerated.h"
 #include "dwio/nimble/velox/VeloxReader.h"
@@ -35,6 +38,12 @@
 
 namespace facebook {
 
+DEFINE_uint32(
+    writer_tests_seed,
+    0,
+    "If provided, this seed will be used when executing tests. "
+    "Otherwise, a random seed will be used.");
+
 class VeloxWriterTests : public ::testing::Test {
  protected:
   static void SetUpTestCase() {
@@ -104,7 +113,7 @@ TEST_F(VeloxWriterTests, ExceptionOnClose) {
       std::move(writeFile),
       {.flushPolicyFactory = [&]() {
         return std::make_unique<nimble::LambdaFlushPolicy>(
-            [&](auto&) { return nimble::FlushDecision::Stripe; });
+            /*flushLambda=*/[&](auto&) { return true; });
       }});
   std::string error;
   try {
@@ -289,29 +298,92 @@ std::vector<velox::RowVectorPtr> generateBatches(
   velox::VectorFuzzer fuzzer(
       {.vectorSize = size, .nullRatio = 0.1}, &pool, seed);
   std::vector<velox::RowVectorPtr> batches;
-
+  batches.reserve(batchCount);
   for (size_t i = 0; i < batchCount; ++i) {
     batches.push_back(fuzzer.fuzzInputFlatRow(type));
   }
   return batches;
 }
+
+struct ChunkSizeResults {
+  uint32_t stripeCount;
+  uint32_t minChunkCount;
+  uint32_t maxChunkCount;
+};
+
+ChunkSizeResults validateChunkSize(
+    nimble::VeloxReader& reader,
+    const uint64_t minStreamChunkRawSize) {
+  const auto& tablet = reader.tabletReader();
+  auto& pool = reader.memoryPool();
+
+  const uint32_t stripeCount = tablet.stripeCount();
+  uint32_t maxChunkCount = 0;
+  uint32_t minChunkCount = std::numeric_limits<uint32_t>::max();
+
+  for (uint32_t stripeIndex = 0; stripeIndex < stripeCount; ++stripeIndex) {
+    const auto stripeIdentifier = tablet.getStripeIdentifier(stripeIndex);
+    const auto streamCount = tablet.streamCount(stripeIdentifier);
+
+    std::vector<uint32_t> streamIds(streamCount);
+    std::iota(streamIds.begin(), streamIds.end(), 0);
+    auto streamLoaders = tablet.load(stripeIdentifier, streamIds);
+
+    for (uint32_t streamId = 0; streamId < streamLoaders.size(); ++streamId) {
+      if (!streamLoaders[streamId]) {
+        continue;
+      }
+      nimble::InMemoryChunkedStream chunkedStream{
+          pool, std::move(streamLoaders[streamId])};
+      uint32_t currentStreamChunkCount = 0;
+      while (chunkedStream.hasNext()) {
+        ++currentStreamChunkCount;
+        const auto chunk = chunkedStream.nextChunk();
+        // Validate min chunk size when not last chunk
+        if (chunkedStream.hasNext()) {
+          const auto encoding = nimble::EncodingFactory::decode(pool, chunk);
+          const auto rowCount = encoding->rowCount();
+          const auto dataType = encoding->dataType();
+          const uint64_t chunkRawDataSize =
+              facebook::nimble::detail::dataTypeSize(dataType) * rowCount;
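+          // Only non-last chunks are checked; the final chunk of a stream
+          // is allowed to fall below minStreamChunkRawSize.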
+          EXPECT_GE(chunkRawDataSize, minStreamChunkRawSize)
+              << "Stream " << streamId << " has a non-last chunk with size "
+              << chunkRawDataSize << " which is below min chunk size of "
+              << minStreamChunkRawSize;
+        }
+      }
+      DWIO_ENSURE_GT(
+          currentStreamChunkCount,
+          0,
+          "Non-null streams should have at least one chunk");
+      maxChunkCount = std::max(maxChunkCount, currentStreamChunkCount);
+      minChunkCount = std::min(minChunkCount, currentStreamChunkCount);
+    }
+  }
+
+  return ChunkSizeResults{
+      .stripeCount = stripeCount,
+      .minChunkCount = minChunkCount,
+      .maxChunkCount = maxChunkCount,
+  };
+}
 } // namespace
 
-struct RawStripeSizeFlushPolicyTestCase {
+struct StripeRawSizeFlushPolicyTestCase {
   const size_t batchCount;
   const uint32_t rawStripeSize;
   const uint32_t stripeCount;
 };
 
-class RawStripeSizeFlushPolicyTest
+class StripeRawSizeFlushPolicyTest
     : public VeloxWriterTests,
-      public ::testing::WithParamInterface<RawStripeSizeFlushPolicyTestCase> {};
+      public ::testing::WithParamInterface<StripeRawSizeFlushPolicyTestCase> {};
 
-TEST_P(RawStripeSizeFlushPolicyTest, RawStripeSizeFlushPolicy) {
+TEST_P(StripeRawSizeFlushPolicyTest, StripeRawSizeFlushPolicy) {
   auto type = velox::ROW({{"simple", velox::INTEGER()}});
   nimble::VeloxWriterOptions writerOptions{.flushPolicyFactory = []() {
     // Buffering 256MB data before encoding stripes.
-    return std::make_unique<nimble::RawStripeSizeFlushPolicy>(
+    return std::make_unique<nimble::StripeRawSizeFlushPolicy>(
         GetParam().rawStripeSize);
   }};
 
@@ -385,7 +457,7 @@ TEST_F(VeloxWriterTests, MemoryReclaimPath) {
 
 TEST_F(VeloxWriterTests, FlushHugeStrings) {
   nimble::VeloxWriterOptions writerOptions{.flushPolicyFactory = []() {
-    return std::make_unique<nimble::RawStripeSizeFlushPolicy>(1 * 1024 * 1024);
+    return std::make_unique<nimble::StripeRawSizeFlushPolicy>(1 * 1024 * 1024);
   }};
 
   velox::test::VectorMaker vectorMaker{leafPool_.get()};
@@ -1029,7 +1101,7 @@ TEST_F(VeloxWriterTests, CombineMultipleLayersOfDictionaries) {
 void testChunks(
     velox::memory::MemoryPool& rootPool,
     uint32_t minStreamChunkRawSize,
-    std::vector<std::pair<velox::VectorPtr, nimble::FlushDecision>> vectors,
+    std::vector<std::pair<velox::VectorPtr, bool>> vectors,
     std::function<void(const nimble::TabletReader&)> verifier,
     folly::F14FastSet<std::string> flatMapColumns = {}) {
   ASSERT_LT(0, vectors.size());
@@ -1041,7 +1113,7 @@ void testChunks(
   std::string file;
   auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file);
 
-  auto flushDecision = nimble::FlushDecision::None;
+  auto flushDecision = false;
   nimble::VeloxWriter writer(
       rootPool,
       type,
@@ -1052,7 +1124,8 @@ void testChunks(
           .flushPolicyFactory =
               [&]() {
                 return std::make_unique<nimble::LambdaFlushPolicy>(
-                    [&](auto&) { return flushDecision; });
+                    /*flushLambda=*/[&](auto&) { return false; },
+                    /*chunkLambda=*/[&](auto&) { return flushDecision; });
               },
          .enableChunking = true,
       });
@@ -1080,6 +1153,8 @@ void testChunks(
     ASSERT_TRUE(expected->equalValueAt(result.get(), i, i));
   }
   ASSERT_FALSE(reader.next(1, result));
+
+  validateChunkSize(reader, minStreamChunkRawSize);
 }
 
 TEST_F(VeloxWriterTests, ChunkedStreamsRowAllNullsNoChunks) {
@@ -1095,8 +1170,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowAllNullsNoChunks) {
   testChunks(
       *rootPool_,
       /* minStreamChunkRawSize */ 0,
-      {{vector, nimble::FlushDecision::None},
-       {vector, nimble::FlushDecision::None}},
+      {{vector, false}, {vector, false}},
      [&](const auto& tablet) {
        auto stripeIdentifier = tablet.getStripeIdentifier(0);
        ASSERT_EQ(1, tablet.stripeCount());
@@ -1131,8 +1205,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowAllNullsWithChunksMinSizeBig) {
   testChunks(
       *rootPool_,
       /* minStreamChunkRawSize */ 1024,
-      {{vector, nimble::FlushDecision::Chunk},
-       {vector, nimble::FlushDecision::Chunk}},
+      {{vector, true}, {vector, true}},
      [&](const auto& tablet) {
        auto stripeIdentifier = tablet.getStripeIdentifier(0);
        ASSERT_EQ(1, tablet.stripeCount());
@@ -1168,8 +1241,7 @@
TEST_F(VeloxWriterTests, ChunkedStreamsRowAllNullsWithChunksMinSizeZero) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{vector, nimble::FlushDecision::Chunk}, - {vector, nimble::FlushDecision::Chunk}}, + {{vector, true}, {vector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1212,8 +1284,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowSomeNullsNoChunks) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{nullsVector, nimble::FlushDecision::None}, - {nonNullsVector, nimble::FlushDecision::None}}, + {{nullsVector, false}, {nonNullsVector, false}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1261,8 +1332,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowSomeNullsWithChunksMinSizeBig) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 1024, - {{nullsVector, nimble::FlushDecision::Chunk}, - {nonNullsVector, nimble::FlushDecision::Chunk}}, + {{nullsVector, true}, {nonNullsVector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1311,8 +1381,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowSomeNullsWithChunksMinSizeZero) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{nullsVector, nimble::FlushDecision::Chunk}, - {nonNullsVector, nimble::FlushDecision::Chunk}}, + {{nullsVector, true}, {nonNullsVector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1353,8 +1422,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowNoNullsNoChunks) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{vector, nimble::FlushDecision::None}, - {vector, nimble::FlushDecision::None}}, + {{vector, false}, {vector, false}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1393,8 +1461,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowNoNullsWithChunksMinSizeBig) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 1024, - {{vector, nimble::FlushDecision::Chunk}, - {vector, nimble::FlushDecision::Chunk}}, + {{vector, true}, {vector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1434,8 +1501,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowNoNullsWithChunksMinSizeZero) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{vector, nimble::FlushDecision::Chunk}, - {vector, nimble::FlushDecision::Chunk}}, + {{vector, true}, {vector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1475,8 +1541,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsChildAllNullsNoChunks) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{vector, nimble::FlushDecision::None}, - {vector, nimble::FlushDecision::None}}, + {{vector, false}, {vector, false}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1501,8 +1566,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsChildAllNullsWithChunksMinSizeBig) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 1024, - {{vector, nimble::FlushDecision::Chunk}, - {vector, nimble::FlushDecision::Chunk}}, + {{vector, true}, {vector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ 
-1527,8 +1591,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsChildAllNullsWithChunksMinSizeZero) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{vector, nimble::FlushDecision::Chunk}, - {vector, nimble::FlushDecision::Chunk}}, + {{vector, true}, {vector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1560,8 +1623,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapAllNullsNoChunks) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{vector, nimble::FlushDecision::None}, - {vector, nimble::FlushDecision::None}}, + {{vector, false}, {vector, false}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1603,8 +1665,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapAllNullsWithChunksMinSizeBig) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 1024, - {{vector, nimble::FlushDecision::Chunk}, - {vector, nimble::FlushDecision::Chunk}}, + {{vector, true}, {vector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1647,8 +1708,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapAllNullsWithChunksMinSizeZero) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{vector, nimble::FlushDecision::Chunk}, - {vector, nimble::FlushDecision::Chunk}}, + {{vector, true}, {vector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1702,8 +1762,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapSomeNullsNoChunks) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{nullsVector, nimble::FlushDecision::None}, - {nonNullsVector, nimble::FlushDecision::None}}, + {{nullsVector, false}, {nonNullsVector, false}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1780,8 +1839,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapSomeNullsWithChunksMinSizeBig) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 1024, - {{nullsVector, nimble::FlushDecision::Chunk}, - {nonNullsVector, nimble::FlushDecision::Chunk}}, + {{nullsVector, true}, {nonNullsVector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1861,8 +1919,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapSomeNullsWithChunksMinSizeZero) { testChunks( *rootPool_, /* minStreamChunkRawSize */ 0, - {{nullsVector, nimble::FlushDecision::Chunk}, - {nonNullsVector, nimble::FlushDecision::Chunk}}, + {{nullsVector, true}, {nonNullsVector, true}}, [&](const auto& tablet) { auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); @@ -1952,28 +2009,388 @@ TEST_F(VeloxWriterTests, RawSizeWritten) { ASSERT_EQ(expectedRawSize, rawSize); } +struct ChunkFlushPolicyTestCase { + const size_t batchCount{20}; + const bool enableChunking{true}; + const uint64_t targetStripeSizeBytes{250 << 10}; + const uint64_t writerMemoryHighThresholdBytes{80 << 10}; + const uint64_t writerMemoryLowThresholdBytes{75 << 10}; + const double estimatedCompressionFactor{1.3}; + const uint32_t minStreamChunkRawSize{100}; + const uint32_t expectedStripeCount{0}; + const uint32_t expectedMaxChunkCount{0}; + const uint32_t expectedMinChunkCount{0}; + const uint32_t chunkedStreamBatchSize{2}; +}; + +class ChunkFlushPolicyTest + : public VeloxWriterTests, + public 
::testing::WithParamInterface<ChunkFlushPolicyTestCase> {};
+
+TEST_P(ChunkFlushPolicyTest, ChunkFlushPolicyIntegration) {
+  const auto type = velox::ROW(
+      {{"BIGINT", velox::BIGINT()}, {"SMALLINT", velox::SMALLINT()}});
+  nimble::VeloxWriterOptions writerOptions{
+      .minStreamChunkRawSize = GetParam().minStreamChunkRawSize,
+      .chunkedStreamBatchSize = GetParam().chunkedStreamBatchSize,
+      .flushPolicyFactory = GetParam().enableChunking
+          ? []() -> std::unique_ptr<nimble::FlushPolicy> {
+              return std::make_unique<nimble::ChunkFlushPolicy>(
+                  nimble::ChunkFlushPolicyConfig{
+                      .writerMemoryHighThresholdBytes =
+                          GetParam().writerMemoryHighThresholdBytes,
+                      .writerMemoryLowThresholdBytes =
+                          GetParam().writerMemoryLowThresholdBytes,
+                      .targetStripeSizeBytes = GetParam().targetStripeSizeBytes,
+                      .estimatedCompressionFactor =
+                          GetParam().estimatedCompressionFactor,
+                  });
+            }
+          : []() -> std::unique_ptr<nimble::FlushPolicy> {
+              return std::make_unique<nimble::StripeRawSizeFlushPolicy>(
+                  GetParam().targetStripeSizeBytes);
+            },
+      .enableChunking = GetParam().enableChunking,
+  };
+
+  std::string file;
+  auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file);
+
+  nimble::VeloxWriter writer(
+      *rootPool_, type, std::move(writeFile), std::move(writerOptions));
+  const auto batches = generateBatches(
+      type,
+      GetParam().batchCount,
+      /*size=*/4000,
+      /*seed=*/20221110,
+      *leafPool_);
+
+  for (const auto& batch : batches) {
+    writer.write(batch);
+  }
+  writer.close();
+  velox::InMemoryReadFile readFile(file);
+  nimble::VeloxReader reader(*leafPool_, &readFile);
+  ChunkSizeResults result =
+      validateChunkSize(reader, GetParam().minStreamChunkRawSize);
+
+  EXPECT_EQ(GetParam().expectedStripeCount, result.stripeCount);
+  EXPECT_EQ(GetParam().expectedMaxChunkCount, result.maxChunkCount);
+  EXPECT_EQ(GetParam().expectedMinChunkCount, result.minChunkCount);
+}
+
+TEST_F(VeloxWriterTests, FuzzComplex) {
+  auto type = velox::ROW(
+      {{"array", velox::ARRAY(velox::REAL())},
+       {"dict_array", velox::ARRAY(velox::REAL())},
+       {"map", velox::MAP(velox::INTEGER(), velox::DOUBLE())},
+       {"row",
+        velox::ROW({
+            {"a", velox::REAL()},
+            {"b", velox::INTEGER()},
+        })},
+       {"row",
+        velox::ROW(
+            {{"nested_row",
+              velox::ROW(
+                  {{"nested_nested_row", velox::ROW({{"a", velox::INTEGER()}})},
+                   {"b", velox::INTEGER()}})}})},
+       {"map",
+        velox::MAP(velox::INTEGER(), velox::ROW({{"a", velox::INTEGER()}}))},
+       {"nested",
+        velox::ARRAY(
+            velox::ROW({
+                {"a", velox::INTEGER()},
+                {"b", velox::MAP(velox::REAL(), velox::REAL())},
+            }))},
+       {"nested_map_array1",
+        velox::MAP(velox::INTEGER(), velox::ARRAY(velox::REAL()))},
+       {"nested_map_array2",
+        velox::MAP(velox::INTEGER(), velox::ARRAY(velox::INTEGER()))},
+       {"dict_map", velox::MAP(velox::INTEGER(), velox::INTEGER())}});
+  auto rowType = std::dynamic_pointer_cast<const velox::RowType>(type);
+  uint32_t seed = FLAGS_writer_tests_seed > 0 ? FLAGS_writer_tests_seed
+                                              : folly::Random::rand32();
+  LOG(INFO) << "seed: " << seed;
+  std::mt19937 rng{seed};
+  for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) {
+    std::shared_ptr<folly::CPUThreadPoolExecutor> executor;
+    nimble::VeloxWriterOptions writerOptions;
+    writerOptions.enableChunking = true;
+    writerOptions.flushPolicyFactory =
+        []() -> std::unique_ptr<nimble::FlushPolicy> {
+      return std::make_unique<nimble::ChunkFlushPolicy>(
+          nimble::ChunkFlushPolicyConfig{
+              .writerMemoryHighThresholdBytes = 200 << 10,
+              .writerMemoryLowThresholdBytes = 100 << 10,
+              .targetStripeSizeBytes = 100 << 10,
+              .estimatedCompressionFactor = 1.7,
+          });
+    };
+
+    LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
+    writerOptions.dictionaryArrayColumns.insert("nested_map_array1");
+    writerOptions.dictionaryArrayColumns.insert("nested_map_array2");
+    writerOptions.dictionaryArrayColumns.insert("dict_array");
+    writerOptions.deduplicatedMapColumns.insert("dict_map");
+
+    if (parallelismFactor > 0) {
+      executor =
+          std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
+      writerOptions.encodingExecutor = folly::getKeepAliveToken(*executor);
+      writerOptions.writeExecutor = folly::getKeepAliveToken(*executor);
+    }
+
+    const auto iterations = 20;
+    for (auto i = 0; i < iterations; ++i) {
+      writerOptions.minStreamChunkRawSize =
+          std::uniform_int_distribution<uint32_t>(10, 4096)(rng);
+      const auto batchSize =
+          std::uniform_int_distribution<uint32_t>(10, 400)(rng);
+      const auto batchCount = 5;
+
+      std::string file;
+      auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file);
+      nimble::VeloxWriter writer(
+          *leafPool_.get(), type, std::move(writeFile), writerOptions);
+      const auto batches = generateBatches(
+          type,
+          /*batchCount=*/batchCount,
+          /*size=*/batchSize,
+          /*seed=*/seed,
+          *leafPool_);
+
+      for (const auto& batch : batches) {
+        writer.write(batch);
+      }
+      writer.close();
+
+      velox::InMemoryReadFile readFile(file);
+      nimble::VeloxReader reader(*leafPool_, &readFile);
+      validateChunkSize(reader, writerOptions.minStreamChunkRawSize);
+    }
+  }
+}
+
+TEST_F(VeloxWriterTests, BatchedChunkingRelievesMemoryPressure) {
+  // Verify we stop chunking early when chunking relieves memory pressure.
+  const uint32_t seed = FLAGS_writer_tests_seed > 0 ? FLAGS_writer_tests_seed
+                                                    : folly::Random::rand32();
+  LOG(INFO) << "seed: " << seed;
+  std::mt19937 rng{seed};
+  const uint32_t rowCount =
+      std::uniform_int_distribution<uint32_t>(1, 4096)(rng);
+
+  velox::VectorFuzzer fuzzer({.vectorSize = rowCount}, leafPool_.get(), seed);
+  const auto stringColumn = fuzzer.fuzzFlat(velox::VARCHAR());
+  const auto intColumn = fuzzer.fuzzFlat(velox::INTEGER());
+
+  nimble::RawSizeContext context;
+  nimble::OrderedRanges ranges;
+  ranges.add(0, rowCount);
+  const uint64_t stringColumnRawSize =
+      nimble::getRawSizeFromVector(stringColumn, ranges, context) +
+      sizeof(std::string_view) * rowCount;
+  const uint64_t intColumnRawSize =
+      nimble::getRawSizeFromVector(intColumn, ranges, context);
+
+  constexpr size_t kColumnCount = 20;
+  constexpr size_t kBatchSize = 4;
+  std::vector<velox::VectorPtr> children(kColumnCount);
+  std::vector<std::string> columnNames(kColumnCount);
+  uint64_t totalRawSize = 0;
+  for (size_t i = 0; i < kColumnCount; i += 2) {
+    columnNames[i] = fmt::format("string_column_{}", i);
+    columnNames[i + 1] = fmt::format("int_column_{}", i);
+    children[i] = stringColumn;
+    children[i + 1] = intColumn;
+    totalRawSize += intColumnRawSize + stringColumnRawSize;
+  }
+
+  velox::test::VectorMaker vectorMaker{leafPool_.get()};
+  const auto rowVector = vectorMaker.rowVector(columnNames, children);
+
+  // We will return true twice and false once
+  const std::vector<bool> expectedChunkingDecisions{true, true, false};
+  std::vector<bool> actualChunkingDecisions;
+
+  // We will be chunking the large streams in the first two batches. 8 string
+  // streams in total. We set the expected rawSize after chunking these two
+  // batches as our memory threshold.
+  const uint64_t memoryPressureThreshold =
+      totalRawSize - (2 * kBatchSize * stringColumnRawSize);
+
+  nimble::VeloxWriterOptions writerOptions;
+  writerOptions.chunkedStreamBatchSize = kBatchSize;
+  writerOptions.enableChunking = true;
+  writerOptions.minStreamChunkRawSize = intColumnRawSize / 2;
+  writerOptions.flushPolicyFactory =
+      [&]() -> std::unique_ptr<nimble::FlushPolicy> {
+    return std::make_unique<nimble::LambdaFlushPolicy>(
+        /* shouldFlush */ [](const auto&) { return true; },
+        /* shouldChunk */
+        [&](const nimble::StripeProgress& stripeProgress) {
+          const bool shouldChunk =
+              stripeProgress.stripeRawSize > memoryPressureThreshold;
+          actualChunkingDecisions.push_back(shouldChunk);
+          return shouldChunk;
+        });
+  };
+
+  std::string file;
+  auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file);
+  nimble::VeloxWriter writer(
+      *rootPool_, rowVector->type(), std::move(writeFile), writerOptions);
+  writer.write(rowVector);
+  writer.close();
+
+  EXPECT_THAT(
+      actualChunkingDecisions,
+      ::testing::ElementsAreArray(expectedChunkingDecisions));
+
+  velox::InMemoryReadFile readFile(file);
+  nimble::VeloxReader reader(*leafPool_, &readFile);
+  validateChunkSize(reader, writerOptions.minStreamChunkRawSize);
+}
+
 INSTANTIATE_TEST_CASE_P(
-    RawStripeSizeFlushPolicyTestSuite,
-    RawStripeSizeFlushPolicyTest,
+    StripeRawSizeFlushPolicyTestSuite,
+    StripeRawSizeFlushPolicyTest,
     ::testing::Values(
-        RawStripeSizeFlushPolicyTestCase{
+        StripeRawSizeFlushPolicyTestCase{
             .batchCount = 50,
             .rawStripeSize = 256 << 10,
             .stripeCount = 4},
-        RawStripeSizeFlushPolicyTestCase{
+        StripeRawSizeFlushPolicyTestCase{
            .batchCount = 100,
            .rawStripeSize = 256 << 10,
            .stripeCount = 7},
-        RawStripeSizeFlushPolicyTestCase{
+        StripeRawSizeFlushPolicyTestCase{
            .batchCount = 100,
            .rawStripeSize = 256 << 11,
            .stripeCount = 4},
-        RawStripeSizeFlushPolicyTestCase{
+        StripeRawSizeFlushPolicyTestCase{
            .batchCount = 100,
            .rawStripeSize = 256 << 12,
           .stripeCount = 2},
-        RawStripeSizeFlushPolicyTestCase{
+        StripeRawSizeFlushPolicyTestCase{
            .batchCount = 100,
            .rawStripeSize = 256 << 20,
            .stripeCount = 1}));
+
+INSTANTIATE_TEST_CASE_P(
+    ChunkFlushPolicyTestSuite,
+    ChunkFlushPolicyTest,
+    ::testing::Values(
+        // Base case (no chunking, StripeRawSizeFlushPolicy)
+        ChunkFlushPolicyTestCase{
+            .batchCount = 20,
+            .enableChunking = false,
+            .targetStripeSizeBytes = 250 << 10, // 250KB
+            .writerMemoryHighThresholdBytes = 80 << 10,
+            .writerMemoryLowThresholdBytes = 75 << 10,
+            .estimatedCompressionFactor = 1.3,
+            .minStreamChunkRawSize = 100,
+            .expectedStripeCount = 4,
+            .expectedMaxChunkCount = 1,
+            .expectedMinChunkCount = 1,
+            .chunkedStreamBatchSize = 2,
+        },
+        // Baseline with default settings (has chunking)
+        ChunkFlushPolicyTestCase{
+            .batchCount = 20,
+            .enableChunking = true,
+            .targetStripeSizeBytes = 250 << 10, // 250KB
+            .writerMemoryHighThresholdBytes = 80 << 10,
+            .writerMemoryLowThresholdBytes = 75 << 10,
+            .estimatedCompressionFactor = 1.3,
+            .minStreamChunkRawSize = 100,
+            .expectedStripeCount = 7,
+            .expectedMaxChunkCount = 2,
+            .expectedMinChunkCount = 1,
+            .chunkedStreamBatchSize = 2,
+        },
+        // High memory pressure threshold and no compression.
+        // Produces a file identical to StripeRawSizeFlushPolicy.
+        ChunkFlushPolicyTestCase{
+            .batchCount = 20,
+            .enableChunking = true,
+            .targetStripeSizeBytes = 250 << 10, // 250KB
+            .writerMemoryHighThresholdBytes = 500
+                << 10, // 500KB (as opposed to 80KB in other cases)
+            .writerMemoryLowThresholdBytes = 75 << 10,
+            .estimatedCompressionFactor =
+                1.0, // No compression (as opposed to 1.3 in other cases)
+            .minStreamChunkRawSize = 100,
+            .expectedStripeCount = 4,
+            .expectedMaxChunkCount = 1,
+            .expectedMinChunkCount = 1,
+            .chunkedStreamBatchSize = 2,
+        },
+        // Low memory pressure threshold.
+        // Produces a file with more min chunks per stripe.
+        ChunkFlushPolicyTestCase{
+            .batchCount = 20,
+            .enableChunking = true,
+            .targetStripeSizeBytes = 250 << 10,
+            .writerMemoryHighThresholdBytes = 40
+                << 10, // 40KB (as opposed to 80KB in other cases)
+            .writerMemoryLowThresholdBytes = 35
+                << 10, // 35KB (as opposed to 75KB in other cases)
+            .estimatedCompressionFactor = 1.3,
+            .minStreamChunkRawSize = 100,
+            .expectedStripeCount = 10,
+            .expectedMaxChunkCount = 2,
+            .expectedMinChunkCount = 2, // +1 chunk
+            .chunkedStreamBatchSize = 2,
+        },
+        // High target stripe size bytes (with disabled memory pressure
+        // optimization) produces fewer stripes. Single chunks.
+        ChunkFlushPolicyTestCase{
+            .batchCount = 20,
+            .enableChunking = true,
+            .targetStripeSizeBytes = 900
+                << 10, // 900KB (as opposed to 250KB in other cases)
+            .writerMemoryHighThresholdBytes = 2
+                << 20, // 2MB (as opposed to 80KB in other cases)
+            .writerMemoryLowThresholdBytes = 1
+                << 20, // 1MB (as opposed to 75KB in other cases)
+            .estimatedCompressionFactor = 1.3,
+            .minStreamChunkRawSize = 100,
+            .expectedStripeCount = 1,
+            .expectedMaxChunkCount = 1,
+            .expectedMinChunkCount = 1,
+            .chunkedStreamBatchSize = 2,
+        },
+        // Low target stripe size bytes (with disabled memory pressure
+        // optimization) produces more stripes. Single chunks.
+        ChunkFlushPolicyTestCase{
+            .batchCount = 20,
+            .enableChunking = true,
+            .targetStripeSizeBytes = 90
+                << 10, // 90KB (as opposed to 250KB in other cases)
+            .writerMemoryHighThresholdBytes = 2
+                << 20, // 2MB (as opposed to 80KB in other cases)
+            .writerMemoryLowThresholdBytes = 1
+                << 20, // 1MB (as opposed to 75KB in other cases)
+            .estimatedCompressionFactor = 1.3,
+            .minStreamChunkRawSize = 100,
+            .expectedStripeCount = 7,
+            .expectedMaxChunkCount = 1,
+            .expectedMinChunkCount = 1,
+            .chunkedStreamBatchSize = 2,
+        },
+        // Higher chunked stream batch size (no change in policy)
+        ChunkFlushPolicyTestCase{
+            .batchCount = 20,
+            .enableChunking = true,
+            .targetStripeSizeBytes = 250 << 10, // 250KB
+            .writerMemoryHighThresholdBytes = 80 << 10,
+            .writerMemoryLowThresholdBytes = 75 << 10,
+            .estimatedCompressionFactor = 1.0,
+            .minStreamChunkRawSize = 100,
+            .expectedStripeCount = 7,
+            .expectedMaxChunkCount = 2,
+            .expectedMinChunkCount = 1,
+            .chunkedStreamBatchSize = 10}));
 } // namespace facebook