Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track the total number of compaction sorted runs inside BackgroundCallCompaction #13320

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
28 changes: 26 additions & 2 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3300,11 +3300,14 @@ TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) {
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,8", FilesPerLevel(0));

ASSERT_EQ(matches, 12);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cbi42 the check for IsTrivialMove inside my GetNumberCompactionSortedRuns method messes up the count for InputCompressionMatchesOutput. I did not dig into why the original test values were the way they are, but I do not think my PR would break this functionality. For those who do understand, I updated the expected value to be the original expected value plus what I determined was added by GetNumberCompactionSortedRuns.

Not sure if you had to work with this test case before. Let me know if this is problematic or the comments can be reworded

// 12 of the matches come from GetNumberCompactionSortedRuns which calls
// IsTrivialMove(), which then calls InputCompressionMatchesOutput()
ASSERT_EQ(matches, 12 + 12);
// Currently, the test relies on the number of calls to
// InputCompressionMatchesOutput() per compaction.
const int kCallsToInputCompressionMatch = 2;
ASSERT_EQ(didnt_match, 8 * kCallsToInputCompressionMatch);
// Similarly, 8 of the didnt_match come from GetNumberCompactionSortedRuns
ASSERT_EQ(didnt_match, 8 + 8 * kCallsToInputCompressionMatch);
ASSERT_EQ(trivial_move, 12);
ASSERT_EQ(non_trivial, 8);

Expand Down Expand Up @@ -5856,6 +5859,18 @@ TEST_F(DBCompactionTest, CompactionStatsTest) {
options.listeners.emplace_back(collector);
DestroyAndReopen(options);

// Verify that the internal statistics for num_running_compactions and
// num_running_compaction_sorted_runs start and end at valid states
uint64_t num_running_compactions = 0;
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumRunningCompactions,
&num_running_compactions));
ASSERT_EQ(num_running_compactions, 0);
uint64_t num_running_compaction_sorted_runs = 0;
ASSERT_TRUE(
db_->GetIntProperty(DB::Properties::kNumRunningCompactionSortedRuns,
&num_running_compaction_sorted_runs));
ASSERT_EQ(num_running_compaction_sorted_runs, 0);

for (int i = 0; i < 32; i++) {
for (int j = 0; j < 5000; j++) {
ASSERT_OK(Put(std::to_string(j), std::string(1, 'A')));
Expand All @@ -5869,6 +5884,15 @@ TEST_F(DBCompactionTest, CompactionStatsTest) {
ColumnFamilyData* cfd = cfh->cfd();

VerifyCompactionStats(*cfd, *collector);
// There should be no more running compactions, and thus no more input
// sorted runs
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumRunningCompactions,
&num_running_compactions));
ASSERT_EQ(num_running_compactions, 0);
ASSERT_TRUE(
db_->GetIntProperty(DB::Properties::kNumRunningCompactionSortedRuns,
&num_running_compaction_sorted_runs));
ASSERT_EQ(num_running_compaction_sorted_runs, 0);
}

TEST_F(DBCompactionTest, SubcompactionEvent) {
Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
bg_bottom_compaction_scheduled_(0),
bg_compaction_scheduled_(0),
num_running_compactions_(0),
num_running_compaction_sorted_runs_(0),
bg_flush_scheduled_(0),
num_running_flushes_(0),
bg_purge_scheduled_(0),
Expand Down
16 changes: 8 additions & 8 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -950,13 +950,6 @@ class DBImpl : public DB {
return num_running_flushes_;
}

// Returns the number of currently running compactions.
// REQUIREMENT: mutex_ must be held when calling this function.
int num_running_compactions() {
mutex_.AssertHeld();
return num_running_compactions_;
}

const WriteController& write_controller() { return write_controller_; }

// hollow transactions shell used for recovery.
Expand Down Expand Up @@ -2420,7 +2413,8 @@ class DBImpl : public DB {
Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction,
Env::Priority thread_pri);
Env::Priority thread_pri,
int& num_compaction_sorted_runs_added);
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer, FlushReason* reason,
bool* flush_rescheduled_to_retain_udt,
Expand All @@ -2430,6 +2424,8 @@ class DBImpl : public DB {
const std::vector<CompactionInputFiles>& inputs,
bool* sfm_bookkeeping, LogBuffer* log_buffer);

size_t GetNumberCompactionSortedRuns(Compaction* c);

