Skip to content

Commit

Permalink
Fix fork when COMMIT_TRANSACTION is dropped
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerkaraszewski committed Jan 13, 2025
1 parent a88cf54 commit 95780b4
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 16 deletions.
53 changes: 38 additions & 15 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2563,6 +2563,37 @@ STCPManager::Socket* SQLiteNode::_acceptSocket() {
return socket;
}

void SQLiteNode::_processPeerMessages(uint64_t& nextActivity, SQLitePeer* peer, bool unlimited) {
try {
size_t messagesDeqeued = 0;
if (unlimited) {
if (peer->socket) {
string recvBuffer = peer->socket->recvBuffer.c_str();
if (recvBuffer.size()) {
SINFO("TYLER: peer recv buffer " << peer->socket->recvBuffer);
}
} else {
SINFO("TYLER: no socket");
}
}
while (true) {
SData message = peer->popMessage();
_onMESSAGE(peer, message);
if (unlimited) {
SINFO("TYLER: processed message with unlimited set " << message.methodLine);
}
messagesDeqeued++;
if (messagesDeqeued >= 100 && !unlimited) {
// We should run again immediately, we have more to do.
nextActivity = STimeNow();
break;
}
}
} catch (const out_of_range& e) {
// Ok, just no messages.
}
}

void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) {
unique_lock<decltype(_stateMutex)> uniqueLock(_stateMutex);

Expand Down Expand Up @@ -2633,15 +2664,18 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) {
// Now check established peer connections.
for (SQLitePeer* peer : _peerList) {
auto result = peer->postPoll(fdm, nextActivity);

switch (result) {
case SQLitePeer::PeerPostPollStatus::JUST_CONNECTED:
{
_onConnect(peer);
_sendPING(peer);
_processPeerMessages(nextActivity, peer);
}
break;
case SQLitePeer::PeerPostPollStatus::SOCKET_ERROR:
{
_processPeerMessages(nextActivity, peer, true);
SData reconnect("RECONNECT");
reconnect["Reason"] = "socket error";
_sendToPeer(peer, reconnect);
Expand All @@ -2650,6 +2684,8 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) {
break;
case SQLitePeer::PeerPostPollStatus::SOCKET_CLOSED:
{
_processPeerMessages(nextActivity, peer, true);
peer->reset();
_onDisconnect(peer);
}
break;
Expand All @@ -2666,21 +2702,8 @@ void SQLiteNode::postPoll(fd_map& fdm, uint64_t& nextActivity) {
_sendPING(peer);
}
}
try {
size_t messagesDeqeued = 0;
while (true) {
SData message = peer->popMessage();
_onMESSAGE(peer, message);
messagesDeqeued++;
if (messagesDeqeued >= 100) {
// We should run again immediately, we have more to do.
nextActivity = STimeNow();
break;
}
}
} catch (const out_of_range& e) {
// Ok, just no messages.
}

_processPeerMessages(nextActivity, peer);
}
break;
}
Expand Down
2 changes: 2 additions & 0 deletions sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ class SQLiteNode : public STCPManager {

void _dieIfForkedFromCluster();

void _processPeerMessages(uint64_t& nextActivity, SQLitePeer* peer, bool unlimited = false);

const string _commandAddress;
const string _name;
const vector<SQLitePeer*> _peerList;
Expand Down
3 changes: 2 additions & 1 deletion sqlitecluster/SQLitePeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ SQLitePeer::PeerPostPollStatus SQLitePeer::postPoll(fd_map& fdm, uint64_t& nextA
} else {
SHMMM("Lost peer connection after " << (STimeNow() - socket->openTime) / 1000 << "ms, reconnecting in " << delay / 1000 << "ms");
}
reset();
// Can't reset until the buffer is cleared.
// reset();
nextReconnect = STimeNow() + delay;
nextActivity = min(nextActivity, nextReconnect.load());
return PeerPostPollStatus::SOCKET_CLOSED;
Expand Down
1 change: 1 addition & 0 deletions sqlitecluster/SQLitePeer.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class SQLitePeer {
mutable recursive_mutex peerMutex;

// Not named with an underscore because it's only sort-of private (see friend class declaration above).
public:
STCPManager::Socket* socket = nullptr;
};

Expand Down

0 comments on commit 95780b4

Please sign in to comment.