Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/ipc/libmultiprocess/include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <capnp/rpc-twoparty.h>

#include <assert.h>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <kj/function.h>
Expand Down Expand Up @@ -440,6 +441,9 @@ class Connection
//! destructors of m_impl instances owned by ProxyServer objects).
~Connection();

//! Whether disconnection/teardown has started for this connection.
std::atomic<bool> m_disconnected{false};

//! Register synchronous cleanup function to run on event loop thread (with
//! access to capnp thread local variables) when disconnect() is called.
//! any new i/o.
Expand Down Expand Up @@ -538,7 +542,15 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
// the remote object, waiting for it to be deleted server side. If the
// capnp interface does not define a destroy method, this will just call
// an empty stub defined in the ProxyClientBase class and do nothing.
Sub::destroy(*this);
if (m_context.connection && !m_context.connection->m_disconnected.load()) {
try {
Sub::destroy(*this);
} catch (...) {
// Ignore exceptions during destruction, such as disconnect races
// Since this runs in a noexcept destructor, an uncaught exception
// would cause std::terminate.
}
}

// FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
m_context.loop->sync([&]() {
Expand Down Expand Up @@ -808,6 +820,7 @@ std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int f
init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
Connection* connection_ptr = connection.get();
connection->onDisconnect([&loop, connection_ptr] {
connection_ptr->m_disconnected = true;
MP_LOG(loop, Log::Warning) << "IPC client: unexpected network disconnect.";
delete connection_ptr;
});
Expand Down
2 changes: 2 additions & 0 deletions src/ipc/libmultiprocess/src/mp/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ ProxyContext::ProxyContext(Connection* connection) : connection(connection), loo

Connection::~Connection()
{
m_disconnected = true;

// Connection destructor is always called on the event loop thread. If this
// is a local disconnect, it will trigger I/O, so this needs to run on the
// event loop thread, and if there was a remote disconnect, this is called
Expand Down
71 changes: 53 additions & 18 deletions src/sv2-tp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ static void AddArgs(ArgsManager& args)
}

static bool g_interrupt{false};
namespace {
constexpr auto IPC_RECONNECT_INITIAL_DELAY{std::chrono::seconds{1}};
constexpr auto IPC_RECONNECT_MAX_DELAY{std::chrono::seconds{32}};
constexpr auto IPC_RECONNECT_POLL_INTERVAL{100ms};
}

#ifndef WIN32
static void registerSignalHandler(int signal, void(*handler)(int))
Expand Down Expand Up @@ -231,34 +236,52 @@ MAIN_FUNCTION
options.template_interval = std::chrono::seconds(args.GetIntArg("-templateinterval", 0));
}

// Connect to bitcoin-node via IPC
//
// If the node is not available, keep retrying in a loop every 10 seconds.
std::unique_ptr<interfaces::Init> mine_init{interfaces::MakeBasicInit("sv2-tp", argc > 0 ? argv[0] : "")};
assert(mine_init);
std::unique_ptr<interfaces::Init> node_init;
std::string address{args.GetArg("-ipcconnect", "unix")};

LogPrintf("Attempting IPC connection to bitcoin-node at %s\n", address);
LogPrintf("Ensure Bitcoin Core is running with '-ipcbind=unix' and the correct network (%s)\n",
ChainTypeToString(chain_type));

while (true) {
try {
node_init = mine_init->ipc()->connectAddress(address);
LogPrintf("Connected to bitcoin-node via IPC at: %s\n", address);
break; // Success: break out of the loop
} catch (const std::exception& exception) {
LogPrintf("IPC connection failed: %s\n", exception.what());
LogPrintf("Retrying in 10 seconds... (Ensure Bitcoin Core is running with '-ipcbind=unix')\n");
std::this_thread::sleep_for(std::chrono::seconds(10));
const auto connect_to_node = [&]() -> std::pair<std::unique_ptr<interfaces::Init>, std::unique_ptr<interfaces::Mining>> {
auto retry_delay = IPC_RECONNECT_INITIAL_DELAY;
while (!g_interrupt) {
try {
std::unique_ptr<interfaces::Init> node_init = mine_init->ipc()->connectAddress(address);
if (!node_init) {
throw std::runtime_error("IPC connection returned no remote init interface");
}
std::unique_ptr<interfaces::Mining> mining = node_init->makeMining();
if (!mining) {
throw std::runtime_error("IPC connection returned no mining interface");
}
LogPrintf("Connected to bitcoin-node via IPC at: %s\n", address);
return {std::move(node_init), std::move(mining)};
} catch (const std::exception& exception) {
LogPrintf("IPC connection failed: %s\n", exception.what());
LogPrintf("Retrying in %d seconds... (Ensure Bitcoin Core is running with '-ipcbind=unix')\n",
retry_delay.count());
auto waited{0ms};
while (!g_interrupt && waited < retry_delay) {
std::this_thread::sleep_for(IPC_RECONNECT_POLL_INTERVAL);
waited += IPC_RECONNECT_POLL_INTERVAL;
}
retry_delay = std::min(retry_delay * 2, IPC_RECONNECT_MAX_DELAY);
}
}
}
return {};
};

auto connection = connect_to_node();
auto node_init = std::move(connection.first);
auto mining = std::move(connection.second);
if (g_interrupt) return EXIT_SUCCESS;
assert(node_init);
std::unique_ptr<interfaces::Mining> mining{node_init->makeMining()};
assert(mining);

auto tp = std::make_unique<Sv2TemplateProvider>(*mining);
auto tp = std::make_unique<Sv2TemplateProvider>();
tp->ReplaceBackend(std::move(node_init), std::move(mining));

if (!tp->Start(options)) {
tfm::format(std::cerr, "Unable to start Stratum v2 Template Provider");
Expand All @@ -273,8 +296,20 @@ MAIN_FUNCTION
registerSignalHandler(SIGINT, HandleSIGTERM);
#endif

while(!g_interrupt) {
UninterruptibleSleep(100ms);
while (!g_interrupt) {
if (!tp->BackendDisconnected()) {
UninterruptibleSleep(100ms);
continue;
}

LogPrintf("Restarting sv2-tp after Bitcoin Core IPC disconnect\n");
connection = connect_to_node();
node_init = std::move(connection.first);
mining = std::move(connection.second);
if (g_interrupt) break;
assert(node_init);
assert(mining);
tp->ReplaceBackend(std::move(node_init), std::move(mining));
}

tp->Interrupt();
Expand Down
Loading