// Request compaction tasks token from compaction thread limiter.
// It always succeeds if force = true or limiter is disable.
bool RequestCompactionToken(ColumnFamilyData* cfd, bool force,
Expand Down Expand Up @@ -2963,6 +2959,10 @@ class DBImpl : public DB {
// stores the number of compactions are currently running
int num_running_compactions_;

// stores the number of sorted runs being processed by currently running
// compactions
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uint64_t or size_t makes more sense to me since num_running_compaction_input_iterators_ should not be negative. However, I wanted to follow the convention here since everything else is using int

int num_running_compaction_sorted_runs_;

// number of background memtable flush jobs, submitted to the HIGH pool
int bg_flush_scheduled_;

Expand Down
37 changes: 35 additions & 2 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ bool DBImpl::EnoughRoomForCompaction(
return enough_room;
}

size_t DBImpl::GetNumberCompactionSortedRuns(Compaction* c) {
assert(c);
if (c->IsTrivialMove() || c->deletion_compaction()) {
return 0;
}
if (c->start_level() == 0) {
assert(0 < c->num_input_levels());
assert(c->level(0) == 0);
size_t num_l0_files = c->num_input_files(0);
size_t num_non_l0_levels = c->num_input_levels() - 1;
return num_l0_files + num_non_l0_levels;
}
return c->num_input_levels();
}

bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
std::unique_ptr<TaskLimiterToken>* token,
LogBuffer* log_buffer) {
Expand Down Expand Up @@ -3418,8 +3433,14 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
assert((bg_thread_pri == Env::Priority::BOTTOM &&
bg_bottom_compaction_scheduled_) ||
(bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));

// BackgroundCompaction will update the
// num_running_compaction_sorted_runs_ total and later we will subtract
// what was added
int num_compaction_sorted_runs_added = 0;
Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
prepicked_compaction, bg_thread_pri);
prepicked_compaction, bg_thread_pri,
num_compaction_sorted_runs_added);
TEST_SYNC_POINT("BackgroundCallCompaction:1");
if (s.IsBusy()) {
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
Expand Down Expand Up @@ -3484,6 +3505,10 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,

assert(num_running_compactions_ > 0);
num_running_compactions_--;
assert(num_running_compaction_sorted_runs_ >= 0);
assert(num_running_compaction_sorted_runs_ >=
num_compaction_sorted_runs_added);
num_running_compaction_sorted_runs_ -= num_compaction_sorted_runs_added;

if (bg_thread_pri == Env::Priority::LOW) {
bg_compaction_scheduled_--;
Expand Down Expand Up @@ -3522,11 +3547,13 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
}
}

// Precondition: mutex_ must be held when calling this function.
Status DBImpl::BackgroundCompaction(bool* made_progress,
JobContext* job_context,
LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction,
Env::Priority thread_pri) {
Env::Priority thread_pri,
int& num_compaction_sorted_runs_added) {
ManualCompactionState* manual_compaction =
prepicked_compaction == nullptr
? nullptr
Expand Down Expand Up @@ -3720,6 +3747,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
num_files += each_level.files.size();
}
RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION, num_files);
num_compaction_sorted_runs_added =
static_cast<int>(GetNumberCompactionSortedRuns(c.get()));
assert(num_compaction_sorted_runs_added >= 0);
assert(num_running_compaction_sorted_runs_ >= 0);
num_running_compaction_sorted_runs_ +=
num_compaction_sorted_runs_added;

// There are three things that can change compaction score:
// 1) When flush or compaction finish. This case is covered by
Expand Down
14 changes: 14 additions & 0 deletions db/internal_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ static const std::string aggregated_table_properties =
static const std::string aggregated_table_properties_at_level =
aggregated_table_properties + "-at-level";
static const std::string num_running_compactions = "num-running-compactions";
static const std::string num_running_compaction_sorted_runs =
"num-running-compaction-sorted-runs";
static const std::string num_running_flushes = "num-running-flushes";
static const std::string actual_delayed_write_rate =
"actual-delayed-write-rate";
Expand Down Expand Up @@ -351,6 +353,8 @@ const std::string DB::Properties::kCompactionPending =
rocksdb_prefix + compaction_pending;
const std::string DB::Properties::kNumRunningCompactions =
rocksdb_prefix + num_running_compactions;
const std::string DB::Properties::kNumRunningCompactionSortedRuns =
rocksdb_prefix + num_running_compaction_sorted_runs;
const std::string DB::Properties::kNumRunningFlushes =
rocksdb_prefix + num_running_flushes;
const std::string DB::Properties::kBackgroundErrors =
Expand Down Expand Up @@ -580,6 +584,9 @@ const UnorderedMap<std::string, DBPropertyInfo>
{DB::Properties::kNumRunningCompactions,
{false, nullptr, &InternalStats::HandleNumRunningCompactions, nullptr,
nullptr}},
{DB::Properties::kNumRunningCompactionSortedRuns,
{false, nullptr, &InternalStats::HandleNumRunningCompactionSortedRuns,
nullptr, nullptr}},
{DB::Properties::kActualDelayedWriteRate,
{false, nullptr, &InternalStats::HandleActualDelayedWriteRate, nullptr,
nullptr}},
Expand Down Expand Up @@ -1265,6 +1272,13 @@ bool InternalStats::HandleNumRunningCompactions(uint64_t* value, DBImpl* db,
return true;
}

bool InternalStats::HandleNumRunningCompactionSortedRuns(uint64_t* value,
DBImpl* db,
Version* /*version*/) {
*value = db->num_running_compaction_sorted_runs_;
return true;
}

bool InternalStats::HandleBackgroundErrors(uint64_t* value, DBImpl* /*db*/,
Version* /*version*/) {
// Accumulated number of errors in background flushes or compactions.
Expand Down
2 changes: 2 additions & 0 deletions db/internal_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,8 @@ class InternalStats {
bool HandleCompactionPending(uint64_t* value, DBImpl* db, Version* version);
bool HandleNumRunningCompactions(uint64_t* value, DBImpl* db,
Version* version);
bool HandleNumRunningCompactionSortedRuns(uint64_t* value, DBImpl* db,
Version* version);
bool HandleBackgroundErrors(uint64_t* value, DBImpl* db, Version* version);
bool HandleCurSizeActiveMemTable(uint64_t* value, DBImpl* db,
Version* version);
Expand Down
4 changes: 4 additions & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,10 @@ class DB {
// running compactions.
static const std::string kNumRunningCompactions;

// "rocksdb.num-running-compaction-sorted-runs" - returns the number of
// sorted runs being processed by currently running compactions.
static const std::string kNumRunningCompactionSortedRuns;

// "rocksdb.background-errors" - returns accumulated number of background
// errors.
static const std::string kBackgroundErrors;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add new statistic `num_running_compaction_sorted_runs` that tracks the number of sorted runs being processed by currently running compactions
Loading