diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 047a0fd78..87edfd469 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -2563,6 +2563,24 @@ STCPManager::Socket* SQLiteNode::_acceptSocket() { return socket; } +void SQLiteNode::_processPeerMessages(uint64_t& nextActivity, SQLitePeer* peer, bool unlimited) { + try { + size_t messagesDeqeued = 0; + while (true) { + SData message = peer->popMessage(); + _onMESSAGE(peer, message); + messagesDeqeued++; + if (messagesDeqeued >= 100 && !unlimited) { + // We should run again immediately, we have more to do. + nextActivity = STimeNow(); + break; + } + } + } catch (const out_of_range& e) { + // Ok, just no messages. + } +} + void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { unique_lock uniqueLock(_stateMutex); @@ -2638,10 +2656,12 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { { _onConnect(peer); _sendPING(peer); + _processPeerMessages(nextActivity, peer); } break; case SQLitePeer::PeerPostPollStatus::SOCKET_ERROR: { + _processPeerMessages(nextActivity, peer, true); SData reconnect("RECONNECT"); reconnect["Reason"] = "socket error"; _sendToPeer(peer, reconnect); @@ -2650,6 +2670,8 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { break; case SQLitePeer::PeerPostPollStatus::SOCKET_CLOSED: { + _processPeerMessages(nextActivity, peer, true); + peer->reset(); _onDisconnect(peer); } break; @@ -2666,21 +2688,8 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) { _sendPING(peer); } } - try { - size_t messagesDeqeued = 0; - while (true) { - SData message = peer->popMessage(); - _onMESSAGE(peer, message); - messagesDeqeued++; - if (messagesDeqeued >= 100) { - // We should run again immediately, we have more to do. - nextActivity = STimeNow(); - break; - } - } - } catch (const out_of_range& e) { - // Ok, just no messages. - } + + _processPeerMessages(nextActivity, peer); } break; } diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 6377c9511..5cb8365b8 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -278,6 +278,8 @@ class SQLiteNode : public STCPManager { void _dieIfForkedFromCluster(); + void _processPeerMessages(uint64_t& nextActivity, SQLitePeer* peer, bool unlimited = false); + const string _commandAddress; const string _name; const vector _peerList; diff --git a/sqlitecluster/SQLitePeer.cpp b/sqlitecluster/SQLitePeer.cpp index a9636ca1d..abc50b1d2 100644 --- a/sqlitecluster/SQLitePeer.cpp +++ b/sqlitecluster/SQLitePeer.cpp @@ -121,7 +121,6 @@ SQLitePeer::PeerPostPollStatus SQLitePeer::postPoll(fd_map& fdm, uint64_t& nextA } else { SHMMM("Lost peer connection after " << (STimeNow() - socket->openTime) / 1000 << "ms, reconnecting in " << delay / 1000 << "ms"); } - reset(); nextReconnect = STimeNow() + delay; nextActivity = min(nextActivity, nextReconnect.load()); return PeerPostPollStatus::SOCKET_CLOSED; diff --git a/sqlitecluster/SQLitePeer.h b/sqlitecluster/SQLitePeer.h index 48bc3433c..a11d5a075 100644 --- a/sqlitecluster/SQLitePeer.h +++ b/sqlitecluster/SQLitePeer.h @@ -45,6 +45,8 @@ class SQLitePeer { // If there are no messages, throws `std::out_of_range`. SData popMessage(); + // NOTE: If this returns PeerPostPollStatus::SOCKET_CLOSED then the caller must call `reset` on this peer. + // This is not done internally becuase we need to expose the outstanding data on the socket before deleting it. PeerPostPollStatus postPoll(fd_map& fdm, uint64_t& nextActivity); // Send a message to this peer. Thread-safe.