Track the total number of compaction sorted runs from inside CompactionMergingIterator (#13325)

Summary:
**This PR adds a new statistic to track the total number of sorted runs for running compactions.**
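
As a minimal usage sketch (the property name comes from this PR's diff below; the surrounding setup, including the database path, is hypothetical), the new statistic can be read through `GetIntProperty`:

```cpp
#include <cstdint>
#include <iostream>

#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::DB* db = nullptr;
  // "/tmp/example_db" is a placeholder path for illustration only
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/example_db", &db);
  if (!s.ok()) {
    return 1;
  }
  // Total number of sorted runs across all currently running compactions
  uint64_t sorted_runs = 0;
  db->GetIntProperty(rocksdb::DB::Properties::kNumRunningCompactionSortedRuns,
                     &sorted_runs);
  std::cout << "sorted runs under compaction: " << sorted_runs << std::endl;
  delete db;
  return 0;
}
```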

Context: I am currently working on a separate project where I am trying to tune the size of the read requests that `FilePrefetchBuffer` makes to the storage backend. In this case, `FilePrefetchBuffer` will issue larger reads and have to buffer larger read responses, which means we expect higher memory utilization. At least for the initial rollout, we only want to enable this optimization for compaction reads.

**I want some way to get a sense of what the memory usage _impact_ will be if the prefetch read request size is increased from (for instance) 8MB to 64MB.**

**If I know the number of files that compactions are actively reading from (i.e. the number of sorted runs / "input iterators"), I can determine how much memory usage will increase when I bump up the readahead size inside `FilePrefetchBuffer`.** For instance, if there are 16 sorted runs at any given point in time and I increase the readahead size by 64MB, I can project a memory increase of 16 * 64 MB = 1 GB.
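
In other words, the projection is a single multiplication. A back-of-the-envelope sketch (the helper function is hypothetical, and it assumes one prefetch buffer per sorted run):

```cpp
#include <cstdint>

// Hypothetical helper: projected additional memory if each prefetch buffer's
// readahead size grows by `readahead_increase_bytes`.
uint64_t ProjectedMemoryIncrease(uint64_t num_sorted_runs,
                                 uint64_t readahead_increase_bytes) {
  return num_sorted_runs * readahead_increase_bytes;
}

// Example: 16 sorted runs, readahead bumped by 64 MB => 16 * 64 MB = 1 GB.
// uint64_t projected = ProjectedMemoryIncrease(16, 64ull << 20);
```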

In most cases, the number of sorted runs processed per compaction is the number of L0 files plus the number of non-L0 levels. However, we need to account for exceptions like trivial compactions, deletion compactions, and subcompactions. This is a major reason why this PR implements the stat counting inside `CompactionMergingIterator`: by the time we get down to that part of the stack, we know the "true" number of input iterators / sorted runs.
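
To make the common case concrete, here is a hypothetical sketch of that naive estimate; the exceptions above are precisely why it can be wrong, and why counting inside `CompactionMergingIterator` yields the true value:

```cpp
#include <cstdint>

// Naive estimate only: each L0 file is its own sorted run, and each
// non-empty non-L0 level contributes one sorted run. Trivial compactions,
// deletion compactions, and subcompactions all break this estimate.
uint64_t NaiveSortedRunEstimate(uint64_t num_l0_files,
                                uint64_t num_nonempty_non_l0_levels) {
  return num_l0_files + num_nonempty_non_l0_levels;
}
```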

Alternatives considered:
- #13299 gives you a histogram of the number of sorted runs ("input iterators") for a _single compaction_. While this statistic is interesting and in the direction of what we want, we need to assess the memory impact across _all_ compactions that are currently running, so it does not give us all the information we need.
- #13302 gives you the total prefetch buffer memory usage, but it does not tell you what happens when the readahead size is increased. Furthermore, the code change is error-prone and very "invasive" -- look at how many places in the code had to be updated. It would be useful in the future for general memory accounting purposes, but it does not serve our immediate needs.
- #13320 aimed to track the same metric, but did so inside `DBImpl::BackgroundCallCompaction`. It turns out that this does not handle the case where a compaction is divided into multiple subcompactions (in which case there are _more_ sorted runs being processed at the same time than you would otherwise predict). The current PR handles subcompactions automatically, and I think it is cleaner overall.

Note: When I attempted to make this statistic part of the `cf_stats_value_` array, even after updating the array to use `std::atomic<uint64_t>`, I was still able to get assertions to _fail_ inside the crash tests. These assertions checked that the unsigned integer would not underflow below zero during compaction. I experimented for many hours but could not figure out a solution, even though it seems like things "should" work with `fetch_add` and `fetch_sub`. One possibility is that the values in `cf_stats_value_` are being cleared to 0, but I added an `fprintf` to that portion of the code and never saw it print before my assertions failed. Regardless, I think this statistic is different enough from the CF-specific and the other DB-wide stats that the best solution is to define it as a separate `std::atomic<uint64_t>`. I also do not want to spend more hours debugging why the crash test assertions break when the solution in the current version of the PR gets them to pass consistently.
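
For reference, a minimal sketch of the standalone-counter pattern this PR settles on (mirroring the diff below; the class wrapper is illustrative, and relaxed memory ordering is assumed to be acceptable because the counter is purely informational):

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

class SortedRunCounter {
 public:
  void Incr(uint64_t n) { count_.fetch_add(n, std::memory_order_relaxed); }

  void Decr(uint64_t n) {
    // The crash-test assertions mentioned above guard against exactly this:
    // subtracting more than was added would wrap the unsigned counter.
    assert(count_.load(std::memory_order_relaxed) >= n);
    count_.fetch_sub(n, std::memory_order_relaxed);
  }

  uint64_t Load() const { return count_.load(std::memory_order_relaxed); }

 private:
  std::atomic<uint64_t> count_{0};
};
```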

Pull Request resolved: #13325

Test Plan:
- I updated one unit test to confirm that `num_running_compaction_sorted_runs` starts and ends at 0. This checks that all the additions and subtractions cancel out. I also made sure the statistic got incremented at least once.
- With manually added `fprintf` statements, I confirmed that my statistics-updating code was exercised numerous times inside `db_compaction_test`. I printed the values before and after the increments/decrements, and the numbers looked correct.
- We will monitor the generated statistics after this PR is merged.
- There are assertion checks after each increment and before each decrement. If there are bugs, the crash test will almost certainly find them, since it quickly found issues with my initial implementation for this PR, which used the `cf_stats_value_` array (modified to use `std::atomic`).

Reviewed By: anand1976, hx235

Differential Revision: D68527895

Pulled By: archang19

fbshipit-source-id: 135cf210e0ff1550ea28ae4384d429ae620b1784
archang19 authored and facebook-github-bot committed Feb 6, 2025
1 parent 354025f commit 62531da
Showing 10 changed files with 116 additions and 14 deletions.
33 changes: 33 additions & 0 deletions db/db_compaction_test.cc
@@ -5856,6 +5856,26 @@ TEST_F(DBCompactionTest, CompactionStatsTest) {
options.listeners.emplace_back(collector);
DestroyAndReopen(options);

// Verify that the internal statistics for num_running_compactions and
// num_running_compaction_sorted_runs start and end at valid states
uint64_t num_running_compactions = 0;
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumRunningCompactions,
&num_running_compactions));
ASSERT_EQ(num_running_compactions, 0);
uint64_t num_running_compaction_sorted_runs = 0;
ASSERT_TRUE(
db_->GetIntProperty(DB::Properties::kNumRunningCompactionSortedRuns,
&num_running_compaction_sorted_runs));
ASSERT_EQ(num_running_compaction_sorted_runs, 0);
// Check that the stat actually gets changed some time between the start and
// end of compaction
std::atomic<bool> sorted_runs_count_incremented = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionMergingIterator::UpdateInternalStats",
[&](void*) { sorted_runs_count_incremented = true; });

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

for (int i = 0; i < 32; i++) {
for (int j = 0; j < 5000; j++) {
ASSERT_OK(Put(std::to_string(j), std::string(1, 'A')));
@@ -5869,6 +5889,19 @@ TEST_F(DBCompactionTest, CompactionStatsTest) {
ColumnFamilyData* cfd = cfh->cfd();

VerifyCompactionStats(*cfd, *collector);
// There should be no more running compactions, and thus no more sorted runs
// to process
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumRunningCompactions,
&num_running_compactions));
ASSERT_EQ(num_running_compactions, 0);
ASSERT_TRUE(
db_->GetIntProperty(DB::Properties::kNumRunningCompactionSortedRuns,
&num_running_compaction_sorted_runs));
ASSERT_EQ(num_running_compaction_sorted_runs, 0);
ASSERT_TRUE(sorted_runs_count_incremented);

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_F(DBCompactionTest, SubcompactionEvent) {
7 changes: 0 additions & 7 deletions db/db_impl/db_impl.h
@@ -949,13 +949,6 @@ class DBImpl : public DB {
return num_running_flushes_;
}

// Returns the number of currently running compactions.
// REQUIREMENT: mutex_ must be held when calling this function.
int num_running_compactions() {
mutex_.AssertHeld();
return num_running_compactions_;
}

const WriteController& write_controller() { return write_controller_; }

// hollow transactions shell used for recovery.
1 change: 1 addition & 0 deletions db/db_impl/db_impl_compaction_flush.cc
@@ -3522,6 +3522,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
}
}

// Precondition: mutex_ must be held when calling this function.
Status DBImpl::BackgroundCompaction(bool* made_progress,
JobContext* job_context,
LogBuffer* log_buffer,
20 changes: 20 additions & 0 deletions db/internal_stats.cc
@@ -301,6 +301,8 @@ static const std::string aggregated_table_properties =
static const std::string aggregated_table_properties_at_level =
aggregated_table_properties + "-at-level";
static const std::string num_running_compactions = "num-running-compactions";
static const std::string num_running_compaction_sorted_runs =
"num-running-compaction-sorted-runs";
static const std::string num_running_flushes = "num-running-flushes";
static const std::string actual_delayed_write_rate =
"actual-delayed-write-rate";
@@ -351,6 +353,8 @@ const std::string DB::Properties::kCompactionPending =
rocksdb_prefix + compaction_pending;
const std::string DB::Properties::kNumRunningCompactions =
rocksdb_prefix + num_running_compactions;
const std::string DB::Properties::kNumRunningCompactionSortedRuns =
rocksdb_prefix + num_running_compaction_sorted_runs;
const std::string DB::Properties::kNumRunningFlushes =
rocksdb_prefix + num_running_flushes;
const std::string DB::Properties::kBackgroundErrors =
@@ -580,6 +584,9 @@ const UnorderedMap<std::string, DBPropertyInfo>
{DB::Properties::kNumRunningCompactions,
{false, nullptr, &InternalStats::HandleNumRunningCompactions, nullptr,
nullptr}},
{DB::Properties::kNumRunningCompactionSortedRuns,
{false, nullptr, &InternalStats::HandleNumRunningCompactionSortedRuns,
nullptr, nullptr}},
{DB::Properties::kActualDelayedWriteRate,
{false, nullptr, &InternalStats::HandleActualDelayedWriteRate, nullptr,
nullptr}},
@@ -636,6 +643,7 @@ InternalStats::InternalStats(int num_levels, SystemClock* clock,
file_read_latency_(num_levels),
has_cf_change_since_dump_(true),
bg_error_count_(0),
num_running_compaction_sorted_runs_(0),
number_levels_(num_levels),
clock_(clock),
cfd_(cfd),
@@ -1265,6 +1273,18 @@ bool InternalStats::HandleNumRunningCompactions(uint64_t* value, DBImpl* db,
return true;
}

bool InternalStats::HandleNumRunningCompactionSortedRuns(uint64_t* value,
DBImpl* db,
Version* /*version*/) {
db->mutex()->AssertHeld();
uint64_t sorted_runs = 0;
for (auto* cfd : *db->versions_->GetColumnFamilySet()) {
sorted_runs += cfd->internal_stats()->NumRunningCompactionSortedRuns();
}
*value = sorted_runs;
return true;
}

bool InternalStats::HandleBackgroundErrors(uint64_t* value, DBImpl* /*db*/,
Version* /*version*/) {
// Accumulated number of errors in background flushes or compactions.
22 changes: 22 additions & 0 deletions db/internal_stats.h
@@ -604,6 +604,20 @@ class InternalStats {
++cf_stats_count_[type];
}

void IncrNumRunningCompactionSortedRuns(uint64_t value) {
num_running_compaction_sorted_runs_.fetch_add(value,
std::memory_order_relaxed);
}

void DecrNumRunningCompactionSortedRuns(uint64_t value) {
num_running_compaction_sorted_runs_.fetch_sub(value,
std::memory_order_relaxed);
}

uint64_t NumRunningCompactionSortedRuns() {
return num_running_compaction_sorted_runs_.load(std::memory_order_relaxed);
}

void AddDBStats(InternalDBStatsType type, uint64_t value,
bool concurrent = false) {
auto& v = db_stats_[type];
@@ -847,6 +861,8 @@ class InternalStats {
bool HandleCompactionPending(uint64_t* value, DBImpl* db, Version* version);
bool HandleNumRunningCompactions(uint64_t* value, DBImpl* db,
Version* version);
bool HandleNumRunningCompactionSortedRuns(uint64_t* value, DBImpl* db,
Version* version);
bool HandleBackgroundErrors(uint64_t* value, DBImpl* db, Version* version);
bool HandleCurSizeActiveMemTable(uint64_t* value, DBImpl* db,
Version* version);
@@ -921,6 +937,12 @@ class InternalStats {
// or compaction will cause the counter to increase too.
uint64_t bg_error_count_;

// This is a rolling count of the number of sorted runs being processed by
// currently running compactions. Other metrics are only incremented, but this
// metric is also decremented. Additionally, we do not want to reset this
// count to zero at a periodic interval.
std::atomic<uint64_t> num_running_compaction_sorted_runs_;

const int number_levels_;
SystemClock* clock_;
ColumnFamilyData* cfd_;
3 changes: 2 additions & 1 deletion db/version_set.cc
@@ -7110,7 +7110,8 @@ InternalIterator* VersionSet::MakeInputIterator(
assert(num <= space);
InternalIterator* result = NewCompactionMergingIterator(
&c->column_family_data()->internal_comparator(), list,
static_cast<int>(num), range_tombstones);
static_cast<int>(num), range_tombstones, /*arena=*/nullptr,
c->column_family_data()->internal_stats());
delete[] list;
return result;
}
4 changes: 4 additions & 0 deletions include/rocksdb/db.h
@@ -1210,6 +1210,10 @@ class DB {
// running compactions.
static const std::string kNumRunningCompactions;

// "rocksdb.num-running-compaction-sorted-runs" - returns the number of
// sorted runs being processed by currently running compactions.
static const std::string kNumRunningCompactionSortedRuns;

// "rocksdb.background-errors" - returns accumulated number of background
// errors.
static const std::string kBackgroundErrors;
36 changes: 31 additions & 5 deletions table/compaction_merging_iterator.cc
@@ -5,6 +5,8 @@
// (found in the LICENSE.Apache file in the root directory).
#include "table/compaction_merging_iterator.h"

#include "db/internal_stats.h"

namespace ROCKSDB_NAMESPACE {
class CompactionMergingIterator : public InternalIterator {
public:
@@ -13,12 +15,15 @@ class CompactionMergingIterator : public InternalIterator {
int n, bool is_arena_mode,
std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
std::unique_ptr<TruncatedRangeDelIterator>**>>&
range_tombstones)
range_tombstones,
InternalStats* internal_stats)
: is_arena_mode_(is_arena_mode),
comparator_(comparator),
current_(nullptr),
minHeap_(CompactionHeapItemComparator(comparator_)),
pinned_iters_mgr_(nullptr) {
pinned_iters_mgr_(nullptr),
internal_stats_(internal_stats),
num_sorted_runs_recorded_(0) {
children_.resize(n);
for (int i = 0; i < n; i++) {
children_[i].level = i;
@@ -38,6 +43,17 @@
pinned_heap_item_[i].level = i;
pinned_heap_item_[i].type = HeapItem::DELETE_RANGE_START;
}
if (internal_stats_) {
TEST_SYNC_POINT("CompactionMergingIterator::UpdateInternalStats");
// The size of children_ or range_tombstone_iters_ (n) should not change,
// but to be safe, we record the size here so that we decrement by the
// correct amount at destruction time
num_sorted_runs_recorded_ = n;
internal_stats_->IncrNumRunningCompactionSortedRuns(
num_sorted_runs_recorded_);
assert(num_sorted_runs_recorded_ <=
internal_stats_->NumRunningCompactionSortedRuns());
}
}

void considerStatus(const Status& s) {
@@ -47,6 +63,14 @@
}

~CompactionMergingIterator() override {
if (internal_stats_) {
assert(num_sorted_runs_recorded_ == range_tombstone_iters_.size());
assert(num_sorted_runs_recorded_ <=
internal_stats_->NumRunningCompactionSortedRuns());
internal_stats_->DecrNumRunningCompactionSortedRuns(
num_sorted_runs_recorded_);
}

range_tombstone_iters_.clear();

for (auto& child : children_) {
@@ -208,6 +232,8 @@ class CompactionMergingIterator : public InternalIterator {
Status status_;
CompactionMinHeap minHeap_;
PinnedIteratorsManager* pinned_iters_mgr_;
InternalStats* internal_stats_;
uint64_t num_sorted_runs_recorded_;
// Process a child that is not in the min heap.
// If valid, add to the min heap. Otherwise, check status.
void AddToMinHeapOrCheckStatus(HeapItem*);
@@ -350,20 +376,20 @@ InternalIterator* NewCompactionMergingIterator(
std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
std::unique_ptr<TruncatedRangeDelIterator>**>>&
range_tombstone_iters,
Arena* arena) {
Arena* arena, InternalStats* stats) {
assert(n >= 0);
if (n == 0) {
return NewEmptyInternalIterator<Slice>(arena);
} else {
if (arena == nullptr) {
return new CompactionMergingIterator(comparator, children, n,
false /* is_arena_mode */,
range_tombstone_iters);
range_tombstone_iters, stats);
} else {
auto mem = arena->AllocateAligned(sizeof(CompactionMergingIterator));
return new (mem) CompactionMergingIterator(comparator, children, n,
true /* is_arena_mode */,
range_tombstone_iters);
range_tombstone_iters, stats);
}
}
}
3 changes: 2 additions & 1 deletion table/compaction_merging_iterator.h
@@ -35,11 +35,12 @@ namespace ROCKSDB_NAMESPACE {
* two APIs for clarity.
*/
class CompactionMergingIterator;
class InternalStats;

InternalIterator* NewCompactionMergingIterator(
const InternalKeyComparator* comparator, InternalIterator** children, int n,
std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
std::unique_ptr<TruncatedRangeDelIterator>**>>&
range_tombstone_iters,
Arena* arena = nullptr);
Arena* arena = nullptr, InternalStats* stats = nullptr);
} // namespace ROCKSDB_NAMESPACE
@@ -0,0 +1 @@
Add new DB property `num_running_compaction_sorted_runs` that tracks the number of sorted runs being processed by currently running compactions
