From 95780b458995b32e75d8256faffe94838510e46e Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Mon, 13 Jan 2025 14:52:06 -0800 Subject: [PATCH] Fix fork when COMMIT_TRANSACTION is dropped --- sqlitecluster/SQLiteNode.cpp | 53 ++++++++++++++++++++++++++---------- sqlitecluster/SQLiteNode.h | 2 ++ sqlitecluster/SQLitePeer.cpp | 3 +- sqlitecluster/SQLitePeer.h | 1 + 4 files changed, 43 insertions(+), 16 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 047a0fd78..d0c4e0427 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -2563,6 +2563,37 @@ STCPManager::Socket* SQLiteNode::_acceptSocket() { return socket; } +void SQLiteNode::_processPeerMessages(uint64_t& nextActivity, SQLitePeer* peer, bool unlimited) { + try { + size_t messagesDeqeued = 0; + if (unlimited) { + if (peer->socket) { + string recvBuffer = peer->socket->recvBuffer.c_str(); + if (recvBuffer.size()) { + SINFO("TYLER: peer recv buffer " << peer->socket->recvBuffer); + } + } else { + SINFO("TYLER: no socket"); + } + } + while (true) { + SData message = peer->popMessage(); + _onMESSAGE(peer, message); + if (unlimited) { + SINFO("TYLER: processed message with unlimited set " << message.methodLine); + } + messagesDeqeued++; + if (messagesDeqeued >= 100 && !unlimited) { + // We should run again immediately, we have more to do. + nextActivity = STimeNow(); + break; + } + } + } catch (const out_of_range& e) { + // Ok, just no messages. + } +} + void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { unique_lock uniqueLock(_stateMutex); @@ -2633,15 +2664,18 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { // Now check established peer connections. for (SQLitePeer* peer : _peerList) { auto result = peer->postPoll(fdm, nextActivity); + switch (result) { case SQLitePeer::PeerPostPollStatus::JUST_CONNECTED: { _onConnect(peer); _sendPING(peer); + _processPeerMessages(nextActivity, peer); } break; case SQLitePeer::PeerPostPollStatus::SOCKET_ERROR: { + _processPeerMessages(nextActivity, peer, true); SData reconnect("RECONNECT"); reconnect["Reason"] = "socket error"; _sendToPeer(peer, reconnect); @@ -2650,6 +2684,8 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { break; case SQLitePeer::PeerPostPollStatus::SOCKET_CLOSED: { + _processPeerMessages(nextActivity, peer, true); + peer->reset(); _onDisconnect(peer); } break; @@ -2666,21 +2702,8 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { _sendPING(peer); } } - try { - size_t messagesDeqeued = 0; - while (true) { - SData message = peer->popMessage(); - _onMESSAGE(peer, message); - messagesDeqeued++; - if (messagesDeqeued >= 100) { - // We should run again immediately, we have more to do. - nextActivity = STimeNow(); - break; - } - } - } catch (const out_of_range& e) { - // Ok, just no messages. - } + + _processPeerMessages(nextActivity, peer); } break; } diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 6377c9511..5cb8365b8 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -278,6 +278,8 @@ class SQLiteNode : public STCPManager { void _dieIfForkedFromCluster(); + void _processPeerMessages(uint64_t& nextActivity, SQLitePeer* peer, bool unlimited = false); + const string _commandAddress; const string _name; const vector _peerList; diff --git a/sqlitecluster/SQLitePeer.cpp b/sqlitecluster/SQLitePeer.cpp index a9636ca1d..0053f8fc2 100644 --- a/sqlitecluster/SQLitePeer.cpp +++ b/sqlitecluster/SQLitePeer.cpp @@ -121,7 +121,8 @@ SQLitePeer::PeerPostPollStatus SQLitePeer::postPoll(fd_map& fdm, uint64_t& nextA } else { SHMMM("Lost peer connection after " << (STimeNow() - socket->openTime) / 1000 << "ms, reconnecting in " << delay / 1000 << "ms"); } - reset(); + // Can't reset until the buffer is cleared. + // reset(); nextReconnect = STimeNow() + delay; nextActivity = min(nextActivity, nextReconnect.load()); return PeerPostPollStatus::SOCKET_CLOSED; diff --git a/sqlitecluster/SQLitePeer.h b/sqlitecluster/SQLitePeer.h index 48bc3433c..7f98a8734 100644 --- a/sqlitecluster/SQLitePeer.h +++ b/sqlitecluster/SQLitePeer.h @@ -104,6 +104,7 @@ class SQLitePeer { mutable recursive_mutex peerMutex; // Not named with an underscore because it's only sort-of private (see friend class declaration above). + public: STCPManager::Socket* socket = nullptr; };