Skip to content

Commit

Permalink
Fix GetMergeOperands in ReadOnlyDB and SecondaryDB (#13340)
Browse files Browse the repository at this point in the history
Summary:
Fixes GetMergeOperands() in ReadOnlyDB and SecondaryDB, as reported in #13243. The refactor in #11799 introduced this regression.

Follow ups to come
- Large Result Optimization (done in #10458) for ReadOnlyDB and SecondaryDB
- Stress Test / Crash Test coverage
- Consider removing some duplicate logic between ReadOnlyDB's GetImpl() and SecondaryDB's `GetImpl()`. The only difference is between acquiring/referencing Superversion.

Pull Request resolved: #13340

Test Plan:
`DBMergeOperandTest` and `DBSecondaryTest` updated

```
./db_merge_operand_test --gtest_filter="*GetMergeOperandsBasic*"
```
```
./db_secondary_test -- --gtest_filter="*GetMergeOperands*"
```

Reviewed By: ltamasi

Differential Revision: D68791652

Pulled By: jaykorean

fbshipit-source-id: 760925e257ab10993c207094718dc0659822ae64
  • Loading branch information
jaykorean authored and facebook-github-bot committed Jan 29, 2025
1 parent f37ce33 commit 2e0dc21
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 5 deletions.
15 changes: 14 additions & 1 deletion db/db_impl/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ Status DBImplReadOnly::GetImpl(const ReadOptions& read_options,
const Slice& key,
GetImplOptions& get_impl_options) {
assert(get_impl_options.value != nullptr ||
get_impl_options.columns != nullptr);
get_impl_options.columns != nullptr ||
get_impl_options.merge_operands != nullptr);
assert(get_impl_options.column_family);

Status s;
Expand Down Expand Up @@ -86,7 +87,11 @@ Status DBImplReadOnly::GetImpl(const ReadOptions& read_options,
return s;
}
}
// Prepare to store a list of merge operations if merge occurs.
MergeContext merge_context;
// TODO - Large Result Optimization for Read Only DB
// (https://github.com/facebook/rocksdb/pull/10458)

SequenceNumber max_covering_tombstone_seq = 0;
LookupKey lkey(key, snapshot, read_options.timestamp);
PERF_TIMER_STOP(get_snapshot_time);
Expand Down Expand Up @@ -121,6 +126,14 @@ Status DBImplReadOnly::GetImpl(const ReadOptions& read_options,
size = get_impl_options.value->size();
} else if (get_impl_options.columns) {
size = get_impl_options.columns->serialized_size();
} else if (get_impl_options.merge_operands) {
*get_impl_options.number_of_operands =
static_cast<int>(merge_context.GetNumOperands());
for (const Slice& sl : merge_context.GetOperands()) {
size += sl.size();
get_impl_options.merge_operands->PinSelf(sl);
get_impl_options.merge_operands++;
}
}
RecordTick(stats_, BYTES_READ, size);
RecordInHistogram(stats_, BYTES_PER_READ, size);
Expand Down
14 changes: 13 additions & 1 deletion db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
const Slice& key,
GetImplOptions& get_impl_options) {
assert(get_impl_options.value != nullptr ||
get_impl_options.columns != nullptr);
get_impl_options.columns != nullptr ||
get_impl_options.merge_operands != nullptr);
assert(get_impl_options.column_family);

Status s;
Expand Down Expand Up @@ -397,6 +398,9 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
}
}
MergeContext merge_context;
// TODO - Large Result Optimization for Secondary DB
// (https://github.com/facebook/rocksdb/pull/10458)

