From f55924049eb8563b197975d8b4252d0fb689bf1d Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Tue, 14 Jan 2025 10:48:19 -0800 Subject: [PATCH] Bulk of change --- sqlitecluster/SQLiteNode.cpp | 325 +++++++-------------- sqlitecluster/SQLiteNode.h | 36 +-- sqlitecluster/SQLiteSequentialNotifier.cpp | 146 --------- sqlitecluster/SQLiteSequentialNotifier.h | 61 ---- 4 files changed, 117 insertions(+), 451 deletions(-) delete mode 100644 sqlitecluster/SQLiteSequentialNotifier.cpp delete mode 100644 sqlitecluster/SQLiteSequentialNotifier.h diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 87edfd469..74e896920 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -68,8 +68,6 @@ const string SQLiteNode::CONSISTENCY_LEVEL_NAMES[] = {"ASYNC", "ONE", "QUORUM"}; -atomic SQLiteNode::currentReplicateThreadID(0); - const size_t SQLiteNode::MIN_APPROVE_FREQUENCY{10}; const vector SQLiteNode::_initPeers(const string& peerListString) { @@ -106,7 +104,7 @@ const vector SQLiteNode::_initPeers(const string& peerListString) { return peerList; } -SQLiteNode::SQLiteNode(SQLiteServer& server, shared_ptr dbPool, const string& name, +SQLiteNode::SQLiteNode(SQLiteServer& server, const shared_ptr& dbPool, const string& name, const string& host, const string& peerList, int priority, uint64_t firstTimeout, const string& version, const string& commandPort) : STCPManager(), @@ -123,13 +121,12 @@ SQLiteNode::SQLiteNode(SQLiteServer& server, shared_ptr dbPool, cons _lastSentTransactionID(0), _leadPeer(nullptr), _priority(-1), - _replicationThreadCount(0), - _replicationThreadsShouldExit(false), _server(server), _state(SQLiteNodeState::UNKNOWN), _stateChangeCount(0), _stateTimeout(STimeNow() + firstTimeout), - _syncPeer(nullptr) + _syncPeer(nullptr), + _replicateThreadShouldExit(false) { KILLABLE_SQLITE_NODE = this; SASSERT(_originalPriority >= 0); @@ -139,8 +136,6 @@ SQLiteNode::SQLiteNode(SQLiteServer& server, shared_ptr dbPool, cons // its own handle to operate on. This avoids conflicts where the sync thread and the plugin are trying to both run // queries at the same time. This also avoids the need to create any share locking between the two. pluginDB = new SQLite(_db); - SINFO("[NOTIFY] setting commit count to: " << _db.getCommitCount()); - _localCommitNotifier.notifyThrough(_db.getCommitCount()); // Get this party started _changeState(SQLiteNodeState::SEARCHING); @@ -165,142 +160,102 @@ SQLiteNode::~SQLiteNode() { } } -void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIndex, uint64_t threadAttemptStartTimestamp) { - // Notify the sync thread that this thread has begun. - { - unique_lock lock(_replicateStartMutex); - _replicateThreadStarted = true; - } - _replicateStartCV.notify_all(); +void SQLiteNode::_replicate() { + SInitialize("replication"); + // Allow the DB handle to be returned regardless of how this function exits. + SQLiteScopedHandle dbScope(*_dbPool, _dbPool->getIndex(false)); + SQLite& db = dbScope.db(); - // Initialize each new thread with a new number. - SInitialize("replicate" + to_string(currentReplicateThreadID.fetch_add(1))); + while (!_replicateThreadShouldExit) { + // If this happens mid-commit, we need to rollback. + unique_lock lock(_replicateMutex); + while (!_replicateThreadShouldExit && _replicateQueue.empty()) { + // Note, we need to interrupt this on purpose to exit the thread. + // We'll set _replicationThreadsShouldExit and `notify_one` without adding any actual work. + _replicateCV.wait(lock); + } - // Actual thread startup time. - uint64_t threadStartTime = STimeNow(); + // If _replicationThreadsShouldExit was set while we were waiting, we will exit straight away. + if (_replicateThreadShouldExit) { + return; + } - // Allow the DB handle to be returned regardless of how this function exits. - SQLiteScopedHandle dbScope(*_dbPool, sqlitePoolIndex); - SQLite& db = dbScope.db(); + // Get the first message from the queue. + SQLitePeer* peer = _replicateQueue.front().first; + SData command = move(_replicateQueue.front().second); + _replicateQueue.pop(); + uint64_t dequeueTime = STimeNow(); - bool goSearchingOnExit = false; - { - // Make sure when this thread exits we decrement our thread counter. - ScopedDecrement decrementer(_replicationThreadCount); - - SDEBUG("Replicate thread started: " << command.methodLine); - if (SIEquals(command.methodLine, "BEGIN_TRANSACTION")) { - uint64_t newCount = command.calcU64("NewCount"); - uint64_t currentCount = newCount - 1; - - // Transactions are either ASYNC or QUORUM. QUORUM transactions can only start when the DB is completely - // up-to-date. ASYNC transactions can start as soon as the DB is at `dbCountAtStart` (the same value that - // the DB was at when the transaction began on leader). - bool quorum = !SStartsWith(command["ID"], "ASYNC"); - uint64_t waitForCount = SStartsWith(command["ID"], "ASYNC") ? command.calcU64("dbCountAtStart") : currentCount; - SINFO("[performance] BEGIN_TRANSACTION replicate thread for commit " << newCount << " waiting on DB count " << waitForCount << " (" << (quorum ? "QUORUM" : "ASYNC") << ")"); - while (true) { - SQLiteSequentialNotifier::RESULT result = _localCommitNotifier.waitFor(waitForCount, false); - if (result == SQLiteSequentialNotifier::RESULT::UNKNOWN) { - // This should be impossible. - SERROR("Got UNKNOWN result from waitFor, which shouldn't happen"); - } else if (result == SQLiteSequentialNotifier::RESULT::COMPLETED) { - // Success case. - break; - } else if (result == SQLiteSequentialNotifier::RESULT::CANCELED) { - SINFO("_localCommitNotifier.waitFor canceled early, returning."); - return; - } else { - SERROR("Got unhandled SQLiteSequentialNotifier::RESULT value, did someone update the enum without updating this block?"); - } - } - SINFO("[performance] Finished waiting for commit count " << waitForCount << ", beginning replicate write."); + // Now we are locked again, and there's work in the queue to do. + bool goSearchingOnExit = false; + { + if (SIEquals(command.methodLine, "BEGIN_TRANSACTION")) { + uint64_t newCount = command.calcU64("NewCount"); + uint64_t currentCount = newCount - 1; - try { - int result = -1; - int commitAttemptCount = 1; - while (result != SQLITE_OK) { - if (commitAttemptCount > 1) { - SINFO("Commit attempt number " << commitAttemptCount << " for concurrent replication."); - } - SINFO("[performance] BEGIN for commit " << newCount); - bool uniqueContraintsError = false; - try { - auto start = chrono::steady_clock::now(); - _handleBeginTransaction(db, peer, command, commitAttemptCount > 1); - - // Now we need to wait for the DB to be up-to-date (if the transaction is QUORUM, we can - // skip this, we did it above) to enforce that commits are in the same order on followers as on - // leader. - if (!quorum) { - SDEBUG("[performance] Waiting at commit " << db.getCommitCount() << " for commit " << currentCount); - SQLiteSequentialNotifier::RESULT waitResult = _localCommitNotifier.waitFor(currentCount, true); - if (waitResult == SQLiteSequentialNotifier::RESULT::CANCELED) { - SINFO("Replication canceled mid-transaction, stopping."); - --_concurrentReplicateTransactions; - db.rollback(); - break; - } - } + // Transactions are either ASYNC or QUORUM. QUORUM transactions can only start when the DB is completely + // up-to-date. ASYNC transactions can start as soon as the DB is at `dbCountAtStart` (the same value that + // the DB was at when the transaction began on leader). + bool quorum = !SStartsWith(command["ID"], "ASYNC"); + uint64_t waitForCount = SStartsWith(command["ID"], "ASYNC") ? command.calcU64("dbCountAtStart") : currentCount; + SINFO("[performance] BEGIN_TRANSACTION replicate thread for commit " << newCount << " waiting on DB count " << waitForCount << " (" << (quorum ? "QUORUM" : "ASYNC") << ")"); - // Ok, almost ready. - // Note:: calls _sendToPeer() which is a write operation. - _handlePrepareTransaction(db, peer, command, threadAttemptStartTimestamp, threadStartTime); - auto duration = chrono::steady_clock::now() - start; - SINFO("[performance] Wrote replicate transaction in " << chrono::duration_cast(duration).count() << "us. " << _concurrentReplicateTransactions.load() - << " concurrent replicate transactions in " << _replicationThreadCount << " threads."); - } catch (const SQLite::constraint_error& e) { - // We could `continue` immediately upon catching this exception, but instead, we wait for the - // leader commit notifier to be ready. This prevents us from spinning in an endless loop on the - // same error over and over until whatever thread we're waiting for finishes. - uniqueContraintsError = true; - } - // Now see if we can commit. We wait until *after* prepare because for QUORUM transactions, we - // don't send LEADER the approval for this until inside of `prepare`. This potentially makes us - // wait while holding the commit lock for non-concurrent transactions, but I guess nobody else with - // a commit after us will be able to commit, either. - SINFO("[performance] Waiting on leader to say it has committed transaction " << command.calcU64("NewCount")); - SQLiteSequentialNotifier::RESULT waitResult = _leaderCommitNotifier.waitFor(command.calcU64("NewCount"), true); - SINFO("[performance] Leader reported committing transaction " << command.calcU64("NewCount") << ", committing."); - if (uniqueContraintsError) { - SINFO("Got unique constraints error in replication, restarting."); - --_concurrentReplicateTransactions; - db.rollback(); - continue; - } else if (waitResult == SQLiteSequentialNotifier::RESULT::CANCELED) { - SINFO("Replication canceled mid-transaction, stopping."); - --_concurrentReplicateTransactions; - db.rollback(); - break; - } + try { + int result = -1; + int commitAttemptCount = 1; + while (result != SQLITE_OK) { + if (commitAttemptCount > 1) { + SINFO("Commit attempt number " << commitAttemptCount << " for concurrent replication."); + } + SINFO("[performance] BEGIN for commit " << newCount); + bool uniqueContraintsError = false; + try { + auto start = chrono::steady_clock::now(); + _handleBeginTransaction(db, peer, command, commitAttemptCount > 1); + + // Ok, almost ready. + // Note:: calls _sendToPeer() which is a write operation. + _handlePrepareTransaction(db, peer, command, dequeueTime); + auto duration = chrono::steady_clock::now() - start; + SINFO("[performance] Wrote replicate transaction in " << chrono::duration_cast(duration).count() << "us."); + } catch (const SQLite::constraint_error& e) { + // We could `continue` immediately upon catching this exception, but instead, we wait for the + // leader commit notifier to be ready. This prevents us from spinning in an endless loop on the + // same error over and over until whatever thread we're waiting for finishes. + uniqueContraintsError = true; + } + if (uniqueContraintsError) { + SINFO("Got unique constraints error in replication, restarting."); + db.rollback(); + continue; + } - // Leader says it has committed this transaction, so we can too. - ++commitAttemptCount; - result = _handleCommitTransaction(db, peer, command.calcU64("NewCount"), command["NewHash"]); - if (result != SQLITE_OK) { - db.rollback(); + // Leader says it has committed this transaction, so we can too. + ++commitAttemptCount; + result = _handleCommitTransaction(db, peer, command.calcU64("NewCount"), command["NewHash"]); + if (result != SQLITE_OK) { + db.rollback(); + } } + } catch (const SException& e) { + SALERT("Caught exception in replication thread. Assuming this means we want to stop following. Exception: " << e.what()); + goSearchingOnExit = true; + db.rollback(); } - } catch (const SException& e) { - SALERT("Caught exception in replication thread. Assuming this means we want to stop following. Exception: " << e.what()); + } else if (SIEquals(command.methodLine, "ROLLBACK_TRANSACTION")) { + // `decrementer` needs to be destroyed to decrement our thread count before we can change state out of + // FOLLOWING. + _handleRollbackTransaction(db, peer, command); goSearchingOnExit = true; - --_concurrentReplicateTransactions; - db.rollback(); - } - } else if (SIEquals(command.methodLine, "ROLLBACK_TRANSACTION")) { - // `decrementer` needs to be destroyed to decrement our thread count before we can change state out of - // FOLLOWING. - _handleRollbackTransaction(db, peer, command); - --_concurrentReplicateTransactions; - goSearchingOnExit = true; - } - } - if (goSearchingOnExit) { - // We can lock here for this state change because we're in our own thread, and this won't be recursive with - // the calling thread. This is also a really weird exception case that should never happen, so the performance - // implications aren't significant so long as we don't break. - unique_lock uniqueLock(_stateMutex); - _changeState(SQLiteNodeState::SEARCHING); + } + } + if (goSearchingOnExit) { + // We can lock here for this state change because we're in our own thread, and this won't be recursive with + // the calling thread. This is also a really weird exception case that should never happen, so the performance + // implications aren't significant so long as we don't break. + unique_lock uniqueLock(_stateMutex); + _changeState(SQLiteNodeState::SEARCHING); + } } } @@ -1650,46 +1605,17 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { SINFO("Ignoring " << message.methodLine << " in state " << stateName(_state)); return; } - if (_replicationThreadsShouldExit) { + if (_replicateThreadShouldExit) { SINFO("Discarding replication message, stopping FOLLOWING"); } else { - if (SIEquals(message.methodLine, "COMMIT_TRANSACTION")) { - // For COMMIT_TRANSACTION messages, we do not start a new thread. This avoids a race condition where we could spin up the - // COMMIT thread, but not yet have called `_leaderCommitNotifier.notifyThrough` for the current transaction number while - // the sync thread changes states. Particularly, if the sync thread dropped out of FOLLOWING before this happened, - // We could have dropped commits that leader had sent us, because we hadn't recorded that we received them. - // When leader is standing down this could have ultimately led to a fork because no other node saved those commits. - SINFO("[performance] Notifying threads that leader has committed transaction " << message.calcU64("CommitCount")); - _leaderCommitNotifier.notifyThrough(message.calcU64("CommitCount")); - } else { - try { - auto threadID = _replicationThreadCount.fetch_add(1); - SDEBUG("Spawning concurrent replicate thread (blocks until DB handle available): " << threadID); - uint64_t threadAttemptStartTimestamp = STimeNow(); - _replicateThreadStarted = false; - thread(&SQLiteNode::_replicate, this, peer, message, _dbPool->getIndex(false), threadAttemptStartTimestamp).detach(); - { - unique_lock lock(_replicateStartMutex); - while (!_replicateThreadStarted) { - _replicateStartCV.wait(lock); - if (!_replicateThreadStarted) { - SINFO("condition variable finished waiting but replicate thread not started."); - } - } - } - SDEBUG("Done spawning concurrent replicate thread: " << threadID); - } catch (const system_error& e) { - // If the server is strugling and falling behind on replication, we might have too many threads - // causing a resource exhaustion. If that happens, all the transactions that are already threaded - // and waiting for the transaction that failed will be stuck in an infinite loop. To prevent that - // we're changing the state to SEARCHING and sending the cancelAfter property to drop all threads - // that depend on the transaction that failed to be threaded. - _replicationThreadCount.fetch_sub(1); - SWARN("Caught system_error starting _replicate thread with " << _replicationThreadCount.load() << " threads. e.what()=" << e.what()); - _changeState(SQLiteNodeState::SEARCHING, message.calcU64("NewCount") - 1); - STHROW("Error starting replicate thread so giving up and reconnecting."); - } + if (!_replicateThread) { + _replicateThread = new thread(&SQLiteNode::_replicate, this); + } + { + lock_guard lock(_replicateMutex); + _replicateQueue.push(make_pair(peer, message)); } + _replicateCV.notify_one(); } } else if (SIEquals(message.methodLine, "APPROVE_TRANSACTION") || SIEquals(message.methodLine, "DENY_TRANSACTION")) { // APPROVE_TRANSACTION: Sent to the leader by a follower when it confirms it was able to begin a transaction and @@ -1920,7 +1846,6 @@ void SQLiteNode::_sendToAllPeers(const SData& message, bool subscribedOnly) { void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCancelAfter) { SINFO("[NOTIFY] setting commit count to: " << _db.getCommitCount()); - _localCommitNotifier.notifyThrough(_db.getCommitCount()); if (newState != _state) { // First, we notify all plugins about the state change @@ -1928,30 +1853,11 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance // If we were following, and now we're not, we give up an any replications. if (_state == SQLiteNodeState::FOLLOWING) { - _replicationThreadsShouldExit = true; - uint64_t cancelAfter = commitIDToCancelAfter ? commitIDToCancelAfter : _leaderCommitNotifier.getValue(); - SINFO("Replication threads should exit, canceling commits after current leader commit " << cancelAfter); - _localCommitNotifier.cancel(cancelAfter); - _leaderCommitNotifier.cancel(cancelAfter); - - // Polling wait for threads to quit. This could use a notification model such as with a condition_variable, - // which would probably be "better" but introduces yet more state variables for a state that we're rarely - // in, and so I've left it out for the time being. - size_t infoCount = 1; - while (_replicationThreadCount) { - if (infoCount % 100 == 0) { - SINFO("Waiting for " << _replicationThreadCount << " remaining replication threads."); - } - infoCount++; - usleep(10'000); - } - - // Done exiting. Reset so that we can resume FOLLOWING in the future. - _replicationThreadsShouldExit = false; - - // Guaranteed to be done right now. - _localCommitNotifier.reset(); - _leaderCommitNotifier.reset(); + _replicateThreadShouldExit = true; + _replicateThread->join(); + delete(_replicateThread); + _replicateThread = nullptr; + _replicateThreadShouldExit = false; // We have no leader anymore. _leadPeer = nullptr; @@ -2190,10 +2096,6 @@ void SQLiteNode::_recvSynchronize(SQLitePeer* peer, const SData& message) { SDEBUG("Committing current transaction because _recvSynchronize: " << _db.getUncommittedQuery()); _db.commit(stateName(_state)); - // Should work here. - SINFO("[NOTIFY] setting commit count to: " << _db.getCommitCount()); - _localCommitNotifier.notifyThrough(_db.getCommitCount()); - if (_db.getCommittedHash() != commit["Hash"]) STHROW("potential hash mismatch"); --commitsRemaining; @@ -2333,11 +2235,6 @@ void SQLiteNode::_handleBeginTransaction(SQLite& db, SQLitePeer* peer, const SDa STHROW("already in a transaction"); } - // If we are running this after a conflict, we'll grab an exclusive lock here. This makes no practical - // difference in replication, as transactions must commit in order, thus if we've failed one commit, nobody - // else can attempt to commit anyway, but this logs our time spent in the commit mutex in EXCLUSIVE rather - // than SHARED mode. - ++_concurrentReplicateTransactions; if (!db.beginTransaction(wasConflict ? SQLite::TRANSACTION_TYPE::EXCLUSIVE : SQLite::TRANSACTION_TYPE::SHARED)) { STHROW("failed to begin transaction"); } @@ -2348,7 +2245,8 @@ void SQLiteNode::_handleBeginTransaction(SQLite& db, SQLitePeer* peer, const SDa } } -void SQLiteNode::_handlePrepareTransaction(SQLite& db, SQLitePeer* peer, const SData& message, uint64_t dequeueTime, uint64_t threadStartTime) { +void SQLiteNode::_handlePrepareTransaction(SQLite& db, SQLitePeer* peer, const SData& message, uint64_t dequeueTime) { + uint64_t prepareStartTime = STimeNow(); // BEGIN_TRANSACTION: Sent by the LEADER to all subscribed followers to begin a new distributed transaction. Each // follower begins a local transaction with this query and responds APPROVE_TRANSACTION. If the follower cannot start // the transaction for any reason, it is broken somehow -- disconnect from the leader. @@ -2405,13 +2303,11 @@ void SQLiteNode::_handlePrepareTransaction(SQLite& db, SQLitePeer* peer, const S } uint64_t leaderSentTimestamp = message.calcU64("leaderSendTime"); uint64_t transitTimeUS = dequeueTime - leaderSentTimestamp; - uint64_t threadStartTimeUS = threadStartTime - dequeueTime; - uint64_t applyTimeUS = STimeNow() - threadStartTime; + uint64_t applyTimeUS = STimeNow() - prepareStartTime; float transitTimeMS = (float)transitTimeUS / 1000.0; - float threadStartTimeMS = (float)threadStartTimeUS / 1000.0; float applyTimeMS = (float)applyTimeUS / 1000.0; PINFO("[performance] Replicated transaction " << message.calcU64("NewCount") << ", sent by leader at " << leaderSentTimestamp - << ", transit/dequeue time: " << transitTimeMS << "ms, thread start time: " << threadStartTimeMS << "ms, applied in: " << applyTimeMS << "ms, should COMMIT next."); + << ", transit/dequeue time: " << transitTimeMS << "ms, applied in: " << applyTimeMS << "ms, should COMMIT next."); } int SQLiteNode::_handleCommitTransaction(SQLite& db, SQLitePeer* peer, const uint64_t commandCommitCount, const string& commandCommitHash) { @@ -2433,15 +2329,8 @@ int SQLiteNode::_handleCommitTransaction(SQLite& db, SQLitePeer* peer, const uin SDEBUG("Committing current transaction because COMMIT_TRANSACTION: " << db.getUncommittedQuery()); - // Let the commit handler notify any other waiting threads that our commit is complete before it starts a checkpoint. - function notifyIfCommitted = [&]() { - auto commitCount = db.getCommitCount(); - SINFO("[performance] Notifying waiting threads that we've locally committed " << commitCount); - _localCommitNotifier.notifyThrough(commitCount); - }; - int result = db.commit(stateName(_state), ¬ifyIfCommitted); - --_concurrentReplicateTransactions; + int result = db.commit(stateName(_state)); if (result == SQLITE_BUSY_SNAPSHOT) { // conflict, bail out early. return result; diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 5cb8365b8..430901211 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -8,6 +8,7 @@ #include #include +#include // This file is long and complex. For each nested sub-structure (I.e., classes inside classes) we have attempted to // arrange things as such: @@ -168,7 +169,7 @@ class SQLiteNode : public STCPManager { void postPoll(fd_map& fdm, uint64_t& nextActivity); // Constructor/Destructor - SQLiteNode(SQLiteServer& server, shared_ptr dbPool, const string& name, const string& host, + SQLiteNode(SQLiteServer& server, const shared_ptr& dbPool, const string& name, const string& host, const string& peerList, int priority, uint64_t firstTimeout, const string& version, const string& commandPort = "localhost:8890"); ~SQLiteNode(); @@ -208,9 +209,6 @@ class SQLiteNode : public STCPManager { // for logging. static const string CONSISTENCY_LEVEL_NAMES[NUM_CONSISTENCY_LEVELS]; - // Monotonically increasing thread counter, used for thread IDs for logging purposes. - static atomic currentReplicateThreadID; - static const vector _initPeers(const string& peerList); // Queue a SYNCHRONIZE message based on the current state of the node, thread-safe, but you need to pass the @@ -232,7 +230,7 @@ class SQLiteNode : public STCPManager { // Handlers for transaction messages. void _handleBeginTransaction(SQLite& db, SQLitePeer* peer, const SData& message, bool wasConflict); - void _handlePrepareTransaction(SQLite& db, SQLitePeer* peer, const SData& message, uint64_t dequeueTime, uint64_t threadStartTime); + void _handlePrepareTransaction(SQLite& db, SQLitePeer* peer, const SData& message, uint64_t dequeueTime); int _handleCommitTransaction(SQLite& db, SQLitePeer* peer, const uint64_t commandCommitCount, const string& commandCommitHash); void _handleRollbackTransaction(SQLite& db, SQLitePeer* peer, const SData& message); @@ -263,7 +261,7 @@ class SQLiteNode : public STCPManager { // // This thread exits on completion of handling the command or when node._replicationThreadsShouldExit is set, // which happens when a node stops FOLLOWING. - void _replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIndex, uint64_t threadAttemptStartTimestamp); + void _replicate(); // Replicates any transactions that have been made on our database by other threads to peers. void _sendOutstandingTransactions(const set& commitOnlyIDs = {}); @@ -333,10 +331,6 @@ class SQLiteNode : public STCPManager { // Pointer to the peer that is the leader. Null if we're the leader, or if we don't have a leader yet. atomic _leadPeer; - // These are used in _replicate, _changeState, and _recvSynchronize to coordinate the replication threads. - SQLiteSequentialNotifier _leaderCommitNotifier; - SQLiteSequentialNotifier _localCommitNotifier; - // We can spin up threads to handle responding to `SYNCHRONIZE` messages out-of-band. We want to make sure we don't // shut down in the middle of running these, so we keep a count of them. atomic _pendingSynchronizeResponses = 0; @@ -348,18 +342,6 @@ class SQLiteNode : public STCPManager { // Remove. See: https://github.com/Expensify/Expensify/issues/208449 atomic _priority; - // These three variables are used to coordinate the startup of replication threads to guarantee each thread starts before we attempt to start the next one. - mutex _replicateStartMutex; - condition_variable _replicateStartCV; - bool _replicateThreadStarted = false; - - // Counter of the total number of currently active replication threads. This is used to let us know when all - // threads have finished. - atomic _replicationThreadCount; - - // State variable that indicates when the above threads should quit. - atomic _replicationThreadsShouldExit; - // Server that implements `SQLiteServer` interface. SQLiteServer& _server; @@ -387,11 +369,13 @@ class SQLiteNode : public STCPManager { // Remove. See: https://github.com/Expensify/Expensify/issues/208439 SQLitePeer* _syncPeer; - // Debugging info. Log the current number of transactions we're actually performing in replicate threads. - // This can be removed once we've figured out why replication falls behind. See this issue: https://github.com/Expensify/Expensify/issues/210528 - atomic _concurrentReplicateTransactions = 0; - // A pointer to a SQLite instance that is passed to plugin's stateChanged function. This prevents plugins from operating on the same handle that // the sync node is when they run queries in stateChanged. SQLite* pluginDB; + + thread* _replicateThread = nullptr; + mutex _replicateMutex; + condition_variable _replicateCV; + queue> _replicateQueue; + atomic _replicateThreadShouldExit; }; diff --git a/sqlitecluster/SQLiteSequentialNotifier.cpp b/sqlitecluster/SQLiteSequentialNotifier.cpp deleted file mode 100644 index e12505ecd..000000000 --- a/sqlitecluster/SQLiteSequentialNotifier.cpp +++ /dev/null @@ -1,146 +0,0 @@ -#include -#include "SQLiteSequentialNotifier.h" - -SQLiteSequentialNotifier::RESULT SQLiteSequentialNotifier::waitFor(uint64_t value, bool insideTransaction) { - shared_ptr state(nullptr); - { - lock_guard lock(_internalStateMutex); - if (value <= _value) { - return RESULT::COMPLETED; - } - - // Create a new WaitState object and save a shared_ptr to it in `state`. - state = make_shared(); - if (insideTransaction) { - _valueToPendingThreadMap.emplace(value, state); - } else { - _valueToPendingThreadMapNoCurrentTransaction.emplace(value, state); - } - } - - while (true) { - unique_lock lock(state->waitingThreadMutex); - if (_globalResult == RESULT::CANCELED) { - if (_cancelAfter != 0 && value <= _cancelAfter) { - // If cancelAfter is set, but higher than what we're waiting for, we ignore the CANCELED and wait for - // this WaitState to have a result anyway. - if (state->result != RESULT::UNKNOWN) { - return state->result; - } - // If there's no result yet, log that we're waiting for it. - SINFO("Canceled after " << _cancelAfter << ", but waiting for " << value << " so not returning yet."); - } else { - // Canceled and we're not before the cancellation cutoff. - return RESULT::CANCELED; - } - } else if (_globalResult != RESULT::UNKNOWN) { - return _globalResult; - } else if (state->result != RESULT::UNKNOWN) { - return state->result; - } - cv_status result = state->waitingThreadConditionVariable.wait_for(lock, 1s); - if (result == cv_status::timeout) { - // We shouldn't need this 1s timeout at all, and should be able to wait indefinitely until this thread is woken up, because that should always happen eventually. But it seems - // like there might be a bug *somewhere* that causes us to either miss a notification that we've canceled some outstanding transactions, or that we are failing to notify them - // that they're canceled. To handle that case, we wake up every second and will re-check, but warn if such a thing has happened. - // - // Note that this can also happen in the case of successful notifications - it seems like we can get a timeout even in the case that the notification was sent, if these two - // things happen more-or-less simultaneously. The condition_variable documentation does not make it clear if this should be the case or not, but in every examined case, the - // waited-for commit and the timeout happened more or less simultaneously (not with up to a 1s gap between them, which would indicate a missed notification and eventual timeout) - // so we are ignoring that case which seems to work OK. - // - // We should investigate any instances of thew below logline to see if they're same as for the success cases mentioned above (i.e., the timeout happens simultaneously as the - // cancellation) or if the log line is delayed by up to a second (indicating a problem). - if (_globalResult == RESULT::CANCELED || state->result == RESULT::CANCELED) { - // It's possible that we hit the timeout here after `cancel()` has set the global value, but before we received the notification. - // This isn't a problem, and we can jump back to the top of the loop and check again. If there's some problem, we'll see it there. - SINFO("Hit 1s timeout while global cancel " << (_globalResult == RESULT::CANCELED) << " or specific cancel " << (state->result == RESULT::CANCELED)); - continue; - } - } - } -} - -uint64_t SQLiteSequentialNotifier::getValue() { - lock_guard lock(_internalStateMutex); - return _value; -} - -void SQLiteSequentialNotifier::notifyThrough(uint64_t value) { - lock_guard lock(_internalStateMutex); - if (value > _value) { - _value = value; - } - for (auto valueThreadMapPtr : {&_valueToPendingThreadMap, &_valueToPendingThreadMapNoCurrentTransaction}) { - auto& valueThreadMap = *valueThreadMapPtr; - auto lastToDelete = valueThreadMap.begin(); - for (auto it = valueThreadMap.begin(); it != valueThreadMap.end(); it++) { - if (it->first > value) { - // If we've passed our value, there's nothing else to erase, so we can stop. - SINFO("[performance] Breaking out of thread notifications because " << it->first << " > " << value); - break; - } - - // Note that we'll delete this item from the map. - lastToDelete++; - - // Make the changes to the state object - mark it complete and notify anyone waiting. - lock_guard lock(it->second->waitingThreadMutex); - it->second->result = RESULT::COMPLETED; - it->second->waitingThreadConditionVariable.notify_all(); - } - - // Now we've finished with all of our updates and notifications and can remove everything from our map. - // Note that erasing an empty range (i.e., from() begin to begin()) is tested to be a no-op. The documentation I've - // found for multimap is unclear on this, though the documentation for `std::list` specifies: - // "The iterator first does not need to be dereferenceable if first==last: erasing an empty range is a no-op." - // - // I think it's reasonable to assume this is the intention for multimap as well, and in my testing, that was the - // case. - valueThreadMap.erase(valueThreadMap.begin(), lastToDelete); - } -} - -void SQLiteSequentialNotifier::cancel(uint64_t cancelAfter) { - SINFO("Canceling all pending transactions after " << cancelAfter); - lock_guard lock(_internalStateMutex); - - // It's important that _cancelAfter is set before _globalResult. This avoids a race condition where we check - // _globalResult in waitFor but then find _cancelAfter unset. - _cancelAfter = cancelAfter; - _globalResult = RESULT::CANCELED; - - for (auto valueThreadMapPtr : {&_valueToPendingThreadMap, &_valueToPendingThreadMapNoCurrentTransaction}) { - auto& valueThreadMap = *valueThreadMapPtr; - // If cancelAfter is specified, start from that value. Otherwise, we start from the beginning. - auto start = _cancelAfter ? valueThreadMap.upper_bound(_cancelAfter) : valueThreadMap.begin(); - if (start == valueThreadMap.end()) { - // There's nothing to remove. - SINFO("[performance] No available values to cancel after " << cancelAfter); - return; - } - SINFO("[performance] Next value to cancel after " << cancelAfter << " is " << start->first); - - // Now iterate across whatever's remaining and mark it canceled. - auto current = start; - while(current != valueThreadMap.end()) { - SINFO("[performance] Setting canceled for thread waiting on " << current->first); - lock_guard lock(current->second->waitingThreadMutex); - current->second->result = RESULT::CANCELED; - current->second->waitingThreadConditionVariable.notify_all(); - current++; - SINFO("[performance] Canceled for thread waiting on " << current->first); - } - - // And remove these items entirely. - valueThreadMap.erase(start, valueThreadMap.end()); - } - SINFO("Canceled all pending transactions after " << cancelAfter); -} - -void SQLiteSequentialNotifier::reset() { - lock_guard lock(_internalStateMutex); - _globalResult = RESULT::UNKNOWN; - _value = 0; - _cancelAfter = 0; -} diff --git a/sqlitecluster/SQLiteSequentialNotifier.h b/sqlitecluster/SQLiteSequentialNotifier.h deleted file mode 100644 index 432193904..000000000 --- a/sqlitecluster/SQLiteSequentialNotifier.h +++ /dev/null @@ -1,61 +0,0 @@ -#pragma once -#include -#include "SQLite.h" -#include - -class SQLiteSequentialNotifier { - public: - - // Enumeration of all the possible states to result from waiting. - enum class RESULT { - UNKNOWN, - COMPLETED, - CANCELED, - }; - - // Constructor - SQLiteSequentialNotifier() : _value(0), _globalResult(RESULT::UNKNOWN), _cancelAfter(0) {} - - // Blocks until `_value` meets or exceeds `value`, unless an exceptional case (CANCELED, CHEKPOINT_REQUIRED) is - // hit, and returns the corresponding RESULT. - SQLiteSequentialNotifier::RESULT waitFor(uint64_t value, bool insideTransaction); - - // Causes any threads waiting for a value up to and including `value` to return `true`. - void notifyThrough(uint64_t value); - - // Causes any thread waiting for any value to return `false`. Also, any future calls to `waitFor` will return - // `RESULT::CANCELED` until `reset` is called. - // If `cancelAfter` is specified, then only threads waiting for a value *greater than* cancelAfter are interrupted, - // and only calls to `waitFor` with values higher than the current _value return `RESULT::CANCELED`. - void cancel(uint64_t cancelAfter = 0); - - // Returns the current value of this notifier. - uint64_t getValue(); - - // After calling `reset`, all calls to `waitFor` return `false` until this is called, and then they will wait - // again. This allows for a caller to call `cancel`, wait for the completion of their threads, and then call - // `reset` to use the object again. - void reset(); - - private: - // This encapsulates the set of values we need to have a thread wait. It's a mutex and condition_variable that the - // thread can use to wait, and a result indicating if the required result has actually been reached (because - // condition_variables can be spuriously interrupted and need a second `wait()` call). - struct WaitState { - WaitState() : result(RESULT::UNKNOWN) {} - mutex waitingThreadMutex; - condition_variable waitingThreadConditionVariable; - RESULT result; - }; - - mutex _internalStateMutex; - multimap> _valueToPendingThreadMap; - multimap> _valueToPendingThreadMapNoCurrentTransaction; - uint64_t _value; - - // If there is a global result for all pending operations (i.e., they've been canceled), that is stored here. - atomic _globalResult; - - // For saving the value after which new or existing waiters will be returned a CANCELED result. - atomic _cancelAfter; -};