Skip to content

Commit

Permalink
Cleanup copy logic, add rewriteLiveToInitEntries to BucketList, test …
Browse files Browse the repository at this point in the history
…conversion of lowest level via merge in BucketListTests
  • Loading branch information
ThomasBrady committed Nov 25, 2024
1 parent 3956cd5 commit a03be99
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 63 deletions.
5 changes: 3 additions & 2 deletions src/bucket/Bucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,8 @@ Bucket::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<Bucket> const& newBucket,
std::vector<std::shared_ptr<Bucket>> const& shadows,
bool keepDeadEntries, bool countMergeEvents,
asio::io_context& ctx, bool doFsync)
asio::io_context& ctx, bool doFsync,
bool rewriteLiveToInitEntries)
{
ZoneScoped;
// This is the key operation in the scheme: merging two (read-only)
Expand All @@ -809,7 +810,7 @@ Bucket::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
BucketMetadata meta;
meta.ledgerVersion = protocolVersion;
BucketOutputIterator out(bucketManager.getTmpDir(), keepDeadEntries, meta,
mc, ctx, doFsync);
mc, ctx, doFsync, rewriteLiveToInitEntries);

BucketEntryIdCmp cmp;
size_t iter = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/bucket/Bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
std::shared_ptr<Bucket> const& newBucket,
std::vector<std::shared_ptr<Bucket>> const& shadows,
bool keepDeadEntries, bool countMergeEvents, asio::io_context& ctx,
bool doFsync);
bool doFsync, bool rewriteLiveToInitEntries = false);

static uint32_t getBucketVersion(std::shared_ptr<Bucket> const& bucket);
static uint32_t
Expand Down
6 changes: 6 additions & 0 deletions src/bucket/BucketList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,12 @@ BucketList::keepDeadEntries(uint32_t level)
return level < BucketList::kNumLevels - 1;
}

bool
BucketList::rewriteLiveToInitEntries(uint32_t level)
{
return level == BucketList::kNumLevels - 1;
}

BucketLevel const&
BucketList::getLevel(uint32_t i) const
{
Expand Down
4 changes: 4 additions & 0 deletions src/bucket/BucketList.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,10 @@ class BucketList
// Returns true if at given `level` dead entries should be kept.
static bool keepDeadEntries(uint32_t level);

// Returns true if at given `level` live entries should be rewritten to
// init entries.
static bool rewriteLiveToInitEntries(uint32_t level);

// Number of ledgers it takes a bucket to spill/receive an incoming spill
static uint32_t bucketUpdatePeriod(uint32_t level, bool isCurr);

Expand Down
50 changes: 26 additions & 24 deletions src/bucket/BucketOutputIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ BucketOutputIterator::BucketOutputIterator(std::string const& tmpDir,
bool keepDeadEntries,
BucketMetadata const& meta,
MergeCounters& mc,
asio::io_context& ctx, bool doFsync)
asio::io_context& ctx, bool doFsync,
bool rewriteLiveToInitEntries)
: mFilename(Bucket::randomBucketName(tmpDir))
, mOut(ctx, doFsync)
, mCtx(ctx)
, mBuf(nullptr)
, mKeepDeadEntries(keepDeadEntries)
, mMeta(meta)
, mMergeCounters(mc)
, mRewriteLiveToInitEntries(rewriteLiveToInitEntries)
{
ZoneScoped;
CLOG_TRACE(Bucket, "BucketOutputIterator opening file to write: {}",
Expand Down Expand Up @@ -67,36 +69,17 @@ BucketOutputIterator::put(BucketEntry const& e)
++mMergeCounters.mOutputIteratorTombstoneElisions;
return;
}
std::optional<BucketEntry> maybeInitEntry;
if (!mKeepDeadEntries && e.type() == LIVEENTRY &&
protocolVersionStartsFrom(
mMeta.ledgerVersion,
Bucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY))
{
// If mKeepDeadEntries is false (lowest level),
// we also want to convert the LIVEENTRY to an INITENTRY.
// This is because each level of the bucket list contains
// only one entry per key, and per CAP-0020, INIT ENTRY
// implies that either no entry with the same ledger key
// exists in an older bucket. Therefore, all entries of type
// LIVEENTRY in the lowest level are also of type INITENTRY.
++mMergeCounters.mOutputIteratorLiveToInitConversions;
// Make a copy of e and set the type of the new entry to INITENTRY.
maybeInitEntry.emplace(e);
maybeInitEntry->type(INITENTRY);
}

// Check to see if there's an existing buffered entry.
if (mBuf)
{
// mCmp(e, *mBuf) means e < *mBuf; this should never be true since
// it would mean that we're getting entries out of order.
releaseAssert(
!mCmp(maybeInitEntry.has_value() ? *maybeInitEntry : e, *mBuf));
releaseAssert(!mCmp(e, *mBuf));

// Check to see if the new entry should flush (greater identity), or
// merely replace (same identity), the buffered entry.
if (mCmp(*mBuf, maybeInitEntry.has_value() ? *maybeInitEntry : e))
if (mCmp(*mBuf, e))
{
++mMergeCounters.mOutputIteratorActualWrites;
mOut.writeOne(*mBuf, &mHasher, &mBytesPut);
Expand All @@ -108,9 +91,28 @@ BucketOutputIterator::put(BucketEntry const& e)
mBuf = std::make_unique<BucketEntry>();
}

// In any case, replace *mBuf with e.
if (mRewriteLiveToInitEntries && e.type() == LIVEENTRY &&
protocolVersionStartsFrom(
mMeta.ledgerVersion,
Bucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY))
{
// If mRewriteLiveToInitEntries, we also want to convert the LIVEENTRY
// to an INITENTRY. This is because each level of the bucket list
// contains only one entry per key, and per CAP-0020, INIT ENTRY implies
// that either no entry with the same ledger key exists in an older
// bucket. Therefore, all entries of type LIVEENTRY in the lowest level
// are also of type INITENTRY.
++mMergeCounters.mOutputIteratorLiveToInitConversions;
// Make a copy of e and set the type of the new entry to INITENTRY.
BucketEntry eCopy = e;
eCopy.type(INITENTRY);
*mBuf = eCopy;
}
else
{
*mBuf = e;
}
++mMergeCounters.mOutputIteratorBufferUpdates;
*mBuf = maybeInitEntry.has_value() ? *maybeInitEntry : e;
}

std::shared_ptr<Bucket>
Expand Down
4 changes: 3 additions & 1 deletion src/bucket/BucketOutputIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class BucketOutputIterator
BucketMetadata mMeta;
bool mPutMeta{false};
MergeCounters& mMergeCounters;
bool mRewriteLiveToInitEntries{false};

public:
// BucketOutputIterators must _always_ be constructed with BucketMetadata,
Expand All @@ -46,7 +47,8 @@ class BucketOutputIterator
// (or forget to do), it's handled automatically.
BucketOutputIterator(std::string const& tmpDir, bool keepDeadEntries,
BucketMetadata const& meta, MergeCounters& mc,
asio::io_context& ctx, bool doFsync);
asio::io_context& ctx, bool doFsync,
bool rewriteLiveToInitEntries = false);

