Skip to content

Commit

Permalink
Fixed but with a bunch of extra testing junk in it
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerkaraszewski committed Jan 14, 2025
1 parent bd669e0 commit 8bc0851
Showing 1 changed file with 56 additions and 24 deletions.
80 changes: 56 additions & 24 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ void SQLiteNode::_replicate() {
SQLiteScopedHandle dbScope(*_dbPool, _dbPool->getIndex(false));
SQLite& db = dbScope.db();

bool skipNext = false;
uint64_t commitNumber = 0;

while (true) {
unique_lock<mutex> lock(_replicateMutex);
while (!_replicateThreadShouldExit && _replicateQueue.empty()) {
Expand All @@ -187,33 +190,60 @@ void SQLiteNode::_replicate() {
_replicateQueue.pop();
uint64_t dequeueTime = STimeNow();

if (skipNext) {
skipNext = false;
continue;
}

if (SIEquals(command.methodLine, "BEGIN_TRANSACTION")) {
uint64_t messageCommitCount = command.calcU64("newCount");
uint64_t myCommitCount = db.getCommitCount();
if (myCommitCount >= messageCommitCount) {
SALERT("Got BEGIN_TRANSACTION for commit " << messageCommitCount << " but have commit " << myCommitCount);
skipNext = true;
continue;
}
}

try {
if (SIEquals(command.methodLine, "BEGIN_TRANSACTION")) {
auto start = chrono::steady_clock::now();
_handleBeginTransaction(db, peer, command);
_handlePrepareTransaction(db, peer, command, dequeueTime);
auto duration = chrono::steady_clock::now() - start;
SINFO("[performance] Wrote replicate transaction in " << chrono::duration_cast<chrono::microseconds>(duration).count() << "us.");
}
commitNumber = command.calcU64("newCount");
} catch (const SQLite::constraint_error& e) {
// A constraint error can happen in a situation where we're forked, because we can try to insert the same row
// twice, violating the uniqueness of the key. This happens *before* we compute the hash of the entire transaction,
// so we see the constraint_error first.
SALERT("constraint_error in begin/prepare. CommitCount:" << db.getCommitCount() << ", message: " << command.serialize());
db.rollback();
}

bool shouldGoSearchingAndExit = false;
{
try {
if (SIEquals(command.methodLine, "BEGIN_TRANSACTION")) {
auto start = chrono::steady_clock::now();
_handleBeginTransaction(db, peer, command);
_handlePrepareTransaction(db, peer, command, dequeueTime);
auto duration = chrono::steady_clock::now() - start;
SINFO("[performance] Wrote replicate transaction in " << chrono::duration_cast<chrono::microseconds>(duration).count() << "us.");
} else if (SIEquals(command.methodLine, "COMMIT_TRANSACTION")) {
int result = _handleCommitTransaction(db, peer, command.calcU64("NewCount"), command["NewHash"]);
if (result != SQLITE_OK) {
STHROW("commit failed");
}
} else if (SIEquals(command.methodLine, "ROLLBACK_TRANSACTION")) {
// `decrementer` needs to be destroyed to decrement our thread count before we can change state out of
// FOLLOWING.
_handleRollbackTransaction(db, peer, command);
shouldGoSearchingAndExit = true;
}
} catch (const SException& e) {
SALERT("Caught exception in replication thread. Assuming this means we want to stop following. Exception: " << e.what());
try {
if (commitNumber != command.calcU64("newCount")) {
SALERT("Instructed to commit transaction: " << command.calcU64("newCount") << " but expected " << commitNumber);
}
if (SIEquals(command.methodLine, "COMMIT_TRANSACTION")) {
int result = _handleCommitTransaction(db, peer, command.calcU64("NewCount"), command["NewHash"]);
if (result != SQLITE_OK) {
STHROW("commit failed");
}
} else if (SIEquals(command.methodLine, "ROLLBACK_TRANSACTION")) {
_handleRollbackTransaction(db, peer, command);
shouldGoSearchingAndExit = true;
db.rollback();
}

} catch (const SException& e) {
SALERT("Caught SException in replication thread. Assuming this means we want to stop following. Exception: " << e.what());
shouldGoSearchingAndExit = true;
db.rollback();
}
if (shouldGoSearchingAndExit) {
SALERT("Attempted to handle " << command.serialize() << " but got error. Will give up and go SEARCHING");
// We can lock here for this state change because we're in our own thread, and this won't be recursive with
// the calling thread. This is also a really weird exception case that should never happen, so the performance
// implications aren't significant so long as we don't break.
Expand Down Expand Up @@ -1812,8 +1842,6 @@ void SQLiteNode::_sendToAllPeers(const SData& message, bool subscribedOnly) {
}

void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCancelAfter) {
SINFO("[NOTIFY] setting commit count to: " << _db.getCommitCount());

if (newState != _state) {
// First, we notify all plugins about the state change
_server.notifyStateChangeToPlugins(*pluginDB, newState);
Expand All @@ -1826,6 +1854,10 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance
_replicateThread->join();
delete _replicateThread;
_replicateThread = nullptr;
while (_replicateQueue.size()) {
SALERT("Discarding: " << _replicateQueue.front().second.methodLine);
_replicateQueue.pop();
}
_replicateThreadShouldExit = false;
}

Expand Down

0 comments on commit 8bc0851

Please sign in to comment.