Skip to content

Commit

Permalink
Merge branch 'main' into dsilva_preventSeveralCrashesFromHappening
Browse files Browse the repository at this point in the history
  • Loading branch information
danieldoglas committed Jan 12, 2025
2 parents 137df6b + 4412c73 commit abb9d81
Show file tree
Hide file tree
Showing 29 changed files with 871 additions and 622 deletions.
20 changes: 11 additions & 9 deletions BedrockCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ uint64_t BedrockCore::_getRemainingTime(const unique_ptr<BedrockCommand>& comman
return isProcessing ? min(processTimeout, adjustedTimeout) : adjustedTimeout;
}

bool BedrockCore::isTimedOut(unique_ptr<BedrockCommand>& command) {
bool BedrockCore::isTimedOut(unique_ptr<BedrockCommand>& command, SQLite* db, const BedrockServer* server) {
try {
_getRemainingTime(command, false);
} catch (const SException& e) {
// Yep, timed out.
_handleCommandException(command, e);
_handleCommandException(command, e, db, server);
command->complete = true;
return true;
}
Expand Down Expand Up @@ -104,7 +104,7 @@ void BedrockCore::prePeekCommand(unique_ptr<BedrockCommand>& command, bool isBlo
STHROW("555 Timeout prePeeking command");
}
} catch (const SException& e) {
_handleCommandException(command, e);
_handleCommandException(command, e, &_db, &_server);
command->complete = true;
} catch (...) {
SALERT("Unhandled exception typename: " << SGetCurrentExceptionName() << ", command: " << request.methodLine);
Expand Down Expand Up @@ -186,7 +186,7 @@ BedrockCore::RESULT BedrockCore::peekCommand(unique_ptr<BedrockCommand>& command
}
} catch (const SException& e) {
command->repeek = false;
_handleCommandException(command, e);
_handleCommandException(command, e, &_db, &_server);
} catch (const SHTTPSManager::NotLeading& e) {
command->repeek = false;
returnValue = RESULT::SHOULD_PROCESS;
Expand Down Expand Up @@ -284,7 +284,7 @@ BedrockCore::RESULT BedrockCore::processCommand(unique_ptr<BedrockCommand>& comm
}
}
} catch (const SException& e) {
_handleCommandException(command, e);
_handleCommandException(command, e, &_db, &_server);
_db.rollback();
needsCommit = false;
} catch (const SQLite::constraint_error& e) {
Expand Down Expand Up @@ -353,7 +353,7 @@ void BedrockCore::postProcessCommand(unique_ptr<BedrockCommand>& command, bool i
STHROW("555 Timeout postProcessing command");
}
} catch (const SException& e) {
_handleCommandException(command, e);
_handleCommandException(command, e, &_db, &_server);
} catch (...) {
SALERT("Unhandled exception typename: " << SGetCurrentExceptionName() << ", command: " << request.methodLine);
command->response.methodLine = "500 Unhandled Exception";
Expand All @@ -367,7 +367,7 @@ void BedrockCore::postProcessCommand(unique_ptr<BedrockCommand>& command, bool i
_db.setQueryOnly(false);
}

void BedrockCore::_handleCommandException(unique_ptr<BedrockCommand>& command, const SException& e) {
void BedrockCore::_handleCommandException(unique_ptr<BedrockCommand>& command, const SException& e, SQLite* db, const BedrockServer* server) {
string msg = "Error processing command '" + command->request.methodLine + "' (" + e.what() + "), ignoring.";
if (!e.body.empty()) {
msg = msg + " Request body: " + e.body;
Expand Down Expand Up @@ -396,9 +396,11 @@ void BedrockCore::_handleCommandException(unique_ptr<BedrockCommand>& command, c
}

// Add the commitCount header to the response.
command->response["commitCount"] = to_string(_db.getCommitCount());
if (db) {
command->response["commitCount"] = to_string(db->getCommitCount());
}

if (_server.args.isSet("-extraExceptionLogging")) {
if (server && server->args.isSet("-extraExceptionLogging")) {
auto stack = e.details();
command->response["exceptionSource"] = stack.back();
}
Expand Down
6 changes: 3 additions & 3 deletions BedrockCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class BedrockCore : public SQLiteCore {
// Checks if a command has already timed out. Like `peekCommand` without doing any work. Returns `true` and sets
// the same command state as `peekCommand` would if the command has timed out. Returns `false` and does nothing if
// the command hasn't timed out.
bool isTimedOut(unique_ptr<BedrockCommand>& command);
static bool isTimedOut(unique_ptr<BedrockCommand>& command, SQLite* db = nullptr, const BedrockServer* server = nullptr);

void prePeekCommand(unique_ptr<BedrockCommand>& command, bool isBlockingCommitThread);

Expand Down Expand Up @@ -71,8 +71,8 @@ class BedrockCore : public SQLiteCore {
// Gets the amount of time remaining until this command times out. This is the difference between the command's
// 'timeout' value (or the default timeout, if not set) and the time the command was initially scheduled to run. If
// this time is already expired, this throws `555 Timeout`
uint64_t _getRemainingTime(const unique_ptr<BedrockCommand>& command, bool isProcessing);
static uint64_t _getRemainingTime(const unique_ptr<BedrockCommand>& command, bool isProcessing);

void _handleCommandException(unique_ptr<BedrockCommand>& command, const SException& e);
static void _handleCommandException(unique_ptr<BedrockCommand>& command, const SException& e, SQLite* db = nullptr, const BedrockServer* server = nullptr);
const BedrockServer& _server;
};
43 changes: 22 additions & 21 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void BedrockServer::sync()
// We use fewer FDs on test machines that have other resource restrictions in place.

SINFO("Setting dbPool size to: " << _dbPoolSize);
_dbPool = make_shared<SQLitePool>(_dbPoolSize, args["-db"], args.calc("-cacheSize"), args.calc("-maxJournalSize"), journalTables, mmapSizeGB, args.isSet("-hctree"));
_dbPool = make_shared<SQLitePool>(_dbPoolSize, args["-db"], args.calc("-cacheSize"), args.calc("-maxJournalSize"), journalTables, mmapSizeGB, args.isSet("-hctree"), args["-checkpointMode"]);
SQLite& db = _dbPool->getBase();

// Initialize the command processor.
Expand Down Expand Up @@ -213,8 +213,9 @@ void BedrockServer::sync()
// we're leading, then the next update() loop will set us to standing down, and then we won't accept any new
// commands, and we'll shortly run through the existing queue.
if (_shutdownState.load() == COMMANDS_FINISHED) {
SINFO("All clients responded to, " << BedrockCommand::getCommandCount() << " commands remaining. Shutting down sync node.");
SINFO("All clients responded to, " << BedrockCommand::getCommandCount() << " commands remaining.");
if (_syncNode->beginShutdown()) {
SINFO("Beginning shuttdown of sync node.");
// This will cause us to skip the next `poll` iteration which avoids a 1 second wait.
_notifyDoneSync.push(true);
}
Expand Down Expand Up @@ -357,7 +358,7 @@ void BedrockServer::sync()
committingCommand = true;
_syncNode->startCommit(SQLiteNode::QUORUM);
_lastQuorumCommandTime = STimeNow();

// This interrupts the next poll loop immediately. This prevents a 1-second wait when running as a single server.
_notifyDoneSync.push(true);
SDEBUG("Finished sending distributed transaction for db upgrade.");
Expand Down Expand Up @@ -800,6 +801,14 @@ void BedrockServer::runCommand(unique_ptr<BedrockCommand>&& _command, bool isBlo
// We just spin until the node looks ready to go. Typically, this doesn't happen expect briefly at startup.
size_t waitCount = 0;
while (_upgradeInProgress || (getState() != SQLiteNodeState::LEADING && getState() != SQLiteNodeState::FOLLOWING)) {

// It's feasible that our command times out in this loop. In this case, we do not have a DB object to pass.
// The only implication of this is the response does not get the commitCount attached to it.
if (BedrockCore::isTimedOut(command, nullptr, this)) {
_reply(command);
return;
}

// This sleep call is pretty ugly, but it should almost never happen. We're accepting the potential
// looping sleep call for the general case where we just check some bools and continue, instead of
// avoiding the sleep call but having every thread lock a mutex here on every loop.
Expand Down Expand Up @@ -879,7 +888,7 @@ void BedrockServer::runCommand(unique_ptr<BedrockCommand>&& _command, bool isBlo
// to be returned to the main queue, where they would have timed out in `peek`, but it was never called
// because the commands already had a HTTPS request attached, and then they were immediately re-sent to the
// sync queue, because of the QUORUM consistency requirement, resulting in an endless loop.
if (core.isTimedOut(command)) {
if (core.isTimedOut(command, &db, this)) {
_reply(command);
return;
}
Expand Down Expand Up @@ -1700,14 +1709,14 @@ void BedrockServer::_status(unique_ptr<BedrockCommand>& command) {
size_t totalCount = 0;
for (const auto& s : _crashCommands) {
totalCount += s.second.size();

vector<string> paramsArray;
for (const STable& params : s.second) {
if (!params.empty()) {
paramsArray.push_back(SComposeJSONObject(params));
}
}

STable commandObject;
commandObject[s.first] = SComposeJSONArray(paramsArray);
crashCommandListArray.push_back(SComposeJSONObject(commandObject));
Expand Down Expand Up @@ -1805,7 +1814,6 @@ bool BedrockServer::_isControlCommand(const unique_ptr<BedrockCommand>& command)
SIEquals(command->request.methodLine, "EnableSQLTracing") ||
SIEquals(command->request.methodLine, "BlockWrites") ||
SIEquals(command->request.methodLine, "UnblockWrites") ||
SIEquals(command->request.methodLine, "SetMaxPeerFallBehind") ||
SIEquals(command->request.methodLine, "SetMaxSocketThreads") ||
SIEquals(command->request.methodLine, "CRASH_COMMAND")
) {
Expand All @@ -1829,6 +1837,7 @@ atomic<bool> __quiesceShouldUnlock(false);
thread* __quiesceThread = nullptr;

void BedrockServer::_control(unique_ptr<BedrockCommand>& command) {
SINFO("Received control command: " << command->request.methodLine);
SData& response = command->response;
string reason = "MANUAL";
response.methodLine = "200 OK";
Expand Down Expand Up @@ -1919,7 +1928,9 @@ void BedrockServer::_control(unique_ptr<BedrockCommand>& command) {
if (dbPoolCopy) {
SQLiteScopedHandle dbScope(*_dbPool, _dbPool->getIndex());
SQLite& db = dbScope.db();
SINFO("[quiesce] Exclusive locking DB");
db.exclusiveLockDB();
SINFO("[quiesce] Exclusive locked DB");
locked = true;
while (true) {
if (__quiesceShouldUnlock) {
Expand All @@ -1942,12 +1953,16 @@ void BedrockServer::_control(unique_ptr<BedrockCommand>& command) {
response.methodLine = "200 Blocked";
}
} else if (SIEquals(command->request.methodLine, "UnblockWrites")) {
SINFO("[quiesce] Locking __quiesceLock");
lock_guard lock(__quiesceLock);
SINFO("[quiesce] __quiesceLock locked");
if (!__quiesceThread) {
response.methodLine = "200 Not Blocked";
} else {
__quiesceShouldUnlock = true;
SINFO("[quiesce] Joining __quiesceThread");
__quiesceThread->join();
SINFO("[quiesce] __quiesceThread joined");
delete __quiesceThread;
__quiesceThread = nullptr;
response.methodLine = "200 Unblocked";
Expand All @@ -1960,20 +1975,6 @@ void BedrockServer::_control(unique_ptr<BedrockCommand>& command) {
} else {
response.methodLine = "401 Don't Use Zero";
}
} else if (SIEquals(command->request.methodLine, "SetMaxPeerFallBehind")) {
// Look up the existing value so we can report what it was.
uint64_t existingValue = SQLiteNode::MAX_PEER_FALL_BEHIND;
response["previousValue"] = to_string(existingValue);

uint64_t newValue = command->request.calcU64("value");
if (newValue < SQLiteNode::MIN_APPROVE_FREQUENCY) {
// We won't break everything on purpose. This can be used to check the existing value without changing anything by passing `0`.
response.methodLine = "400 Refusing to set peer fall behind below " + to_string(SQLiteNode::MIN_APPROVE_FREQUENCY);
} else {
// Set the new value and return 200 OK.
SQLiteNode::MAX_PEER_FALL_BEHIND = newValue;
response["previousValue"] = to_string(existingValue);
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions BedrockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ class BedrockServer : public SQLiteServer {
void onNodeLogin(SQLitePeer* peer) override;

// You must block and unblock the command port with *identical strings*.
void blockCommandPort(const string& reason);
void unblockCommandPort(const string& reason);
void blockCommandPort(const string& reason) override;
void unblockCommandPort(const string& reason) override;

// Legacy version of above.
void suppressCommandPort(const string& reason, bool suppress, bool manualOverride = false);
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ INCLUDE = -I$(PROJECT) -I$(PROJECT)/mbedtls/include
CXXFLAGS = -g -std=c++20 -fPIC -DSQLITE_ENABLE_NORMALIZE $(BEDROCK_OPTIM_COMPILE_FLAG) -Wall -Werror -Wformat-security -Wno-unqualified-std-cast-call -Wno-error=deprecated-declarations $(INCLUDE)

# Amalgamation flags
AMALGAMATION_FLAGS = -Wno-unused-but-set-variable -DSQLITE_ENABLE_FTS5 -DSQLITE_ENABLE_STAT4 -DSQLITE_ENABLE_JSON1 -DSQLITE_ENABLE_SESSION -DSQLITE_ENABLE_PREUPDATE_HOOK -DSQLITE_ENABLE_UPDATE_DELETE_LIMIT -DSQLITE_ENABLE_NOOP_UPDATE -DSQLITE_MUTEX_ALERT_MILLISECONDS=20 -DHAVE_USLEEP=1 -DSQLITE_MAX_MMAP_SIZE=17592186044416ull -DSQLITE_SHARED_MAPPING -DSQLITE_ENABLE_NORMALIZE -DSQLITE_MAX_PAGE_COUNT=4294967294 -DSQLITE_DISABLE_PAGECACHE_OVERFLOW_STATS
AMALGAMATION_FLAGS = -Wno-unused-but-set-variable -DSQLITE_ENABLE_FTS5 -DSQLITE_ENABLE_STAT4 -DSQLITE_ENABLE_JSON1 -DSQLITE_ENABLE_SESSION -DSQLITE_ENABLE_PREUPDATE_HOOK -DSQLITE_ENABLE_UPDATE_DELETE_LIMIT -DSQLITE_ENABLE_NOOP_UPDATE -DSQLITE_MUTEX_ALERT_MILLISECONDS=20 -DHAVE_USLEEP=1 -DSQLITE_MAX_MMAP_SIZE=17592186044416ull -DSQLITE_SHARED_MAPPING -DSQLITE_ENABLE_NORMALIZE -DSQLITE_MAX_PAGE_COUNT=4294967294 -DSQLITE_DISABLE_PAGECACHE_OVERFLOW_STATS -DSQLITE_DEFAULT_CACHE_SIZE=-51200

# All our intermediate, dependency, object, etc files get hidden in here.
INTERMEDIATEDIR = .build
Expand Down
8 changes: 8 additions & 0 deletions libstuff/SLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,17 @@ static set<string> PARAMS_WHITELIST = {
"indexName",
"isUnique",
"logParam",
"message",
"peer",
"reason",
"requestID",
"status",
"userID",
"policyID",
"employeeEmail",
"approver",
"approvers",
"employees",
};

string addLogParams(string&& message, const STable& params) {
Expand Down
Loading

0 comments on commit abb9d81

Please sign in to comment.