void put(BucketEntry const& e);

Expand Down
8 changes: 4 additions & 4 deletions src/bucket/FutureBucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,10 @@ FutureBucket::startMerge(Application& app, uint32_t maxProtocolVersion,
ZoneNamedN(mergeZone, "Merge task", true);
ZoneValueV(mergeZone, static_cast<int64_t>(level));

auto res =
Bucket::merge(bm, maxProtocolVersion, curr, snap, shadows,
BucketList::keepDeadEntries(level),
countMergeEvents, ctx, doFsync);
auto res = Bucket::merge(
bm, maxProtocolVersion, curr, snap, shadows,
BucketList::keepDeadEntries(level), countMergeEvents, ctx,
doFsync, BucketList::rewriteLiveToInitEntries(level));

if (res)
{
Expand Down
6 changes: 6 additions & 0 deletions src/bucket/test/BucketListTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,12 @@ TEST_CASE_VERSIONS("bucket tombstones expire at bottom level",
REQUIRE(e0.nDead != 0);
REQUIRE(e1.nDead != 0);
REQUIRE(e2.nDead == 0);
// Assert that init entries are converted to live entries
// at the lowest level.
REQUIRE(e0.nLive == 0);
REQUIRE(e0.nInit != 0);
// But not the level above.
REQUIRE(e1.nInit == 0);
});
}

Expand Down
31 changes: 0 additions & 31 deletions src/bucket/test/BucketTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,37 +399,6 @@ TEST_CASE("merges proceed old-style despite newer shadows",
}
}

TEST_CASE("lowest level merge converts live to init", "[bucket][bucketinit]")
{
VirtualClock clock;
Config const& cfg = getTestConfig();
Application::pointer app = createTestApplication(clock, cfg);
auto& bm = app->getBucketManager();
auto vers = getAppLedgerVersion(app);

LedgerEntry entry = generateAccount();
auto b1 = Bucket::fresh(bm, vers, {}, {entry}, {},
/*countMergeEvents=*/true, clock.getIOContext(),
/*doFsync=*/true);
EntryCounts e1(b1);
CHECK(e1.nInit == 0);
CHECK(e1.nLive == 1);
auto b2 = Bucket::fresh(bm, vers, {}, {}, {},
/*countMergeEvents=*/true, clock.getIOContext(),
/*doFsync=*/true);

// Merge b1 and b2 into a new, lowest-level bucket.
// keepDeadEntries = false signfies the lowest level of the bucket list
auto bMerged =
Bucket::merge(bm, vers, b1, b2, /*shadows=*/{},
/*keepDeadEntries=*/false,
/*countMergeEvents=*/true, clock.getIOContext(),
/*doFsync=*/true);
EntryCounts e2(bMerged);
CHECK(e2.nInit == 1);
CHECK(e2.nLive == 0);
}

TEST_CASE("merges refuse to exceed max protocol version",
"[bucket][bucketmaxprotocol]")
{
Expand Down

0 comments on commit a03be99

Please sign in to comment.