From a03be99b7fe10ab6d720c34b3bcb4e924d4f6ca6 Mon Sep 17 00:00:00 2001 From: Thomas Brady Date: Mon, 25 Nov 2024 11:02:58 -0800 Subject: [PATCH] Cleanup copy logic, add rewriteLiveToInitEntries to BucketList, test conversion of lowest level via merge in BucketListTests --- src/bucket/Bucket.cpp | 5 +-- src/bucket/Bucket.h | 2 +- src/bucket/BucketList.cpp | 6 ++++ src/bucket/BucketList.h | 4 +++ src/bucket/BucketOutputIterator.cpp | 50 +++++++++++++++-------------- src/bucket/BucketOutputIterator.h | 4 ++- src/bucket/FutureBucket.cpp | 8 ++--- src/bucket/test/BucketListTests.cpp | 6 ++++ src/bucket/test/BucketTests.cpp | 31 ------------------ 9 files changed, 53 insertions(+), 63 deletions(-) diff --git a/src/bucket/Bucket.cpp b/src/bucket/Bucket.cpp index 61e96b3a28..a9cf2e1957 100644 --- a/src/bucket/Bucket.cpp +++ b/src/bucket/Bucket.cpp @@ -783,7 +783,8 @@ Bucket::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion, std::shared_ptr const& newBucket, std::vector> const& shadows, bool keepDeadEntries, bool countMergeEvents, - asio::io_context& ctx, bool doFsync) + asio::io_context& ctx, bool doFsync, + bool rewriteLiveToInitEntries) { ZoneScoped; // This is the key operation in the scheme: merging two (read-only) @@ -809,7 +810,7 @@ Bucket::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion, BucketMetadata meta; meta.ledgerVersion = protocolVersion; BucketOutputIterator out(bucketManager.getTmpDir(), keepDeadEntries, meta, - mc, ctx, doFsync); + mc, ctx, doFsync, rewriteLiveToInitEntries); BucketEntryIdCmp cmp; size_t iter = 0; diff --git a/src/bucket/Bucket.h b/src/bucket/Bucket.h index c4b6773949..e201e9c196 100644 --- a/src/bucket/Bucket.h +++ b/src/bucket/Bucket.h @@ -176,7 +176,7 @@ class Bucket : public std::enable_shared_from_this, std::shared_ptr const& newBucket, std::vector> const& shadows, bool keepDeadEntries, bool countMergeEvents, asio::io_context& ctx, - bool doFsync); + bool doFsync, bool rewriteLiveToInitEntries = false); 
static uint32_t getBucketVersion(std::shared_ptr const& bucket); static uint32_t diff --git a/src/bucket/BucketList.cpp b/src/bucket/BucketList.cpp index aa3820dd47..ef63eda667 100644 --- a/src/bucket/BucketList.cpp +++ b/src/bucket/BucketList.cpp @@ -435,6 +435,12 @@ BucketList::keepDeadEntries(uint32_t level) return level < BucketList::kNumLevels - 1; } +bool +BucketList::rewriteLiveToInitEntries(uint32_t level) +{ + return level == BucketList::kNumLevels - 1; +} + BucketLevel const& BucketList::getLevel(uint32_t i) const { diff --git a/src/bucket/BucketList.h b/src/bucket/BucketList.h index 5599132f4c..0aec752a55 100644 --- a/src/bucket/BucketList.h +++ b/src/bucket/BucketList.h @@ -439,6 +439,10 @@ class BucketList // Returns true if at given `level` dead entries should be kept. static bool keepDeadEntries(uint32_t level); + // Returns true if at given `level` live entries should be rewritten to + // init entries. + static bool rewriteLiveToInitEntries(uint32_t level); + // Number of ledgers it takes a bucket to spill/receive an incoming spill static uint32_t bucketUpdatePeriod(uint32_t level, bool isCurr); diff --git a/src/bucket/BucketOutputIterator.cpp b/src/bucket/BucketOutputIterator.cpp index d1feaf697b..e5382ed321 100644 --- a/src/bucket/BucketOutputIterator.cpp +++ b/src/bucket/BucketOutputIterator.cpp @@ -21,7 +21,8 @@ BucketOutputIterator::BucketOutputIterator(std::string const& tmpDir, bool keepDeadEntries, BucketMetadata const& meta, MergeCounters& mc, - asio::io_context& ctx, bool doFsync) + asio::io_context& ctx, bool doFsync, + bool rewriteLiveToInitEntries) : mFilename(Bucket::randomBucketName(tmpDir)) , mOut(ctx, doFsync) , mCtx(ctx) @@ -29,6 +30,7 @@ BucketOutputIterator::BucketOutputIterator(std::string const& tmpDir, , mKeepDeadEntries(keepDeadEntries) , mMeta(meta) , mMergeCounters(mc) + , mRewriteLiveToInitEntries(rewriteLiveToInitEntries) { ZoneScoped; CLOG_TRACE(Bucket, "BucketOutputIterator opening file to write: {}", @@ -67,36 +69,17 
@@ BucketOutputIterator::put(BucketEntry const& e) ++mMergeCounters.mOutputIteratorTombstoneElisions; return; } - std::optional maybeInitEntry; - if (!mKeepDeadEntries && e.type() == LIVEENTRY && - protocolVersionStartsFrom( - mMeta.ledgerVersion, - Bucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY)) - { - // If mKeepDeadEntries is false (lowest level), - // we also want to convert the LIVEENTRY to an INITENTRY. - // This is because each level of the bucket list contains - // only one entry per key, and per CAP-0020, INIT ENTRY - // implies that either no entry with the same ledger key - // exists in an older bucket. Therefore, all entries of type - // LIVEENTRY in the lowest level are also of type INITENTRY. - ++mMergeCounters.mOutputIteratorLiveToInitConversions; - // Make a copy of e and set the type of the new entry to INITENTRY. - maybeInitEntry.emplace(e); - maybeInitEntry->type(INITENTRY); - } // Check to see if there's an existing buffered entry. if (mBuf) { // mCmp(e, *mBuf) means e < *mBuf; this should never be true since // it would mean that we're getting entries out of order. - releaseAssert( - !mCmp(maybeInitEntry.has_value() ? *maybeInitEntry : e, *mBuf)); + releaseAssert(!mCmp(e, *mBuf)); // Check to see if the new entry should flush (greater identity), or // merely replace (same identity), the buffered entry. - if (mCmp(*mBuf, maybeInitEntry.has_value() ? *maybeInitEntry : e)) + if (mCmp(*mBuf, e)) { ++mMergeCounters.mOutputIteratorActualWrites; mOut.writeOne(*mBuf, &mHasher, &mBytesPut); @@ -108,9 +91,28 @@ BucketOutputIterator::put(BucketEntry const& e) mBuf = std::make_unique(); } - // In any case, replace *mBuf with e. + if (mRewriteLiveToInitEntries && e.type() == LIVEENTRY && + protocolVersionStartsFrom( + mMeta.ledgerVersion, + Bucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY)) + { + // If mRewriteLiveToInitEntries, we also want to convert the LIVEENTRY + // to an INITENTRY. 
This is because each level of the bucket list + // contains only one entry per key, and per CAP-0020, INITENTRY implies + // that no entry with the same ledger key exists in an older + // bucket. Therefore, all entries of type LIVEENTRY in the lowest level + // are also of type INITENTRY. + ++mMergeCounters.mOutputIteratorLiveToInitConversions; + // Make a copy of e and set the type of the new entry to INITENTRY. + BucketEntry eCopy = e; + eCopy.type(INITENTRY); + *mBuf = eCopy; + } + else + { + *mBuf = e; + } ++mMergeCounters.mOutputIteratorBufferUpdates; - *mBuf = maybeInitEntry.has_value() ? *maybeInitEntry : e; } std::shared_ptr diff --git a/src/bucket/BucketOutputIterator.h b/src/bucket/BucketOutputIterator.h index 20aed133c5..8c3dc7dda8 100644 --- a/src/bucket/BucketOutputIterator.h +++ b/src/bucket/BucketOutputIterator.h @@ -35,6 +35,7 @@ class BucketOutputIterator BucketMetadata mMeta; bool mPutMeta{false}; MergeCounters& mMergeCounters; + bool mRewriteLiveToInitEntries{false}; public: // BucketOutputIterators must _always_ be constructed with BucketMetadata, @@ -46,7 +47,8 @@ class BucketOutputIterator // (or forget to do), it's handled automatically. 
BucketOutputIterator(std::string const& tmpDir, bool keepDeadEntries, BucketMetadata const& meta, MergeCounters& mc, - asio::io_context& ctx, bool doFsync); + asio::io_context& ctx, bool doFsync, + bool rewriteLiveToInitEntries = false); void put(BucketEntry const& e); diff --git a/src/bucket/FutureBucket.cpp b/src/bucket/FutureBucket.cpp index 981708e196..4b1622c3a3 100644 --- a/src/bucket/FutureBucket.cpp +++ b/src/bucket/FutureBucket.cpp @@ -362,10 +362,10 @@ FutureBucket::startMerge(Application& app, uint32_t maxProtocolVersion, ZoneNamedN(mergeZone, "Merge task", true); ZoneValueV(mergeZone, static_cast(level)); - auto res = - Bucket::merge(bm, maxProtocolVersion, curr, snap, shadows, - BucketList::keepDeadEntries(level), - countMergeEvents, ctx, doFsync); + auto res = Bucket::merge( + bm, maxProtocolVersion, curr, snap, shadows, + BucketList::keepDeadEntries(level), countMergeEvents, ctx, + doFsync, BucketList::rewriteLiveToInitEntries(level)); if (res) { diff --git a/src/bucket/test/BucketListTests.cpp b/src/bucket/test/BucketListTests.cpp index edf229d439..05f28bed0a 100644 --- a/src/bucket/test/BucketListTests.cpp +++ b/src/bucket/test/BucketListTests.cpp @@ -411,6 +411,12 @@ TEST_CASE_VERSIONS("bucket tombstones expire at bottom level", REQUIRE(e0.nDead != 0); REQUIRE(e1.nDead != 0); REQUIRE(e2.nDead == 0); + // Assert that live entries are converted to init entries + // at the lowest level. + REQUIRE(e0.nLive == 0); + REQUIRE(e0.nInit != 0); + // But not the level above. 
+ REQUIRE(e1.nInit == 0); }); } diff --git a/src/bucket/test/BucketTests.cpp b/src/bucket/test/BucketTests.cpp index 3272227d89..4da46e26a7 100644 --- a/src/bucket/test/BucketTests.cpp +++ b/src/bucket/test/BucketTests.cpp @@ -399,37 +399,6 @@ TEST_CASE("merges proceed old-style despite newer shadows", } } -TEST_CASE("lowest level merge converts live to init", "[bucket][bucketinit]") -{ - VirtualClock clock; - Config const& cfg = getTestConfig(); - Application::pointer app = createTestApplication(clock, cfg); - auto& bm = app->getBucketManager(); - auto vers = getAppLedgerVersion(app); - - LedgerEntry entry = generateAccount(); - auto b1 = Bucket::fresh(bm, vers, {}, {entry}, {}, - /*countMergeEvents=*/true, clock.getIOContext(), - /*doFsync=*/true); - EntryCounts e1(b1); - CHECK(e1.nInit == 0); - CHECK(e1.nLive == 1); - auto b2 = Bucket::fresh(bm, vers, {}, {}, {}, - /*countMergeEvents=*/true, clock.getIOContext(), - /*doFsync=*/true); - - // Merge b1 and b2 into a new, lowest-level bucket. - // keepDeadEntries = false signfies the lowest level of the bucket list - auto bMerged = - Bucket::merge(bm, vers, b1, b2, /*shadows=*/{}, - /*keepDeadEntries=*/false, - /*countMergeEvents=*/true, clock.getIOContext(), - /*doFsync=*/true); - EntryCounts e2(bMerged); - CHECK(e2.nInit == 1); - CHECK(e2.nLive == 0); -} - TEST_CASE("merges refuse to exceed max protocol version", "[bucket][bucketmaxprotocol]") {