Skip to content

Commit 21f12a5

Browse files
committed
Switch to a separate executor for RPC calls to avoid tokio hangs
See the comment in the commit for more info on why we have to do this.
1 parent 04aaa24 commit 21f12a5

File tree

1 file changed

+83
-35
lines changed

1 file changed

+83
-35
lines changed

src/bitcoind_client.rs

+83-35
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ use lightning_block_sync::rpc::RpcClient;
2525
use lightning_block_sync::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource};
2626
use serde_json;
2727
use std::collections::HashMap;
28+
use std::future::Future;
2829
use std::str::FromStr;
2930
use std::sync::atomic::{AtomicU32, Ordering};
3031
use std::sync::Arc;
3132
use std::time::Duration;
3233

34+
use tokio::runtime::{self, Runtime};
35+
3336
pub struct BitcoindClient {
3437
pub(crate) bitcoind_rpc_client: Arc<RpcClient>,
3538
network: Network,
@@ -38,7 +41,8 @@ pub struct BitcoindClient {
3841
rpc_user: String,
3942
rpc_password: String,
4043
fees: Arc<HashMap<ConfirmationTarget, AtomicU32>>,
41-
handle: tokio::runtime::Handle,
44+
main_runtime_handle: runtime::Handle,
45+
inner_runtime: Arc<Runtime>,
4246
logger: Arc<FilesystemLogger>,
4347
}
4448

@@ -66,7 +70,7 @@ const MIN_FEERATE: u32 = 253;
6670
impl BitcoindClient {
6771
pub(crate) async fn new(
6872
host: String, port: u16, rpc_user: String, rpc_password: String, network: Network,
69-
handle: tokio::runtime::Handle, logger: Arc<FilesystemLogger>,
73+
handle: runtime::Handle, logger: Arc<FilesystemLogger>,
7074
) -> std::io::Result<Self> {
7175
let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port);
7276
let rpc_credentials =
@@ -95,6 +99,15 @@ impl BitcoindClient {
9599
fees.insert(ConfirmationTarget::ChannelCloseMinimum, AtomicU32::new(MIN_FEERATE));
96100
fees.insert(ConfirmationTarget::OutputSpendingFee, AtomicU32::new(MIN_FEERATE));
97101

102+
let mut builder = runtime::Builder::new_multi_thread();
103+
let runtime =
104+
builder.enable_all().worker_threads(1).thread_name("rpc-worker").build().unwrap();
105+
let inner_runtime = Arc::new(runtime);
106+
// Tokio will panic if we drop a runtime while in another runtime. Because the entire
107+
// application runs inside a tokio runtime, we have to ensure this runtime is never
108+
// `drop`'d, which we do by leaking an Arc reference.
109+
std::mem::forget(Arc::clone(&inner_runtime));
110+
98111
let client = Self {
99112
bitcoind_rpc_client: Arc::new(bitcoind_rpc_client),
100113
host,
@@ -103,7 +116,8 @@ impl BitcoindClient {
103116
rpc_password,
104117
network,
105118
fees: Arc::new(fees),
106-
handle: handle.clone(),
119+
main_runtime_handle: handle.clone(),
120+
inner_runtime,
107121
logger,
108122
};
109123
BitcoindClient::poll_for_fee_estimates(
@@ -226,10 +240,42 @@ impl BitcoindClient {
226240
});
227241
}
228242

243+
fn run_future_in_blocking_context<F: Future + Send + 'static>(&self, future: F) -> F::Output
244+
where
245+
F::Output: Send + 'static,
246+
{
247+
// Tokio deliberately makes it nigh impossible to block on a future in a sync context that
248+
// is running in an async task (which makes it really hard to interact with sync code that
249+
// has callbacks in an async project).
250+
//
251+
// Reading the docs, it *seems* like
252+
// `tokio::task::block_in_place(tokio::runtime::Handle::spawn(future))` should do the
253+
// trick, and 99.999% of the time it does! But tokio has a "non-stealable I/O driver" - if
254+
// the task we're running happens to, by sheer luck, be holding the "I/O driver" when we go
255+
// into a `block_in_place` call, and the inner future requires I/O (which of course it
256+
// does, its a future!), the whole thing will come to a grinding halt as no other thread is
257+
// allowed to poll I/O until the blocked one finishes.
258+
//
259+
// This is, of course, nuts, and an almost trivial performance penalty of occasional
260+
// additional wakeups would solve this, but tokio refuses to do so because any performance
261+
// penalty at all would be too much (tokio issue #4730).
262+
//
263+
// Instead, we have to do a rather insane dance - we have to spawn the `future` we want to
264+
// run on a *different* (threaded) tokio runtime (doing the `block_in_place` dance to avoid
265+
// blocking too many threads on the main runtime). We want to block on that `future` being
266+
// run on the other runtime's threads, but tokio only provides `block_on` to do so, which
267+
// runs the `future` itself on the current thread, panicing if this thread is already a
268+
// part of a tokio runtime (which in this case it is - the main tokio runtime). Thus, we
269+
// have to `spawn` the `future` on the secondary runtime and then `block_on` the resulting
270+
// `JoinHandle` on the main runtime.
271+
tokio::task::block_in_place(move || {
272+
self.main_runtime_handle.block_on(self.inner_runtime.spawn(future)).unwrap()
273+
})
274+
}
275+
229276
pub fn get_new_rpc_client(&self) -> RpcClient {
230277
let http_endpoint = HttpEndpoint::for_host(self.host.clone()).with_port(self.port);
231-
let rpc_credentials =
232-
base64::encode(format!("{}:{}", self.rpc_user.clone(), self.rpc_password.clone()));
278+
let rpc_credentials = base64::encode(format!("{}:{}", self.rpc_user, self.rpc_password));
233279
RpcClient::new(&rpc_credentials, http_endpoint)
234280
}
235281

@@ -273,22 +319,28 @@ impl BitcoindClient {
273319
.unwrap();
274320
}
275321

276-
pub async fn sign_raw_transaction_with_wallet(&self, tx_hex: String) -> SignedTx {
322+
pub fn sign_raw_transaction_with_wallet(
323+
&self, tx_hex: String,
324+
) -> impl Future<Output = SignedTx> {
277325
let tx_hex_json = serde_json::json!(tx_hex);
278-
self.bitcoind_rpc_client
279-
.call_method("signrawtransactionwithwallet", &vec![tx_hex_json])
280-
.await
281-
.unwrap()
326+
let rpc_client = self.get_new_rpc_client();
327+
async move {
328+
rpc_client
329+
.call_method("signrawtransactionwithwallet", &vec![tx_hex_json])
330+
.await
331+
.unwrap()
332+
}
282333
}
283334

284-
pub async fn get_new_address(&self) -> Address {
335+
pub fn get_new_address(&self) -> impl Future<Output = Address> {
285336
let addr_args = vec![serde_json::json!("LDK output address")];
286-
let addr = self
287-
.bitcoind_rpc_client
288-
.call_method::<NewAddress>("getnewaddress", &addr_args)
289-
.await
290-
.unwrap();
291-
Address::from_str(addr.0.as_str()).unwrap().require_network(self.network).unwrap()
337+
let network = self.network;
338+
let rpc_client = self.get_new_rpc_client();
339+
async move {
340+
let addr =
341+
rpc_client.call_method::<NewAddress>("getnewaddress", &addr_args).await.unwrap();
342+
Address::from_str(addr.0.as_str()).unwrap().require_network(network).unwrap()
343+
}
292344
}
293345

294346
pub async fn get_blockchain_info(&self) -> BlockchainInfo {
@@ -298,11 +350,11 @@ impl BitcoindClient {
298350
.unwrap()
299351
}
300352

301-
pub async fn list_unspent(&self) -> ListUnspentResponse {
302-
self.bitcoind_rpc_client
303-
.call_method::<ListUnspentResponse>("listunspent", &vec![])
304-
.await
305-
.unwrap()
353+
pub fn list_unspent(&self) -> impl Future<Output = ListUnspentResponse> {
354+
let rpc_client = self.get_new_rpc_client();
355+
async move {
356+
rpc_client.call_method::<ListUnspentResponse>("listunspent", &vec![]).await.unwrap()
357+
}
306358
}
307359
}
308360

@@ -324,7 +376,7 @@ impl BroadcasterInterface for BitcoindClient {
324376
let txn = txs.iter().map(|tx| encode::serialize_hex(tx)).collect::<Vec<_>>();
325377
let bitcoind_rpc_client = Arc::clone(&self.bitcoind_rpc_client);
326378
let logger = Arc::clone(&self.logger);
327-
self.handle.spawn(async move {
379+
self.main_runtime_handle.spawn(async move {
328380
let res = if txn.len() == 1 {
329381
let tx_json = serde_json::json!(txn[0]);
330382
bitcoind_rpc_client
@@ -355,17 +407,15 @@ impl BroadcasterInterface for BitcoindClient {
355407

356408
impl ChangeDestinationSource for BitcoindClient {
357409
fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
358-
tokio::task::block_in_place(move || {
359-
Ok(self.handle.block_on(async move { self.get_new_address().await.script_pubkey() }))
360-
})
410+
let future = self.get_new_address();
411+
Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() }))
361412
}
362413
}
363414

364415
impl WalletSource for BitcoindClient {
365416
fn list_confirmed_utxos(&self) -> Result<Vec<Utxo>, ()> {
366-
let utxos = tokio::task::block_in_place(move || {
367-
self.handle.block_on(async move { self.list_unspent().await }).0
368-
});
417+
let future = self.list_unspent();
418+
let utxos = self.run_future_in_blocking_context(async move { future.await.0 });
369419
Ok(utxos
370420
.into_iter()
371421
.filter_map(|utxo| {
@@ -398,18 +448,16 @@ impl WalletSource for BitcoindClient {
398448
}
399449

400450
fn get_change_script(&self) -> Result<ScriptBuf, ()> {
401-
tokio::task::block_in_place(move || {
402-
Ok(self.handle.block_on(async move { self.get_new_address().await.script_pubkey() }))
403-
})
451+
let future = self.get_new_address();
452+
Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() }))
404453
}
405454

406455
fn sign_psbt(&self, tx: Psbt) -> Result<Transaction, ()> {
407456
let mut tx_bytes = Vec::new();
408457
let _ = tx.unsigned_tx.consensus_encode(&mut tx_bytes).map_err(|_| ());
409458
let tx_hex = hex_utils::hex_str(&tx_bytes);
410-
let signed_tx = tokio::task::block_in_place(move || {
411-
self.handle.block_on(async move { self.sign_raw_transaction_with_wallet(tx_hex).await })
412-
});
459+
let future = self.sign_raw_transaction_with_wallet(tx_hex);
460+
let signed_tx = self.run_future_in_blocking_context(async move { future.await });
413461
let signed_tx_bytes = hex_utils::to_vec(&signed_tx.hex).ok_or(())?;
414462
Transaction::consensus_decode(&mut signed_tx_bytes.as_slice()).map_err(|_| ())
415463
}

0 commit comments

Comments
 (0)