diff --git a/src/budget/budgetmanager.cpp b/src/budget/budgetmanager.cpp index 317b5b8f3ab9..8fe2d10983a7 100644 --- a/src/budget/budgetmanager.cpp +++ b/src/budget/budgetmanager.cpp @@ -5,14 +5,16 @@ #include "budget/budgetmanager.h" +#include "addrman.h" +#include "chainparams.h" #include "consensus/validation.h" #include "evo/deterministicmns.h" #include "masternodeman.h" #include "netmessagemaker.h" -#include "tiertwo/tiertwo_sync_state.h" #include "tiertwo/netfulfilledman.h" +#include "tiertwo/tiertwo_sync_state.h" #include "util/validation.h" -#include "validation.h" // GetTransaction, cs_main +#include "validation.h" // GetTransaction, cs_main #ifdef ENABLE_WALLET #include "wallet/wallet.h" // future: use interface instead. @@ -28,6 +30,8 @@ CBudgetManager g_budgetman; // Used to check both proposals and finalized-budgets collateral txes bool CheckCollateral(const uint256& nTxCollateralHash, const uint256& nExpectedHash, std::string& strError, int64_t& nTime, int nCurrentHeight, bool fBudgetFinalization); +void EraseObjectRequest(NodeId nodeId, const CInv& inv); + void CBudgetManager::ReloadMapSeen() { const auto reloadSeenMap = [](auto& mutex1, auto& mutex2, const auto& mapBudgets, auto& mapSeen, auto& mapOrphans) { @@ -1306,7 +1310,7 @@ int CBudgetManager::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(proposal.GetHash(), MSG_BUDGET_PROPOSAL); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_BUDGET_PROPOSAL, proposal.GetHash())); } return ProcessProposal(proposal); } @@ -1319,7 +1323,7 @@ int CBudgetManager::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(vote.GetHash(), MSG_BUDGET_VOTE); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_BUDGET_VOTE, vote.GetHash())); } CValidationState state; @@ -1342,7 +1346,7 @@ int CBudgetManager::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(finalbudget.GetHash(), MSG_BUDGET_FINALIZED); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_BUDGET_FINALIZED, finalbudget.GetHash())); } return ProcessFinalizedBudget(finalbudget, pfrom); } @@ -1355,7 +1359,7 @@ int CBudgetManager::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(vote.GetHash(), MSG_BUDGET_FINALIZED_VOTE); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_BUDGET_FINALIZED_VOTE, vote.GetHash())); } CValidationState state; diff --git a/src/llmq/quorums_blockprocessor.cpp b/src/llmq/quorums_blockprocessor.cpp index 000ba8e752e0..4665df91183f 100644 --- a/src/llmq/quorums_blockprocessor.cpp +++ b/src/llmq/quorums_blockprocessor.cpp @@ -18,6 +18,8 @@ #include "spork.h" #include "validation.h" +void EraseObjectRequest(NodeId nodeId, const CInv& inv); + namespace llmq { std::unique_ptr quorumBlockProcessor{nullptr}; @@ -52,7 +54,7 @@ void CQuorumBlockProcessor::ProcessMessage(CNode* pfrom, CDataStream& vRecv, int uint256 qfc_hash{::SerializeHash(qc)}; { LOCK(cs_main); - g_connman->RemoveAskFor(qfc_hash, MSG_QUORUM_FINAL_COMMITMENT); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_QUORUM_FINAL_COMMITMENT, qfc_hash)); } if (qc.IsNull()) { diff --git a/src/llmq/quorums_chainlocks.cpp b/src/llmq/quorums_chainlocks.cpp index 3f627df2a627..50b4cbb6d89e 100644 --- a/src/llmq/quorums_chainlocks.cpp +++ b/src/llmq/quorums_chainlocks.cpp @@ -103,7 +103,7 @@ void CChainLocksHandler::ProcessNewChainLock(NodeId from, const llmq::CChainLock { { LOCK(cs_main); - g_connman->RemoveAskFor(hash, MSG_CLSIG); + EraseObjectRequest(from, CInv(MSG_CLSIG, hash)); } { @@ -484,4 +484,5 @@ void CChainLocksHandler::Cleanup() lastCleanupTime = GetTimeMillis(); } -} +} // namespace llmq + diff --git a/src/llmq/quorums_dkgsessionhandler.cpp b/src/llmq/quorums_dkgsessionhandler.cpp index 9efca8a0dfc4..93c057ceeac3 100644 --- a/src/llmq/quorums_dkgsessionhandler.cpp +++ b/src/llmq/quorums_dkgsessionhandler.cpp @@ -45,7 +45,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv, in LOCK2(cs_main, cs); - g_connman->RemoveAskFor(hash, invType); + EraseObjectRequest(from, CInv(invType, hash)); if (!seenMessages.emplace(hash).second) { LogPrint(BCLog::NET, "CDKGPendingMessages::%s -- already seen %s, peer=%d\n", __func__, hash.ToString(), from); diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index ec8ac8e0a26e..55313bd886a0 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -4,6 +4,7 @@ // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include "quorums_signing.h" +#include "chainparams.h" #include "clientversion.h" #include "netaddress.h" #include "quorums_signing_shares.h" @@ -12,13 +13,15 @@ #include "activemasternode.h" #include "bls/bls_batchverifier.h" #include "cxxtimer.h" -#include "net_processing.h" #include "validation.h" #include #include #include +void EraseObjectRequest(NodeId nodeId, const CInv& inv); +void Misbehaving(NodeId nodeid, int howmuch, const std::string& message = ""); + namespace llmq { @@ -640,7 +643,7 @@ void CSigningManager::ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& re { LOCK(cs_main); - connman.RemoveAskFor(recoveredSig.GetHash(), MSG_QUORUM_RECOVERED_SIG); + EraseObjectRequest(nodeId, CInv(MSG_QUORUM_RECOVERED_SIG, recoveredSig.GetHash())); } if (db.HasRecoveredSigForHash(recoveredSig.GetHash())) { diff --git a/src/masternode-payments.cpp b/src/masternode-payments.cpp index 8d3da0bb7292..cd0779560435 100644 --- a/src/masternode-payments.cpp +++ b/src/masternode-payments.cpp @@ -5,20 +5,21 @@ #include "masternode-payments.h" +#include "budget/budgetmanager.h" #include "chainparams.h" #include "evo/deterministicmns.h" #include "fs.h" -#include "budget/budgetmanager.h" #include "masternodeman.h" #include "netmessagemaker.h" -#include "tiertwo/netfulfilledman.h" #include "spork.h" #include "sync.h" +#include "tiertwo/netfulfilledman.h" #include "tiertwo/tiertwo_sync_state.h" #include "util/system.h" #include "utilmoneystr.h" #include "validation.h" +void EraseObjectRequest(NodeId nodeId, const CInv& inv); /** Object for who's going to get paid on which blocks */ CMasternodePayments masternodePayments; @@ -430,7 +431,7 @@ bool CMasternodePayments::ProcessMessageMasternodePayments(CNode* pfrom, std::st { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(winner.GetHash(), MSG_MASTERNODE_WINNER); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_MASTERNODE_WINNER, winner.GetHash())); } ProcessMNWinner(winner, pfrom, state); diff --git a/src/masternodeman.cpp b/src/masternodeman.cpp index e4248af9afbf..1eb0f1f69235 100644 --- a/src/masternodeman.cpp +++ b/src/masternodeman.cpp @@ -6,12 +6,12 @@ #include "masternodeman.h" #include "addrman.h" +#include "chainparams.h" #include "evo/deterministicmns.h" #include "fs.h" #include "masternode-payments.h" #include "masternode-sync.h" #include "masternode.h" -#include "messagesigner.h" #include "netbase.h" #include "netmessagemaker.h" #include "shutdown.h" @@ -23,6 +23,8 @@ #define MN_WINNER_MINIMUM_AGE 8000 // Age in seconds. This should be > MASTERNODE_REMOVAL_SECONDS to avoid misconfigured new nodes in the list. +void EraseObjectRequest(NodeId nodeId, const CInv& inv); + /** Masternode manager */ CMasternodeMan mnodeman; /** Keep track of the active Masternode */ @@ -968,7 +970,7 @@ int CMasternodeMan::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(mnb.GetHash(), MSG_MASTERNODE_ANNOUNCE); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_MASTERNODE_ANNOUNCE, mnb.GetHash())); } return ProcessMNBroadcast(pfrom, mnb); @@ -979,7 +981,7 @@ int CMasternodeMan::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(mnb.GetHash(), MSG_MASTERNODE_ANNOUNCE); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_MASTERNODE_ANNOUNCE, mnb.GetHash())); } // For now, let's not process mnb2 with pre-BIP155 node addr format. @@ -998,7 +1000,7 @@ int CMasternodeMan::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(mnp.GetHash(), MSG_MASTERNODE_PING); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_MASTERNODE_PING, mnp.GetHash())); } return ProcessMNPing(pfrom, mnp); diff --git a/src/net.cpp b/src/net.cpp index a04687b5b36e..1a720fcb3109 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -93,8 +93,6 @@ std::map mapLocalHost; static bool vfLimited[NET_MAX] = {}; std::string strSubVersion; -limitedmap mapAlreadyAskedFor(MAX_INV_SZ); - void CConnman::AddOneShot(const std::string& strDest) { LOCK(cs_vOneShots); @@ -2571,16 +2569,6 @@ void CConnman::RelayInv(CInv& inv, int minProtoVersion) } } -void CConnman::RemoveAskFor(const uint256& invHash, int invType) -{ - mapAlreadyAskedFor.erase(CInv(invType, invHash)); - - LOCK(cs_vNodes); - for (const auto& pnode : vNodes) { - pnode->AskForInvReceived(invHash); - } -} - void CConnman::UpdateQuorumRelayMemberIfNeeded(CNode* pnode) { if (!pnode->m_masternode_iqr_connection && pnode->m_masternode_connection && @@ -2699,52 +2687,6 @@ CNode::~CNode() CloseSocket(hSocket); } -void CNode::AskFor(const CInv& inv, int64_t doubleRequestDelay) -{ - if (mapAskFor.size() > MAPASKFOR_MAX_SZ || setAskFor.size() > SETASKFOR_MAX_SZ) - return; - // a peer may not have multiple non-responded queue positions for a single inv item - if (!setAskFor.insert(inv.hash).second) - return; - - // We're using mapAskFor as a priority queue, - // the key is the earliest time the request can be sent - int64_t nRequestTime; - limitedmap::const_iterator it = mapAlreadyAskedFor.find(inv); - if (it != mapAlreadyAskedFor.end()) - nRequestTime = it->second; - else - nRequestTime = 0; - LogPrint(BCLog::NET, "askfor %s %d (%s) peer=%d\n", inv.ToString(), nRequestTime, FormatISO8601Time(nRequestTime / 1000000), id); - - // Make sure not to reuse time indexes to keep things in the same order - int64_t nNow = GetTimeMicros() - 1000000; - static int64_t nLastTime; - ++nLastTime; - nNow = std::max(nNow, nLastTime); - nLastTime = nNow; - - // Each retry is 2 minutes after the last - nRequestTime = std::max(nRequestTime + doubleRequestDelay, nNow); - if (it != mapAlreadyAskedFor.end()) - mapAlreadyAskedFor.update(it, nRequestTime); - else - mapAlreadyAskedFor.insert(std::make_pair(inv, nRequestTime)); - mapAskFor.insert(std::make_pair(nRequestTime, inv)); -} - -void CNode::AskForInvReceived(const uint256& invHash) -{ - setAskFor.erase(invHash); - for (auto it = mapAskFor.begin(); it != mapAskFor.end();) { - if (it->second.hash == invHash) { - it = mapAskFor.erase(it); - } else { - ++it; - } - } -} - bool CConnman::NodeFullyConnected(const CNode* pnode) { return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; diff --git a/src/net.h b/src/net.h index 2b25df53f4dd..c2703922844f 100644 --- a/src/net.h +++ b/src/net.h @@ -86,10 +86,6 @@ static const int MAX_ADDNODE_CONNECTIONS = 16; static const int INBOUND_EVICTION_PROTECTION_TIME = 1; /** -listen default */ static const bool DEFAULT_LISTEN = true; -/** The maximum number of entries in mapAskFor */ -static const size_t MAPASKFOR_MAX_SZ = MAX_INV_SZ; -/** The maximum number of entries in setAskFor (larger due to getdata latency)*/ -static const size_t SETASKFOR_MAX_SZ = 2 * MAX_INV_SZ; /** The maximum number of peer connections to maintain. */ static const unsigned int DEFAULT_MAX_PEER_CONNECTIONS = 125; /** Disconnected peers are added to setOffsetDisconnectedPeers only if node has less than ENOUGH_CONNECTIONS */ @@ -299,9 +295,6 @@ class CConnman std::vector CopyNodeVector(); void ReleaseNodeVector(const std::vector& vecNodes); - // Clears AskFor requests for every known peer - void RemoveAskFor(const uint256& invHash, int invType); - void RelayInv(CInv& inv, int minProtoVersion = ActiveProtocol()); bool IsNodeConnected(const CAddress& addr); // Retrieves a connected peer (if connection success). Used only to check peer address availability for now. @@ -568,8 +561,6 @@ bool validateMasternodeIP(const std::string& addrStr); // valid, reacha extern bool fDiscover; extern bool fListen; -extern limitedmap mapAlreadyAskedFor; - /** Subversion as sent to the P2P network in `version` messages */ extern std::string strSubVersion; @@ -773,8 +764,6 @@ class CNode // Set of tier two messages ids we still have to announce. std::vector vInventoryTierTwoToSend; RecursiveMutex cs_inventory; - std::multimap mapAskFor; - std::set setAskFor; std::vector vBlockRequested; std::chrono::microseconds nNextInvSend{0}; // Used for BIP35 mempool sending, also protected by cs_inventory @@ -925,10 +914,6 @@ class CNode } } - void AskFor(const CInv& inv, int64_t doubleRequestDelay = 2 * 60 * 1000000); - // inv response received, clear it from the waiting inv set. - void AskForInvReceived(const uint256& invHash); - void CloseSocketDisconnect(); bool DisconnectOldProtocol(int nVersionIn, int nVersionRequired); diff --git a/src/net_processing.cpp b/src/net_processing.cpp index bdda9fdf2726..c889738b7cf9 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -36,6 +36,23 @@ using namespace std::chrono_literals; static const uint64_t RANDOMIZER_ID_ADDRESS_RELAY = 0x3cac0035b5866b90ULL; // SHA256("main address relay")[0:8] +/** Maximum number of in-flight transactions from a peer */ +static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100; +/** Maximum number of announced transactions from a peer */ +static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; +/** How many microseconds to delay requesting transactions from inbound peers */ +static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::seconds{2}}; +/** How long to wait (in microseconds) before downloading a transaction from an additional peer */ +static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{std::chrono::seconds{60}}; +/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ +/** How long to wait (expiry * factor microseconds) before expiring an in-flight getdata request to a peer */ +static constexpr int64_t TX_EXPIRY_INTERVAL_FACTOR = 10; +/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ +static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{std::chrono::seconds{2}}; +static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, + "To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY"); +/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */ +static const unsigned int MAX_GETDATA_SZ = 1000; /** the maximum percentage of addresses from our addrman to return in response to a getaddr message. */ static constexpr size_t MAX_PCT_ADDR_TO_SEND = 23; @@ -231,7 +248,73 @@ struct CNodeState { CNodeBlocks nodeBlocks; - CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) { + /* + * State associated with transaction download. + * + * Tx download algorithm: + * + * When inv comes in, queue up (process_time, txid) inside the peer's + * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer + * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS). + * + * The process_time for a transaction is set to nNow for outbound peers, + * nNow + 2 seconds for inbound peers. This is the time at which we'll + * consider trying to request the transaction from the peer in + * SendMessages(). The delay for inbound peers is to allow outbound peers + * a chance to announce before we request from inbound peers, to prevent + * an adversary from using inbound connections to blind us to a + * transaction (InvBlock). + * + * When we call SendMessages() for a given peer, + * we will loop over the transactions in m_tx_process_time, looking + * at the transactions whose process_time <= nNow. We'll request each + * such transaction that we don't have already and that hasn't been + * requested from another peer recently, up until we hit the + * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update + * g_already_asked_for for each requested txid, storing the time of the + * GETDATA request. We use g_already_asked_for to coordinate transaction + * requests amongst our peers. + * + * For transactions that we still need but we have already recently + * requested from some other peer, we'll reinsert (process_time, txid) + * back into the peer's m_tx_process_time at the point in the future at + * which the most recent GETDATA request would time out (ie + * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for). + * We add an additional delay for inbound peers, again to prefer + * attempting download from outbound peers first. + * We also add an extra small random delay up to 2 seconds + * to avoid biasing some peers over others. (e.g., due to fixed ordering + * of peer processing in ThreadMessageHandler). + * + * When we receive a transaction from a peer, we remove the txid from the + * peer's m_tx_in_flight set and from their recently announced set + * (m_tx_announced). We also clear g_already_asked_for for that entry, so + * that if somehow the transaction is not accepted but also not added to + * the reject filter, then we will eventually redownload from other + * peers. + * + * PIVX: For PIVX, this does not only handles TXs but also all PIVX specific objects + */ + struct TxDownloadState { + /* Track when to attempt download of announced transactions (process + * time in micros -> txid) + */ + std::multimap m_tx_process_time; + + //! Store all the transactions a peer has recently announced + std::set m_tx_announced; + + //! Store transactions which were requested by us, with timestamp + std::map m_tx_in_flight; + + //! Periodically check for stuck getdata requests + std::chrono::microseconds m_check_expiry_timer{0}; + }; + + TxDownloadState m_tx_download; + + CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) + { fCurrentlyConnected = false; nMisbehavior = 0; fShouldBan = false; @@ -245,6 +328,10 @@ struct CNodeState { } }; +// Keeps track of the time (in microseconds) when transactions were requested last time +limitedmap g_already_asked_for(MAX_INV_SZ); +limitedmap g_erased_object_requests(MAX_INV_SZ); + /** Map maintaining per-node state. Requires cs_main. */ std::map mapNodeState; @@ -454,9 +541,150 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec } } -} // anon namespace +} // namespace + +void EraseObjectRequest(CNodeState* nodestate, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); + LogPrint(BCLog::NET, "%s -- inv=(%s)\n", __func__, inv.ToString()); + g_already_asked_for.erase(inv.hash); + g_erased_object_requests.insert(std::make_pair(inv.hash, GetTime())); + + if (nodestate) { + nodestate->m_tx_download.m_tx_announced.erase(inv); + nodestate->m_tx_download.m_tx_in_flight.erase(inv); + } +} + +void EraseObjectRequest(NodeId nodeId, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); + auto* state = State(nodeId); + if (!state) { + return; + } + EraseObjectRequest(state, inv); +} + +std::chrono::microseconds GetObjectRequestTime(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); + auto it = g_already_asked_for.find(hash); + if (it != g_already_asked_for.end()) { + return it->second; + } + return {}; +} + +void UpdateObjectRequestTime(const uint256& hash, std::chrono::microseconds request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + auto it = g_already_asked_for.find(hash); + + if (it == g_already_asked_for.end()) { + g_already_asked_for.insert(std::make_pair(hash, request_time)); + } else { + g_already_asked_for.update(it, request_time); + } +} + +std::chrono::microseconds GetObjectInterval(int invType) +{ + // some messages need to be re-requested faster when the first announcing peer did not answer to GETDATA + switch (invType) { + case MSG_QUORUM_RECOVERED_SIG: + return std::chrono::seconds(15); + case MSG_CLSIG: + return std::chrono::seconds(5); + default: + return GETDATA_TX_INTERVAL; + } +} + +std::chrono::microseconds GetObjectExpiryInterval(int invType) +{ + return GetObjectInterval(invType) * TX_EXPIRY_INTERVAL_FACTOR; +} + +std::chrono::microseconds GetObjectRandomDelay(int invType) +{ + if (invType == MSG_TX) { + return GetRandMicros(MAX_GETDATA_RANDOM_DELAY); + } + return {}; +} + +std::chrono::microseconds CalculateObjectGetDataTime(const CInv& inv, std::chrono::microseconds current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); + std::chrono::microseconds process_time; + const auto last_request_time = GetObjectRequestTime(inv.hash); + // First time requesting this tx + if (last_request_time.count() == 0) { + process_time = current_time; + } else { + // Randomize the delay to avoid biasing some peers over others (such as due to + // fixed ordering of peer processing in ThreadMessageHandler) + process_time = last_request_time + GetObjectInterval(inv.type) + GetObjectRandomDelay(inv.type); + } + + // We delay processing announcements from inbound peers + if (use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY; + return process_time; +} + +void RequestObject(CNodeState* state, const CInv& inv, std::chrono::microseconds current_time, bool fForce = true) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; + if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(inv)) { + // Too many queued announcements from this peer, or we already have + // this announcement + return; + } + peer_download_state.m_tx_announced.insert(inv); + // Calculate the time to try requesting this transaction. Use + // fPreferredDownload as a proxy for outbound peers. + std::chrono::microseconds process_time = CalculateObjectGetDataTime(inv, current_time, !state->fPreferredDownload); + + peer_download_state.m_tx_process_time.emplace(process_time, inv); + if (fForce) { + // make sure this object is actually requested ASAP + g_erased_object_requests.erase(inv.hash); + g_already_asked_for.erase(inv.hash); + } + + LogPrint(BCLog::NET, "%s -- inv=(%s), current_time=%d, process_time=%d, delta=%d\n", __func__, inv.ToString(), current_time.count(), process_time.count(), (process_time - current_time).count()); +} + +void RequestObject(NodeId nodeId, const CInv& inv, std::chrono::microseconds current_time, bool fForce) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); + auto* state = State(nodeId); + if (!state) { + return; + } + + RequestObject(state, inv, current_time, fForce); +} + +size_t GetRequestedObjectCount(NodeId nodeId) +{ + AssertLockHeld(cs_main); + auto* state = State(nodeId); + if (!state) { + return 0; + } + return state->m_tx_download.m_tx_process_time.size(); +} + +// Returns true for outbound peers, excluding manual connections, feelers, and +// one-shots +bool IsOutboundDisconnectionCandidate(const CNode* node) +{ + return !(node->fInbound || node->fFeeler || node->fOneShot); +} -void PeerLogicValidation::InitializeNode(CNode *pnode) { +void PeerLogicValidation::InitializeNode(CNode* pnode) +{ CAddress addr = pnode->addr; std::string addrName = pnode->GetAddrName(); NodeId nodeid = pnode->GetId(); @@ -1587,6 +1815,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR } LOCK(cs_main); + const auto current_time = GetTime(); std::vector vToFetch; @@ -1634,22 +1863,11 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR // wait until we are sync if (!fAlreadyHave) { bool allowWhileInIBD = allowWhileInIBDObjs.count(inv.type); - if (allowWhileInIBD || !IsInitialBlockDownload()) { - int64_t doubleRequestDelay = 2 * 60 * 1000000; - // some messages need to be re-requested faster when the first announcing peer did not answer to GETDATA - switch (inv.type) { - case MSG_QUORUM_RECOVERED_SIG: - doubleRequestDelay = 5 * 1000000; - break; - case MSG_CLSIG: - doubleRequestDelay = 5 * 1000000; - break; - } - pfrom->AskFor(inv, doubleRequestDelay); + if (allowWhileInIBD || (!fImporting && !fReindex && !IsInitialBlockDownload())) { + RequestObject(State(pfrom->GetId()), inv, current_time); } } } - } if (!vToFetch.empty()) @@ -1678,7 +1896,6 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR else if (strCommand == NetMsgType::GETBLOCKS || strCommand == NetMsgType::GETHEADERS) { - // Don't relay blocks inv to masternode-only connections if (!pfrom->CanRelay()) { LogPrint(BCLog::NET, "getblocks, don't relay blocks inv to masternode connection. peer=%d\n", pfrom->GetId()); @@ -1779,8 +1996,10 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR bool fMissingInputs = false; CValidationState state; - pfrom->setAskFor.erase(inv.hash); - mapAlreadyAskedFor.erase(inv); + CNodeState* nodestate = State(pfrom->GetId()); + nodestate->m_tx_download.m_tx_announced.erase(inv); + nodestate->m_tx_download.m_tx_in_flight.erase(inv); + EraseObjectRequest(pfrom->GetId(), inv); if (ptx->ContainsZerocoins()) { // Don't even try to check zerocoins at all. @@ -1869,10 +2088,16 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR } } if (!fRejectedParents) { - for (const uint256& parent_txid : unique_parents) { - CInv _inv(MSG_TX, parent_txid); + const auto current_time = GetTime(); + + for (const CTxIn& txin : tx.vin) { + CInv _inv(MSG_TX, txin.prevout.hash); pfrom->AddInventoryKnown(_inv); - if (!AlreadyHave(_inv)) pfrom->AskFor(_inv); + if (!AlreadyHave(_inv)) RequestObject(State(pfrom->GetId()), _inv, current_time); + // We don't know if the previous tx was a regular or a mixing one, try both + CInv _inv2(MSG_DSTX, txin.prevout.hash); + pfrom->AddInventoryKnown(_inv2); + if (!AlreadyHave(_inv2)) RequestObject(State(pfrom->GetId()), _inv2, current_time); } AddOrphanTx(ptx, pfrom->GetId()); @@ -1882,7 +2107,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR if (nEvicted > 0) LogPrint(BCLog::MEMPOOL, "mapOrphan overflow, removed %u tx\n", nEvicted); } else { - LogPrint(BCLog::MEMPOOL, "not keeping orphan with rejected parents %s\n",tx.GetHash().ToString()); + LogPrint(BCLog::MEMPOOL, "not keeping orphan with rejected parents %s\n", tx.GetHash().ToString()); } } else { // AcceptToMemoryPool() returned false, possibly because the tx is @@ -2031,7 +2256,6 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR else if (strCommand == NetMsgType::MEMPOOL) { - if (!(pfrom->GetLocalServices() & NODE_BLOOM) && !pfrom->fWhitelisted) { LogPrint(BCLog::NET, "mempool request with bloom filters disabled, disconnect peer=%d\n", pfrom->GetId()); pfrom->fDisconnect = true; @@ -2176,8 +2400,25 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR } else if (strCommand == NetMsgType::NOTFOUND) { - // We do not care about the NOTFOUND message (for now), but logging an Unknown Command - // message is undesirable as we transmit it ourselves. + // Remove the NOTFOUND transactions from the peer + LOCK(cs_main); + CNodeState *state = State(pfrom->GetId()); + std::vector vInv; + vRecv >> vInv; + if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + for (CInv &inv : vInv) { + // If we receive a NOTFOUND message for a txid we requested, erase + // it from our data structures for this peer. + auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv); + if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) { + // Skip any further work if this is a spurious NOTFOUND + // message. + continue; + } + state->m_tx_download.m_tx_in_flight.erase(in_flight_it); + state->m_tx_download.m_tx_announced.erase(inv); + } + } return true; } @@ -2599,7 +2840,9 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // Detect whether we're stalling current_time = GetTime(); - nNow = GetTimeMicros(); + // nNow is the current system time (GetTimeMicros is not mockable) and + // should be replaced by the mockable current_time eventually + if (state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) { // Stalling only triggers when the block download window cannot move. During normal steady state, // the download window should be much larger than the to-be-downloaded set of blocks, so disconnection @@ -2644,20 +2887,69 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // // Message: getdata (non-blocks) // - while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow) { - const CInv& inv = (*pto->mapAskFor.begin()).second; + + // For robustness, expire old requests after a long timeout, so that + // we can resume downloading transactions from a peer even if they + // were unresponsive in the past. + // Eventually we should consider disconnecting peers, but this is + // conservative. + if (state.m_tx_download.m_check_expiry_timer <= current_time) { + for (auto it = state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) { + if (it->second <= current_time - GetObjectExpiryInterval(it->first.type)) { + LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId()); + state.m_tx_download.m_tx_announced.erase(it->first); + state.m_tx_download.m_tx_in_flight.erase(it++); + } else { + ++it; + } + } + // On average, we do this check every TX_EXPIRY_INTERVAL. Randomize + // so that we're not doing this for all peers at the same time. + state.m_tx_download.m_check_expiry_timer = current_time + GetObjectExpiryInterval(MSG_TX) / 2 + GetRandMicros(GetObjectExpiryInterval(MSG_TX)); + } + + // DASH this code also handles non-TXs (Dash specific messages) + auto& tx_process_time = state.m_tx_download.m_tx_process_time; + while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { + const CInv inv = tx_process_time.begin()->second; + // Erase this entry from tx_process_time (it may be added back for + // processing at a later time, see below) + tx_process_time.erase(tx_process_time.begin()); + if (g_erased_object_requests.count(inv.hash)) { + LogPrint(BCLog::NET, "%s -- GETDATA skipping inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId()); + state.m_tx_download.m_tx_announced.erase(inv); + state.m_tx_download.m_tx_in_flight.erase(inv); + continue; + } if (!AlreadyHave(inv)) { - LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); - vGetData.push_back(inv); - if (vGetData.size() >= 1000) { - connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); - vGetData.clear(); + // If this transaction was last requested more than 1 minute ago, + // then request. + const auto last_request_time = GetObjectRequestTime(inv.hash); + if (last_request_time <= current_time - GetObjectInterval(inv.type)) { + LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); + vGetData.push_back(inv); + if (vGetData.size() >= MAX_GETDATA_SZ) { + connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + vGetData.clear(); + } + + UpdateObjectRequestTime(inv.hash, current_time); + state.m_tx_download.m_tx_in_flight.emplace(inv, current_time); + } else { + // This transaction is in flight from someone else; queue + // up processing to happen after the download times out + // (with a slight delay for inbound peers, to prefer + // requests to outbound peers). + const auto next_process_time = CalculateObjectGetDataTime(inv, current_time, !state.fPreferredDownload); + tx_process_time.emplace(next_process_time, inv); + LogPrint(BCLog::NET, "%s -- GETDATA re-queue inv=(%s), next_process_time=%d, delta=%d, peer=%d\n", __func__, inv.ToString(), next_process_time.count(), (next_process_time - current_time).count(), pto->GetId()); } } else { - //If we're not going to ask, don't expect a response. - pto->setAskFor.erase(inv.hash); + // We have already seen this transaction, no need to download. + state.m_tx_download.m_tx_announced.erase(inv); + state.m_tx_download.m_tx_in_flight.erase(inv); + LogPrint(BCLog::NET, "%s -- GETDATA already seen inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId()); } - pto->mapAskFor.erase(pto->mapAskFor.begin()); } if (!vGetData.empty()) connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); diff --git a/src/net_processing.h b/src/net_processing.h index 35f2e83b566f..61d82f5e547e 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -15,9 +15,9 @@ extern RecursiveMutex cs_main; // !TODO: change mutex to cs_orphans /** Default for -maxorphantx, maximum number of orphan transactions kept in memory */ static const unsigned int DEFAULT_MAX_ORPHAN_TRANSACTIONS = 25; /** Expiration time for orphan transactions in seconds */ -static const int64_t ORPHAN_TX_EXPIRE_TIME = 20 * 60; +static const int64_t ORPHAN_TX_EXPIRE_TIME = 5 * 60; /** Minimum time between orphan transactions expire time checks in seconds */ -static const int64_t ORPHAN_TX_EXPIRE_INTERVAL = 5 * 60; +static const int64_t ORPHAN_TX_EXPIRE_INTERVAL = 1 * 60; /** Default for -blockspamfilter, use header spam filter */ static const bool DEFAULT_BLOCK_SPAM_FILTER = true; /** Default for -blockspamfiltermaxsize, maximum size of the list of indexes in the block spam filter */ @@ -81,4 +81,7 @@ using SecondsDouble = std::chrono::duration +#include // For std::chrono::microseconds #include +#include /** * Overall design of the RNG and entropy sources. @@ -251,4 +252,6 @@ bool Random_SanityCheck(); */ void RandomInit(); +std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept; + #endif // PIVX_RANDOM_H diff --git a/src/test/util_tests.cpp b/src/test/util_tests.cpp index 41e4ec451e39..005fe1e8474a 100644 --- a/src/test/util_tests.cpp +++ b/src/test/util_tests.cpp @@ -121,13 +121,7 @@ BOOST_AUTO_TEST_CASE(util_FormatISO8601Date) BOOST_CHECK_EQUAL(FormatISO8601Date(1317425777), "2011-09-30"); } -BOOST_AUTO_TEST_CASE(util_FormatISO8601Time) -{ - BOOST_CHECK_EQUAL(FormatISO8601Time(1317425777), "23:36:17Z"); -} - -struct TestArgsManager : public ArgsManager -{ +struct TestArgsManager : public ArgsManager { TestArgsManager() { m_network_only_args.clear(); } std::map >& GetOverrideArgs() { return m_override_args; } std::map >& GetConfigArgs() { return m_config_args; } diff --git a/src/utiltime.cpp b/src/utiltime.cpp index f518a61a08c6..c0f4b6d26f0d 100644 --- a/src/utiltime.cpp +++ b/src/utiltime.cpp @@ -137,16 +137,3 @@ std::string FormatISO8601Date(int64_t nTime) { } return strprintf("%04i-%02i-%02i", ts.tm_year + 1900, ts.tm_mon + 1, ts.tm_mday); } - -std::string FormatISO8601Time(int64_t nTime) { - struct tm ts; - time_t time_val = nTime; -#ifdef HAVE_GMTIME_R - if (gmtime_r(&time_val, &ts) == nullptr) { -#else - if (gmtime_s(&ts, &time_val) != 0) { -#endif - return {}; - } - return strprintf("%02i:%02i:%02iZ", ts.tm_hour, ts.tm_min, ts.tm_sec); -} diff --git a/test/functional/p2p_invalid_messages.py b/test/functional/p2p_invalid_messages.py index 8c52b10af2a8..8ceffb20115f 100755 --- a/test/functional/p2p_invalid_messages.py +++ b/test/functional/p2p_invalid_messages.py @@ -8,11 +8,11 @@ import time from test_framework import messages +from test_framework.messages import CTxIn, COutPoint, msg_mnping from test_framework.mininode import ( P2PDataStore, P2PInterface, ) -from test_framework.messages import CTxIn, COutPoint, msg_mnping from test_framework.test_framework import PivxTestFramework from test_framework.util import ( assert_equal, @@ -212,24 +212,26 @@ def test_large_inv(self): def test_fill_askfor(self): self.nodes[0].generate(1) # IBD conn = self.nodes[0].add_p2p_connection(InvReceiver()) + self.disable_mocktime() invs = [] blockhash = int(self.nodes[0].getbestblockhash(), 16) - for _ in range(50000): + total_requests = 100 + for _ in range(total_requests): mnp = msg_mnping(CTxIn(COutPoint(getrandbits(256))), blockhash, int(time.time())) - conn.vec_mnp[mnp.get_hash()] = mnp - invs.append(messages.CInv(15, mnp.get_hash())) - assert_equal(len(conn.vec_mnp), 50000) - assert_equal(len(invs), 50000) + hash = mnp.get_hash() + conn.vec_mnp[hash] = mnp + invs.append(messages.CInv(15, hash)) + assert_equal(len(conn.vec_mnp), total_requests) + assert_equal(len(invs), total_requests) msg = messages.msg_inv(invs) conn.send_message(msg) - conn.wait_for_p2p_messages(50000) - + conn.wait_for_p2p_messages(total_requests) # Prior #2611 the node was blocking any follow-up request. mnp = msg_mnping(CTxIn(COutPoint(getrandbits(256))), getrandbits(256), int(time.time())) conn.vec_mnp[mnp.get_hash()] = mnp msg = messages.msg_inv([messages.CInv(15, mnp.get_hash())]) conn.send_and_ping(msg) - conn.wait_for_p2p_messages(50001) + conn.wait_for_p2p_messages(total_requests + 1) self.nodes[0].disconnect_p2ps() def test_resource_exhaustion(self): diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py index 694397c96bd5..1cb62393b515 100755 --- a/test/functional/test_framework/messages.py +++ b/test/functional/test_framework/messages.py @@ -1246,10 +1246,16 @@ def serialize(self): r += ser_string(self.vch_sig) r += struct.pack(" llmq/quorums_connections -> llmq/quorums" "llmq/quorums_dkgsession -> llmq/quorums_dkgsessionmgr -> llmq/quorums_dkgsessionhandler -> llmq/quorums_dkgsession" "llmq/quorums_dkgsessionhandler -> net_processing -> llmq/quorums_dkgsessionmgr -> llmq/quorums_dkgsessionhandler" - "llmq/quorums_signing -> net_processing -> llmq/quorums_signing" "llmq/quorums_chainlocks -> net_processing -> llmq/quorums_chainlocks" "llmq/quorums_chainlocks -> validation -> llmq/quorums_chainlocks" "chain -> legacy/stakemodifier -> validation -> validationinterface -> chain"