SequenceNumber max_covering_tombstone_seq = 0;
LookupKey lkey(key, snapshot, read_options.timestamp);
PERF_TIMER_STOP(get_snapshot_time);
Expand Down Expand Up @@ -451,6 +455,14 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
size = get_impl_options.value->size();
} else if (get_impl_options.columns) {
size = get_impl_options.columns->serialized_size();
} else if (get_impl_options.merge_operands) {
*get_impl_options.number_of_operands =
static_cast<int>(merge_context.GetNumOperands());
for (const Slice& sl : merge_context.GetOperands()) {
size += sl.size();
get_impl_options.merge_operands->PinSelf(sl);
get_impl_options.merge_operands++;
}
}
RecordTick(stats_, BYTES_READ, size);
RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
Expand Down
21 changes: 20 additions & 1 deletion db/db_merge_operand_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,11 @@ TEST_F(DBMergeOperandTest, FlushedMergeOperandReadAfterFreeBug) {

TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
Options options = CurrentOptions();

int limit = 2;
// Use only the latest two merge operands.
options.merge_operator = std::make_shared<LimitedStringAppendMergeOp>(2, ',');
options.merge_operator =
std::make_shared<LimitedStringAppendMergeOp>(limit, ',');
Reopen(options);
int num_records = 4;
int number_of_operands = 0;
Expand Down Expand Up @@ -299,9 +302,25 @@ TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
"k5", values.data(), &merge_operands_info,
&number_of_operands));
ASSERT_EQ(number_of_operands, 4);
ASSERT_EQ(values[0], "remember");
ASSERT_EQ(values[1], "i");
ASSERT_EQ(values[2], "am");
ASSERT_EQ(values[3], "rocks");

// GetMergeOperands() in ReadOnly DB
ASSERT_OK(Merge("k6", "better"));
ASSERT_OK(Merge("k6", "call"));
ASSERT_OK(Merge("k6", "saul"));

ASSERT_OK(ReadOnlyReopen(options));
std::vector<PinnableSlice> readonly_values(num_records);
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
"k6", readonly_values.data(),
&merge_operands_info, &number_of_operands));
ASSERT_EQ(number_of_operands, limit);
ASSERT_EQ(readonly_values[0], "call");
ASSERT_EQ(readonly_values[1], "saul");
}

TEST_F(DBMergeOperandTest, BlobDBGetMergeOperandsBasic) {
Expand Down
40 changes: 38 additions & 2 deletions db/db_secondary_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h"
#include "test_util/testutil.h"
#include "utilities/fault_injection_env.h"
#include "utilities/merge_operators/string_append/stringappend2.h"

namespace ROCKSDB_NAMESPACE {

class DBSecondaryTestBase : public DBBasicTestWithTimestampBase {
public:
explicit DBSecondaryTestBase(const std::string& dbname)
Expand Down Expand Up @@ -331,6 +330,43 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) {
// cfh, input1, &result));
}

// Verifies that GetMergeOperands() on a secondary instance hands back every
// merge operand written through the primary, in write order.
TEST_F(DBSecondaryTest, GetMergeOperands) {
  // Operands merged into "k1" on the primary, oldest first.
  const char* const kOperands[] = {"v1", "v2", "v3", "v4"};

  Options options;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  options.env = env_;
  Reopen(options);

  for (const char* operand : kOperands) {
    ASSERT_OK(Merge("k1", operand));
  }

  // Open the secondary and let it catch up with what the primary wrote.
  options.max_open_files = -1;
  OpenSecondary(options);

  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());

  int num_records = 4;
  int operand_count = 0;
  std::vector<PinnableSlice> operands(num_records);
  GetMergeOperandsOptions operands_options;
  operands_options.expected_max_number_of_operands = num_records;

  auto default_cf = db_secondary_->DefaultColumnFamily();

  const Status status = db_secondary_->GetMergeOperands(
      ReadOptions(), default_cf, "k1", operands.data(), &operands_options,
      &operand_count);
  // The lookup is expected to come back as MergeInProgress (a non-OK status)
  // while the operand count and the output slices are still filled in.
  ASSERT_NOK(status);
  ASSERT_TRUE(status.IsMergeInProgress());

  ASSERT_EQ(operand_count, 4);
  for (int i = 0; i < 4; ++i) {
    ASSERT_EQ(operands[i].ToString(), kOperands[i]);
  }
}

TEST_F(DBSecondaryTest, InternalCompactionCompactedFiles) {
Options options;
options.env = env_;
Expand Down
1 change: 1 addition & 0 deletions unreleased_history/bug_fixes/get_merge_operand_fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed GetMergeOperands() API in ReadOnlyDB and SecondaryDB

0 comments on commit 2e0dc21

Please sign in to comment.