Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dwio/nimble/common/MetricsLogger.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ struct FileCloseMetrics {
};

enum class LogOperation {
ChunkWrite,
StripeLoad,
StripeFlush,
FileClose,
Expand Down
34 changes: 27 additions & 7 deletions dwio/nimble/velox/FlushPolicy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,35 @@
#include "dwio/nimble/velox/FlushPolicy.h"

namespace facebook::nimble {

FlushDecision RawStripeSizeFlushPolicy::shouldFlush(
const StripeProgress& stripeProgress) {
return stripeProgress.rawStripeSize >= rawStripeSize_ ? FlushDecision::Stripe
: FlushDecision::None;
// Relieve memory pressure with chunking.
bool ChunkFlushPolicy::shouldChunk(const StripeProgress& stripeProgress) {
const uint64_t inMemoryBytes =
stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize;
const auto writerMemoryThreshold = (lastChunkDecision_ == false)
? config_.writerMemoryHighThresholdBytes
: config_.writerMemoryLowThresholdBytes;
lastChunkDecision_ = inMemoryBytes > writerMemoryThreshold;
return lastChunkDecision_;
}

void RawStripeSizeFlushPolicy::onClose() {
// No-op
// Optimize for expected storage stripe size.
bool ChunkFlushPolicy::shouldFlush(const StripeProgress& stripeProgress) {
// When chunking is unable to relieve memory pressure, we flush stripe.
if (stripeProgress.stripeRawSize + stripeProgress.stripeEncodedSize >
config_.writerMemoryHighThresholdBytes) {
return true;
}

double compressionFactor = config_.estimatedCompressionFactor;
// Use historical compression ratio as a heuristic when available.
if (stripeProgress.stripeEncodedSize > 0) {
compressionFactor =
static_cast<double>(stripeProgress.stripeEncodedLogicalSize) /
stripeProgress.stripeEncodedSize;
}
double expectedEncodedStripeSize = stripeProgress.stripeEncodedSize +
stripeProgress.stripeRawSize / std::max(compressionFactor, 1.0);
return (expectedEncodedStripeSize >= config_.targetStripeSizeBytes);
}

} // namespace facebook::nimble
84 changes: 57 additions & 27 deletions dwio/nimble/velox/FlushPolicy.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,59 +20,89 @@

namespace facebook::nimble {

// TODO: Set default values for these parameters based on DISCO experiments.
// Use abitrary values for now.
struct ChunkFlushPolicyConfig {
// Threshold to trigger chunking to relieve memory pressure
const uint64_t writerMemoryHighThresholdBytes{200 * 1024L * 1024L};
// Threshold below which chunking stops and stripe size optimization resumes
const uint64_t writerMemoryLowThresholdBytes{100 * 1024L * 1024L};
// Target size for encoded stripes
const uint64_t targetStripeSizeBytes{100 * 1024L * 1024L};
// Expected ratio of raw to encoded data
const double estimatedCompressionFactor{1.3};
};

struct StripeProgress {
// Size of the stripe data when it's fully decompressed and decoded
const uint64_t rawStripeSize;
const uint64_t stripeRawSize;
// Size of the stripe after buffered data is encoded and optionally compressed
const uint64_t stripeSize;
// Size of the allocated buffer in the writer
const uint64_t bufferSize;
};

enum class FlushDecision : uint8_t {
None = 0,
Stripe = 1,
Chunk = 2,
const uint64_t stripeEncodedSize;
// Logical size of the now encoded stripe data
const uint64_t stripeEncodedLogicalSize;
};

class FlushPolicy {
public:
virtual ~FlushPolicy() = default;
virtual FlushDecision shouldFlush(const StripeProgress& stripeProgress) = 0;
// Required for memory pressure coordination for now. Will remove in the
// future.
virtual void onClose() = 0;
virtual bool shouldFlush(const StripeProgress& stripeProgress) = 0;
virtual bool shouldChunk(const StripeProgress& stripeProgress) = 0;
};

class RawStripeSizeFlushPolicy final : public FlushPolicy {
class StripeRawSizeFlushPolicy final : public FlushPolicy {
public:
explicit RawStripeSizeFlushPolicy(uint64_t rawStripeSize)
: rawStripeSize_{rawStripeSize} {}
explicit StripeRawSizeFlushPolicy(uint64_t stripeRawSize)
: stripeRawSize_{stripeRawSize} {}

FlushDecision shouldFlush(const StripeProgress& stripeProgress) override;
bool shouldFlush(const StripeProgress& stripeProgress) override {
return stripeProgress.stripeRawSize >= stripeRawSize_;
}

void onClose() override;
bool shouldChunk(const StripeProgress&) override {
return false;
}

private:
const uint64_t rawStripeSize_;
const uint64_t stripeRawSize_;
};

class LambdaFlushPolicy : public FlushPolicy {
public:
explicit LambdaFlushPolicy(
std::function<FlushDecision(const StripeProgress&)> lambda)
: lambda_{lambda} {}
std::function<bool(const StripeProgress&)> flushLambda =
[](const StripeProgress&) { return false; },
std::function<bool(const StripeProgress&)> chunkLambda =
[](const StripeProgress&) { return false; })
: flushLambda_{std::move(flushLambda)},
chunkLambda_{std::move(chunkLambda)} {}

FlushDecision shouldFlush(const StripeProgress& stripeProgress) override {
return lambda_(stripeProgress);
bool shouldFlush(const StripeProgress& stripeProgress) override {
return flushLambda_(stripeProgress);
}

void onClose() override {
// No-op
bool shouldChunk(const StripeProgress& stripeProgress) override {
return chunkLambda_(stripeProgress);
}

private:
std::function<FlushDecision(const StripeProgress&)> lambda_;
std::function<bool(const StripeProgress&)> flushLambda_;
std::function<bool(const StripeProgress&)> chunkLambda_;
};

class ChunkFlushPolicy : public FlushPolicy {
public:
explicit ChunkFlushPolicy(ChunkFlushPolicyConfig config)
: config_{std::move(config)}, lastChunkDecision_{false} {}

// Optimize for expected storage stripe size.
bool shouldFlush(const StripeProgress& stripeProgress) override;

// Relieve memory pressure with chunking.
bool shouldChunk(const StripeProgress& stripeProgress) override;

private:
const ChunkFlushPolicyConfig config_;
bool lastChunkDecision_;
};

} // namespace facebook::nimble
121 changes: 121 additions & 0 deletions dwio/nimble/velox/StreamChunker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "dwio/nimble/velox/StreamChunker.h"

namespace facebook::nimble {
bool omitStream(
const StreamData& streamData,
uint64_t minChunkSize,
bool isNullStream,
bool hasEmptyStreamContent,
bool isLastChunk) {
bool shouldChunkStream;
minChunkSize = isLastChunk ? 0 : minChunkSize;
if (isNullStream) {
// When all values are non-nulls, we omit the entire null stream.
shouldChunkStream =
streamData.hasNulls() && streamData.nonNulls().size() > minChunkSize;
if (!shouldChunkStream && !hasEmptyStreamContent) {
shouldChunkStream = !streamData.empty();
}
} else {
// When all values are null, the values stream is omitted.
shouldChunkStream = streamData.data().size() > minChunkSize;
if (!shouldChunkStream && !hasEmptyStreamContent) {
shouldChunkStream = !streamData.nonNulls().empty();
}
}

return !shouldChunkStream;
}

template <typename T>
std::unique_ptr<StreamChunker> getStreamChunkerTyped(
StreamData& streamData,
uint64_t maxChunkSize,
uint64_t minChunkSize,
bool ensureFullChunks,
bool emptyStreamContent,
bool isNullStream,
bool isLastChunk) {
const auto& streamDataType = streamData.type();
if (streamDataType == ContentStreamData<T>::TYPE_NAME) {
return std::make_unique<ContentStreamChunker<T>>(
static_cast<ContentStreamData<T>&>(streamData),
maxChunkSize,
minChunkSize,
ensureFullChunks,
isLastChunk);
} else if (streamDataType == NullsStreamData::TYPE_NAME) {
return std::make_unique<NullsStreamChunker>(
static_cast<NullsStreamData&>(streamData),
maxChunkSize,
minChunkSize,
ensureFullChunks,
emptyStreamContent,
isNullStream,
isLastChunk);
} else if (streamDataType == NullableContentStreamData<T>::TYPE_NAME) {
return std::make_unique<NullableContentStreamChunker<T>>(
static_cast<NullableContentStreamData<T>&>(streamData),
maxChunkSize,
minChunkSize,
ensureFullChunks,
emptyStreamContent,
isNullStream,
isLastChunk);
}
NIMBLE_UNREACHABLE(
fmt::format("Unsupported streamData type {}", streamDataType))
}

std::unique_ptr<StreamChunker> getStreamChunker(
StreamData& streamData,
uint64_t maxChunkSize,
uint64_t minChunkSize,
bool ensureFullChunks,
bool emptyStreamContent,
bool isNullStream,
bool isLastChunk) {
const auto scalarKind = streamData.descriptor().scalarKind();
switch (scalarKind) {
#define HANDLE_SCALAR_KIND(kind, type) \
case ScalarKind::kind: \
return getStreamChunkerTyped<type>( \
streamData, \
maxChunkSize, \
minChunkSize, \
ensureFullChunks, \
emptyStreamContent, \
isNullStream, \
isLastChunk);
HANDLE_SCALAR_KIND(Bool, bool);
HANDLE_SCALAR_KIND(Int8, int8_t);
HANDLE_SCALAR_KIND(Int16, int16_t);
HANDLE_SCALAR_KIND(Int32, int32_t);
HANDLE_SCALAR_KIND(UInt32, uint32_t);
HANDLE_SCALAR_KIND(Int64, int64_t);
HANDLE_SCALAR_KIND(Float, float);
HANDLE_SCALAR_KIND(Double, double);
HANDLE_SCALAR_KIND(String, std::string_view);
HANDLE_SCALAR_KIND(Binary, std::string_view);
default:
NIMBLE_UNREACHABLE(
fmt::format("Unsupported scalar kind {}", toString(scalarKind)));
}
}
} // namespace facebook::nimble
Loading
Loading