diff --git a/src/ipc/libmultiprocess/doc/versions.md b/src/ipc/libmultiprocess/doc/versions.md index 2c2ec50e..3cfa28e3 100644 --- a/src/ipc/libmultiprocess/doc/versions.md +++ b/src/ipc/libmultiprocess/doc/versions.md @@ -7,9 +7,14 @@ Library versions are tracked with simple Versioning policy is described in the [version.h](../include/mp/version.h) include. -## v10 +## v11 - Current unstable version. +## [v10.0](https://github.com/bitcoin-core/libmultiprocess/commits/v10.0) +- Increases spawn test timeout to avoid spurious failures. +- Uses `throwRecoverableException` instead of raw `throw` to improve runtime error messages in macOS builds. +- Used in Bitcoin Core master branch, pulled in by [#34977](https://github.com/bitcoin/bitcoin/pull/34977). Also pulled into Bitcoin Core 31.x stable branch by [#35028](https://github.com/bitcoin/bitcoin/pull/35028). + ## [v9.0](https://github.com/bitcoin-core/libmultiprocess/commits/v9.0) - Fixes race conditions where worker thread could be used after destruction, where getParams() could be called after request cancel, and where m_on_cancel could be called after request finishes. - Adds `CustomHasField` hook to map Cap'n Proto null values to C++ null values. diff --git a/src/ipc/libmultiprocess/include/mp/proxy-io.h b/src/ipc/libmultiprocess/include/mp/proxy-io.h index d7b9f0e5..09465c04 100644 --- a/src/ipc/libmultiprocess/include/mp/proxy-io.h +++ b/src/ipc/libmultiprocess/include/mp/proxy-io.h @@ -538,7 +538,12 @@ ProxyClientBase::ProxyClientBase(typename Interface::Client cli // the remote object, waiting for it to be deleted server side. If the // capnp interface does not define a destroy method, this will just call // an empty stub defined in the ProxyClientBase class and do nothing. - Sub::destroy(*this); + // Exceptions are caught and logged rather than propagated because + // ~ProxyClientBase is noexcept and the peer may be gone by the time + // this runs. + if (kj::runCatchingExceptions([&]{ Sub::destroy(*this); }) != nullptr) { + MP_LOG(*m_context.loop, Log::Warning) << "Remote destroy call failed during cleanup. Continuing."; + } // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code m_context.loop->sync([&]() { diff --git a/src/ipc/libmultiprocess/include/mp/version.h b/src/ipc/libmultiprocess/include/mp/version.h index 964667a9..423ed460 100644 --- a/src/ipc/libmultiprocess/include/mp/version.h +++ b/src/ipc/libmultiprocess/include/mp/version.h @@ -24,7 +24,7 @@ //! pointing at the prior merge commit. The /doc/versions.md file should also be //! updated, noting any significant or incompatible changes made since the //! previous version. -#define MP_MAJOR_VERSION 10 +#define MP_MAJOR_VERSION 11 //! Minor version number. Should be incremented in stable branches after //! backporting changes. The /doc/versions.md file should also be updated to diff --git a/src/ipc/libmultiprocess/src/mp/proxy.cpp b/src/ipc/libmultiprocess/src/mp/proxy.cpp index d24208db..963050c3 100644 --- a/src/ipc/libmultiprocess/src/mp/proxy.cpp +++ b/src/ipc/libmultiprocess/src/mp/proxy.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -245,7 +246,12 @@ void EventLoop::loop() if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly"); Lock lock(m_mutex); if (m_post_fn) { - Unlock(lock, *m_post_fn); + // m_post_fn throwing is never expected. If it does happen, the caller + // of EventLoop::post() will return without any indication of failure, + // which will likely cause other bugs. Log the error and continue. + KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() MP_REQUIRES(m_mutex) { Unlock(lock, *m_post_fn); })) { + MP_LOG(*this, Log::Error) << "EventLoop: m_post_fn threw: " << kj::str(*exception).cStr(); + } m_post_fn = nullptr; m_cv.notify_all(); } else if (done()) { diff --git a/src/ipc/libmultiprocess/test/mp/test/test.cpp b/src/ipc/libmultiprocess/test/mp/test/test.cpp index d91edb40..b259790b 100644 --- a/src/ipc/libmultiprocess/test/mp/test/test.cpp +++ b/src/ipc/libmultiprocess/test/mp/test/test.cpp @@ -427,6 +427,32 @@ KJ_TEST("Calling async IPC method, with server disconnect after cleanup") } } +KJ_TEST("Destroying ProxyClient<> with destroy method after peer disconnect") +{ + // Regression test for bitcoin-core/libmultiprocess#219 where + // ~ProxyClientBase would call std::terminate if the remote destroy RPC + // failed during teardown. + // + // Save a callback on the server so it holds a ProxyClient + // pointing back to this side, then disconnect. When the server is torn + // down, the ProxyClient destructor issues a destroy RPC over + // the now dead connection; without the bugfix the exception escapes the + // noexcept destructor and aborts the process. + + TestSetup setup{/*client_owns_connection=*/false}; + ProxyClient* foo = setup.client.get(); + foo->initThreadMap(); + + class Callback : public FooCallback + { + public: + int call(int arg) override { return arg; } + }; + + foo->saveCallback(std::make_shared()); + setup.client_disconnect(); +} + KJ_TEST("Make simultaneous IPC calls on single remote thread") { TestSetup setup; diff --git a/src/sv2-tp.cpp b/src/sv2-tp.cpp index 419998f1..96f01b59 100644 --- a/src/sv2-tp.cpp +++ b/src/sv2-tp.cpp @@ -125,6 +125,11 @@ static void AddArgs(ArgsManager& args) } static bool g_interrupt{false}; +namespace { +constexpr auto IPC_RECONNECT_INITIAL_DELAY{std::chrono::seconds{1}}; +constexpr auto IPC_RECONNECT_MAX_DELAY{std::chrono::seconds{32}}; +constexpr auto IPC_RECONNECT_POLL_INTERVAL{100ms}; +} #ifndef WIN32 static void registerSignalHandler(int signal, void(*handler)(int)) @@ -231,34 +236,52 @@ MAIN_FUNCTION options.template_interval = std::chrono::seconds(args.GetIntArg("-templateinterval", 0)); } - // Connect to bitcoin-node via IPC - // - // If the node is not available, keep retrying in a loop every 10 seconds. std::unique_ptr mine_init{interfaces::MakeBasicInit("sv2-tp", argc > 0 ? argv[0] : "")}; assert(mine_init); - std::unique_ptr node_init; std::string address{args.GetArg("-ipcconnect", "unix")}; LogPrintf("Attempting IPC connection to bitcoin-node at %s\n", address); LogPrintf("Ensure Bitcoin Core is running with '-ipcbind=unix' and the correct network (%s)\n", ChainTypeToString(chain_type)); - while (true) { - try { - node_init = mine_init->ipc()->connectAddress(address); - LogPrintf("Connected to bitcoin-node via IPC at: %s\n", address); - break; // Success: break out of the loop - } catch (const std::exception& exception) { - LogPrintf("IPC connection failed: %s\n", exception.what()); - LogPrintf("Retrying in 10 seconds... (Ensure Bitcoin Core is running with '-ipcbind=unix')\n"); - std::this_thread::sleep_for(std::chrono::seconds(10)); + const auto connect_to_node = [&]() -> std::pair, std::unique_ptr> { + auto retry_delay = IPC_RECONNECT_INITIAL_DELAY; + while (!g_interrupt) { + try { + std::unique_ptr node_init = mine_init->ipc()->connectAddress(address); + if (!node_init) { + throw std::runtime_error("IPC connection returned no remote init interface"); + } + std::unique_ptr mining = node_init->makeMining(); + if (!mining) { + throw std::runtime_error("IPC connection returned no mining interface"); + } + LogPrintf("Connected to bitcoin-node via IPC at: %s\n", address); + return {std::move(node_init), std::move(mining)}; + } catch (const std::exception& exception) { + LogPrintf("IPC connection failed: %s\n", exception.what()); + LogPrintf("Retrying in %d seconds... (Ensure Bitcoin Core is running with '-ipcbind=unix')\n", + retry_delay.count()); + auto waited{0ms}; + while (!g_interrupt && waited < retry_delay) { + std::this_thread::sleep_for(IPC_RECONNECT_POLL_INTERVAL); + waited += IPC_RECONNECT_POLL_INTERVAL; + } + retry_delay = std::min(retry_delay * 2, IPC_RECONNECT_MAX_DELAY); + } } - } + return {}; + }; + + auto connection = connect_to_node(); + auto node_init = std::move(connection.first); + auto mining = std::move(connection.second); + if (g_interrupt) return EXIT_SUCCESS; assert(node_init); - std::unique_ptr mining{node_init->makeMining()}; assert(mining); - auto tp = std::make_unique(*mining); + auto tp = std::make_unique(); + tp->ReplaceBackend(std::move(node_init), std::move(mining)); if (!tp->Start(options)) { tfm::format(std::cerr, "Unable to start Stratum v2 Template Provider"); @@ -273,8 +296,20 @@ MAIN_FUNCTION registerSignalHandler(SIGINT, HandleSIGTERM); #endif - while(!g_interrupt) { - UninterruptibleSleep(100ms); + while (!g_interrupt) { + if (!tp->BackendDisconnected()) { + UninterruptibleSleep(100ms); + continue; + } + + LogPrintf("Restarting sv2-tp after Bitcoin Core IPC disconnect\n"); + connection = connect_to_node(); + node_init = std::move(connection.first); + mining = std::move(connection.second); + if (g_interrupt) break; + assert(node_init); + assert(mining); + tp->ReplaceBackend(std::move(node_init), std::move(mining)); } tp->Interrupt(); diff --git a/src/sv2/template_provider.cpp b/src/sv2/template_provider.cpp index a445bd46..694e6657 100644 --- a/src/sv2/template_provider.cpp +++ b/src/sv2/template_provider.cpp @@ -17,10 +17,103 @@ #include #include +Sv2TemplateProvider::BackendSession::BackendSession(std::unique_ptr init, + std::unique_ptr mining) : + m_init(std::move(init)), + m_mining(std::move(mining)) +{ +} + // Allow a few seconds for clients to submit a block or to request transactions constexpr size_t STALE_TEMPLATE_GRACE_PERIOD{10}; -Sv2TemplateProvider::Sv2TemplateProvider(interfaces::Mining& mining) : m_mining{mining} +void Sv2TemplateProvider::ReplaceBackend(std::unique_ptr node_init, + std::unique_ptr mining) +{ + auto backend = std::make_shared(std::move(node_init), std::move(mining)); + { + LOCK(m_backend_mutex); + m_backend = backend; + } + m_backend_disconnected = false; + { + LOCK(m_tp_mutex); + ClearTemplateCache(); + } + m_backend_cv.notify_all(); +} + +std::shared_ptr Sv2TemplateProvider::WaitForBackend() +{ + WAIT_LOCK(m_backend_mutex, lock); + while (!m_flag_interrupt_sv2 && !m_backend) { + m_backend_cv.wait(lock); + } + if (m_flag_interrupt_sv2) return nullptr; + return m_backend; +} + +void Sv2TemplateProvider::DisconnectBackend(const std::shared_ptr& backend, + const char* operation, + const std::exception& exception) +{ + if (!backend) return; + const bool first_disconnect = backend->MarkDisconnected(); + std::shared_ptr active_backend; + { + LOCK(m_backend_mutex); + if (m_backend == backend) { + active_backend = m_backend; + m_backend.reset(); + } + } + if (!active_backend) return; + + m_backend_disconnected = true; + { + LOCK(m_tp_mutex); + ClearTemplateCache(); + } + if (first_disconnect) { + LogPrintLevel(BCLog::SV2, BCLog::Level::Error, + "Bitcoin Core IPC connection lost during %s: %s\n", + operation, exception.what()); + } else { + LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, + "Ignoring repeated Bitcoin Core IPC failure during %s: %s\n", + operation, exception.what()); + } + + m_backend_cv.notify_all(); +} + +void Sv2TemplateProvider::InterruptBackend() +{ + std::shared_ptr backend; + { + LOCK(m_backend_mutex); + backend = m_backend; + } + if (!backend) return; + + { + LOCK(m_tp_mutex); + for (auto& t : GetBlockTemplates()) { + try { + t.second.second->interruptWait(); + } catch (const ipc::Exception& e) { + DisconnectBackend(backend, "interruptWait", e); + } + } + } + try { + backend->Mining().interrupt(); + } catch (const ipc::Exception& e) { + DisconnectBackend(backend, "interrupt", e); + } +} + +Sv2TemplateProvider::Sv2TemplateProvider() { // TODO: persist static key CKey static_key; @@ -121,6 +214,18 @@ Sv2TemplateProvider::~Sv2TemplateProvider() m_connman->StopThreads(); Interrupt(); + { + LOCK(m_backend_mutex); + if (m_backend) { + m_backend->MarkDisconnected(); + m_backend.reset(); + } + } + { + LOCK(m_tp_mutex); + ClearTemplateCache(); + } + m_backend_cv.notify_all(); StopThreads(); } @@ -129,15 +234,9 @@ void Sv2TemplateProvider::Interrupt() AssertLockNotHeld(m_tp_mutex); LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Interrupt pending mining waits..."); - { - LOCK(m_tp_mutex); - for (auto& t : GetBlockTemplates()) { - t.second.second->interruptWait(); - } - } - m_flag_interrupt_sv2 = true; - m_mining.interrupt(); + m_backend_cv.notify_all(); + InterruptBackend(); // Also interrupt network threads so client handlers can wind down quickly. if (m_connman) m_connman->Interrupt(); } @@ -174,57 +273,54 @@ class Timer { } }; +void Sv2TemplateProvider::ClearTemplateCache() +{ + AssertLockHeld(m_tp_mutex); + m_block_template_cache.clear(); + m_best_prev_hash = uint256::ZERO; + m_last_block_time = GetTime(); +} + void Sv2TemplateProvider::ThreadSv2Handler() { - // Make sure it's initialized, doesn't need to be accurate. { LOCK(m_tp_mutex); m_last_block_time = GetTime(); } - // Wait to come out of IBD, except on signet, where we might be the only miner. - size_t log_ibd{0}; - while (!m_flag_interrupt_sv2 && gArgs.GetChainType() != ChainType::SIGNET) { - // TODO: Wait until there's no headers-only branch with more work than our chaintip. - // The current check can still cause us to broadcast a few dozen useless templates - // at startup. - if (!m_mining.isInitialBlockDownload()) break; - if (log_ibd == 0) { - LogPrintf("Waiting for IBD to complete on %s network before serving templates (this may take a while)\n", - ChainTypeToString(gArgs.GetChainType())); - } else if (log_ibd % 10 == 0) { - LogPrintf(".\n"); - } - log_ibd++; - std::this_thread::sleep_for(1000ms); - } - std::map client_threads; while (!m_flag_interrupt_sv2) { - // We start with one template per client, which has an interface through - // which we monitor for better templates. + std::shared_ptr backend; + { + LOCK(m_backend_mutex); + backend = m_backend; + } + + if (backend && gArgs.GetChainType() != ChainType::SIGNET) { + try { + if (backend->Mining().isInitialBlockDownload()) { + std::this_thread::sleep_for(1000ms); + continue; + } + } catch (const ipc::Exception& e) { + DisconnectBackend(backend, "template provider main loop", e); + continue; + } + } m_connman->ForEachClient([this, &client_threads](Sv2Client& client) { - /** - * The initial handshake is handled on the Sv2Connman thread. This - * consists of the noise protocol handshake and the initial Stratum - * v2 messages SetupConnection and CoinbaseOutputConstraints. - * - * A further refactor should make that part non-blocking. But for - * now we spin up a thread here. - */ if (!client.m_coinbase_output_constraints_recv) return; if (client_threads.contains(client.m_id)) return; - client_threads.emplace(client.m_id, + const size_t client_id = client.m_id; + client_threads.emplace(client_id, std::thread(&util::TraceThread, - strprintf("sv2-%zu", client.m_id), - [this, &client] { ThreadSv2ClientHandler(client.m_id); })); + strprintf("sv2-%zu", client_id), + [this, client_id] { ThreadSv2ClientHandler(client_id); })); }); - // Take a break (handling new connections is not urgent) std::this_thread::sleep_for(100ms); LOCK(m_tp_mutex); @@ -233,13 +329,9 @@ void Sv2TemplateProvider::ThreadSv2Handler() for (auto& thread : client_threads) { if (thread.second.joinable()) { - // If the node is shutting down, then all pending waitNext() calls - // should return in under a second. thread.second.join(); } } - - } void Sv2TemplateProvider::ThreadSv2ClientHandler(size_t client_id) @@ -271,9 +363,28 @@ void Sv2TemplateProvider::ThreadSv2ClientHandler(size_t client_id) }; std::shared_ptr block_template; + std::shared_ptr backend; + std::shared_ptr template_backend; // Cache most recent block_template->getBlockHeader().hashPrevBlock result. uint256 prev_hash; while (!m_flag_interrupt_sv2) { + if (!backend) { + backend = WaitForBackend(); + if (!backend) break; + } + + if (backend->Disconnected()) { + backend.reset(); + block_template.reset(); + template_backend.reset(); + continue; + } + + if (template_backend && template_backend != backend) { + block_template.reset(); + template_backend.reset(); + } + if (!block_template) { LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Generate initial block template for client id=%zu\n", client_id); @@ -286,17 +397,30 @@ void Sv2TemplateProvider::ThreadSv2ClientHandler(size_t client_id) if (!prepare_block_create_options(block_create_options)) break; const auto time_start{SteadyClock::now()}; - block_template = m_mining.createNewBlock(block_create_options); + try { + block_template = backend->Mining().createNewBlock(block_create_options); + } catch (const ipc::Exception& e) { + DisconnectBackend(backend, "createNewBlock", e); + backend.reset(); + continue; + } if (!block_template) { LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "No new template for client id=%zu, node is shutting down\n", client_id); break; } + template_backend = backend; LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Assemble template: %.2fms\n", Ticks(SteadyClock::now() - time_start)); - prev_hash = block_template->getBlockHeader().hashPrevBlock; + try { + prev_hash = block_template->getBlockHeader().hashPrevBlock; + } catch (const ipc::Exception& e) { + DisconnectBackend(backend, "getBlockHeader", e); + backend.reset(); + continue; + } { LOCK(m_tp_mutex); if (prev_hash != m_best_prev_hash) { @@ -315,11 +439,17 @@ void Sv2TemplateProvider::ThreadSv2ClientHandler(size_t client_id) std::shared_ptr client = m_connman->GetClientById(client_id); if (!client) break; - if (!SendWork(*client, template_id, *block_template, /*future_template=*/true)) { - LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Disconnecting client id=%zu\n", - client_id); - LOCK(client->cs_status); - client->m_disconnect_flag = true; + try { + if (!SendWork(*client, template_id, *block_template, /*future_template=*/true)) { + LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Disconnecting client id=%zu\n", + client_id); + LOCK(client->cs_status); + client->m_disconnect_flag = true; + } + } catch (const ipc::Exception& e) { + DisconnectBackend(backend, "SendWork", e); + backend.reset(); + continue; } } @@ -353,7 +483,16 @@ void Sv2TemplateProvider::ThreadSv2ClientHandler(size_t client_id) client_id); } - std::shared_ptr tmpl = block_template->waitNext(options); + std::shared_ptr tmpl; + try { + tmpl = block_template->waitNext(options); + } catch (const ipc::Exception& e) { + DisconnectBackend(template_backend, "template provider client loop", e); + backend.reset(); + block_template.reset(); + template_backend.reset(); + continue; + } // The client may have disconnected during the wait, check now to avoid // a spurious IPC call and confusing log statements. { @@ -364,7 +503,17 @@ void Sv2TemplateProvider::ThreadSv2ClientHandler(size_t client_id) // After timeout and during node shutdown this is expect to not be set if (tmpl) { block_template = tmpl; - uint256 new_prev_hash{block_template->getBlockHeader().hashPrevBlock}; + template_backend = backend; + uint256 new_prev_hash; + try { + new_prev_hash = block_template->getBlockHeader().hashPrevBlock; + } catch (const ipc::Exception& e) { + DisconnectBackend(template_backend, "getBlockHeader", e); + backend.reset(); + block_template.reset(); + template_backend.reset(); + continue; + } { LOCK(m_tp_mutex); @@ -389,11 +538,19 @@ void Sv2TemplateProvider::ThreadSv2ClientHandler(size_t client_id) std::shared_ptr client = m_connman->GetClientById(client_id); if (!client) break; - if (!SendWork(*client, WITH_LOCK(m_tp_mutex, return m_template_id;), *block_template, future_template)) { - LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Disconnecting client id=%zu\n", - client_id); - LOCK(client->cs_status); - client->m_disconnect_flag = true; + try { + if (!SendWork(*client, WITH_LOCK(m_tp_mutex, return m_template_id;), *block_template, future_template)) { + LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Disconnecting client id=%zu\n", + client_id); + LOCK(client->cs_status); + client->m_disconnect_flag = true; + } + } catch (const ipc::Exception& e) { + DisconnectBackend(template_backend, "SendWork", e); + backend.reset(); + block_template.reset(); + template_backend.reset(); + continue; } } @@ -428,7 +585,17 @@ void Sv2TemplateProvider::RequestTransactionData(Sv2Client& client, node::Sv2Req return; } - block = (*cached_block->second.second).getBlock(); + try { + block = (*cached_block->second.second).getBlock(); + } catch (const ipc::Exception& e) { + std::shared_ptr backend; + { + LOCK(m_backend_mutex); + backend = m_backend; + } + DisconnectBackend(backend, "getBlock", e); + return; + } auto recent = GetTime() - std::chrono::seconds(STALE_TEMPLATE_GRACE_PERIOD); if (block.hashPrevBlock != m_best_prev_hash && m_last_block_time < recent) { @@ -504,11 +671,22 @@ void Sv2TemplateProvider::SubmitSolution(node::Sv2SubmitSolutionMsg solution) } // Submit the solution to construct and process the block - const bool submitted = block_template->submitSolution( - solution.m_version, - solution.m_header_timestamp, - solution.m_header_nonce, - MakeTransactionRef(solution.m_coinbase_tx)); + bool submitted{false}; + try { + submitted = block_template->submitSolution( + solution.m_version, + solution.m_header_timestamp, + solution.m_header_nonce, + MakeTransactionRef(solution.m_coinbase_tx)); + } catch (const ipc::Exception& e) { + std::shared_ptr backend; + { + LOCK(m_backend_mutex); + backend = m_backend; + } + DisconnectBackend(backend, "submitSolution", e); + return; + } SaveBlockAsync(block_template, submitted); } @@ -541,6 +719,9 @@ void Sv2TemplateProvider::SaveBlockAsync(std::shared_ptr block_te "Wrote block %s to %s (submitted=%d)\n", block_hash.ToString(), fs::PathToString(out_path), submitted); } + } catch (const ipc::Exception& e) { + LogPrintLevel(BCLog::SV2, BCLog::Level::Error, + "sv2-saveblk thread caught IPC exception: %s\n", e.what()); } catch (const std::exception& e) { LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "sv2-saveblk thread caught exception: %s\n", e.what()); @@ -567,8 +748,14 @@ void Sv2TemplateProvider::PruneBlockTemplateCache() bool Sv2TemplateProvider::SendWork(Sv2Client& client, uint64_t template_id, BlockTemplate& block_template, bool future_template) { - CBlockHeader header{block_template.getBlockHeader()}; - node::CoinbaseTx coinbase{block_template.getCoinbaseTx()}; + CBlockHeader header; + node::CoinbaseTx coinbase; + try { + header = block_template.getBlockHeader(); + coinbase = block_template.getCoinbaseTx(); + } catch (const ipc::Exception& e) { + throw; + } node::Sv2NewTemplateMsg new_template{header, coinbase, diff --git a/src/sv2/template_provider.h b/src/sv2/template_provider.h index 5b53bcb6..ed16e22f 100644 --- a/src/sv2/template_provider.h +++ b/src/sv2/template_provider.h @@ -2,6 +2,7 @@ #define BITCOIN_SV2_TEMPLATE_PROVIDER_H #include +#include #include #include #include @@ -11,6 +12,8 @@ #include #include #include +#include +#include using interfaces::BlockTemplate; @@ -53,10 +56,31 @@ class Sv2TemplateProvider : public Sv2EventsInterface private: /** - * The Mining interface is used to build new valid blocks, get the best known - * block hash and to check whether the node is still in IBD. + * The active Bitcoin Core IPC backend generation. */ - interfaces::Mining& m_mining; + struct BackendSession { + explicit BackendSession(std::unique_ptr init, std::unique_ptr mining); + + bool MarkDisconnected() + { + return !m_disconnected.exchange(true); + } + + bool Disconnected() const + { + return m_disconnected.load(); + } + + interfaces::Mining& Mining() + { + return *m_mining; + } + + private: + std::atomic m_disconnected{false}; + std::unique_ptr m_init; + std::unique_ptr m_mining; + }; /* * The template provider subprotocol used in setup connection messages. The stratum v2 @@ -86,7 +110,11 @@ class Sv2TemplateProvider : public Sv2EventsInterface * Signal for handling interrupts and stopping the template provider event loop. */ std::atomic m_flag_interrupt_sv2{false}; + std::atomic m_backend_disconnected{false}; CThreadInterrupt m_interrupt_sv2; + Mutex m_backend_mutex; + std::condition_variable_any m_backend_cv; + std::shared_ptr m_backend GUARDED_BY(m_backend_mutex); /** * The most recent template id. This is incremented on creating new template, @@ -112,9 +140,9 @@ class Sv2TemplateProvider : public Sv2EventsInterface BlockTemplateCache m_block_template_cache GUARDED_BY(m_tp_mutex); public: - explicit Sv2TemplateProvider(interfaces::Mining& mining); + Sv2TemplateProvider(); - ~Sv2TemplateProvider() EXCLUSIVE_LOCKS_REQUIRED(!m_tp_mutex); + ~Sv2TemplateProvider() EXCLUSIVE_LOCKS_REQUIRED(!m_tp_mutex, !m_backend_mutex); Mutex m_tp_mutex; @@ -124,11 +152,15 @@ class Sv2TemplateProvider : public Sv2EventsInterface */ [[nodiscard]] bool Start(const Sv2TemplateProviderOptions& options = {}); + bool BackendDisconnected() const { return m_backend_disconnected.load(); } + void ReplaceBackend(std::unique_ptr node_init, + std::unique_ptr mining) EXCLUSIVE_LOCKS_REQUIRED(!m_backend_mutex, !m_tp_mutex); + /** * The main thread for the template provider, contains an event loop handling * all tasks for the template provider. */ - void ThreadSv2Handler() EXCLUSIVE_LOCKS_REQUIRED(!m_tp_mutex); + void ThreadSv2Handler() EXCLUSIVE_LOCKS_REQUIRED(!m_tp_mutex, !m_backend_mutex); /** * Give each client its own thread so they're treated equally @@ -140,13 +172,13 @@ class Sv2TemplateProvider : public Sv2EventsInterface * connection. For the use case of a public facing template provider, * further changes are needed anyway e.g. for DoS resistance. */ - void ThreadSv2ClientHandler(size_t client_id) EXCLUSIVE_LOCKS_REQUIRED(!m_tp_mutex); + void ThreadSv2ClientHandler(size_t client_id) EXCLUSIVE_LOCKS_REQUIRED(!m_tp_mutex, !m_backend_mutex); /** * Triggered on interrupt signals to stop the main event loop in ThreadSv2Handler(). * Interrupts pending waitNext() calls */ - void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_tp_mutex); + void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_tp_mutex, !m_backend_mutex); /** * Tear down of the template provider thread and any other necessary tear down. @@ -189,6 +221,12 @@ class Sv2TemplateProvider : public Sv2EventsInterface */ [[nodiscard]] bool SendWork(Sv2Client& client, uint64_t template_id, BlockTemplate& block_template, bool future_template); + void ClearTemplateCache() EXCLUSIVE_LOCKS_REQUIRED(m_tp_mutex); + std::shared_ptr WaitForBackend() EXCLUSIVE_LOCKS_REQUIRED(!m_backend_mutex); + void DisconnectBackend(const std::shared_ptr& backend, const char* operation, const std::exception& exception) + EXCLUSIVE_LOCKS_REQUIRED(!m_backend_mutex, !m_tp_mutex); + void InterruptBackend() EXCLUSIVE_LOCKS_REQUIRED(!m_backend_mutex, !m_tp_mutex); + }; #endif // BITCOIN_SV2_TEMPLATE_PROVIDER_H diff --git a/src/test/sv2_tp_tester.cpp b/src/test/sv2_tp_tester.cpp index 777bdf74..92da49d9 100644 --- a/src/test/sv2_tp_tester.cpp +++ b/src/test/sv2_tp_tester.cpp @@ -75,7 +75,8 @@ TPTester::TPTester(Sv2TemplateProviderOptions opts) BOOST_REQUIRE(m_mining_proxy != nullptr); // Construct Template Provider with the IPC-backed Mining proxy - m_tp = std::make_unique(*m_mining_proxy); + m_tp = std::make_unique(); + m_tp->ReplaceBackend(std::move(m_client_init), std::move(m_mining_proxy)); CreateSock = [this](int, int, int) -> std::unique_ptr { // This will be the bind/listen socket from m_tp. It will @@ -93,14 +94,10 @@ TPTester::~TPTester() mp::EventLoopRef loop_ref{*m_loop}; // Destroy objects that may post work to the loop while the loop is guaranteed alive. m_tp.reset(); - m_mining_proxy.reset(); - m_client_init.reset(); // Server init can go after clients; it only owns exported capabilities. m_server_init.reset(); } else { m_tp.reset(); - m_mining_proxy.reset(); - m_client_init.reset(); m_server_init.reset(); } // Join loop thread (loop exits automatically when refs & connections reach zero).