From 7e7a786b792a7bff09dc25edeb30c8361d06edce Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Mon, 23 Sep 2024 14:34:42 +0200 Subject: [PATCH 1/6] introduce ZMQ listener thread and arg --- Cargo.lock | 195 ++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 1 + src/bin/electrs.rs | 6 +- src/config.rs | 10 +++ src/new_index/mod.rs | 1 + src/new_index/zmq.rs | 43 ++++++++++ tests/common.rs | 1 + 7 files changed, 254 insertions(+), 3 deletions(-) create mode 100644 src/new_index/zmq.rs diff --git a/Cargo.lock b/Cargo.lock index c399a9585..1f95506c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -416,6 +416,16 @@ dependencies = [ "nom", ] +[[package]] +name = "cfg-expr" +version = "0.15.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d067ad48b8650848b989a59a86c6c36a995d02d2bf778d45c3c5d57bc2718f02" +dependencies = [ + "smallvec", + "target-lexicon", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -513,7 +523,7 @@ checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" dependencies = [ "bitflags 1.3.2", "clap_lex", - "indexmap", + "indexmap 1.9.3", "textwrap 0.16.1", ] @@ -625,6 +635,19 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.12" @@ -653,6 +676,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.19" @@ -695,6 +727,17 @@ dependencies = [ "subtle", ] +[[package]] +name = "dircpy" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a88521b0517f5f9d51d11925d8ab4523497dcf947073fa3231a311b63941131c" +dependencies = [ + "jwalk", + "log", + "walkdir", +] + [[package]] name = "dirs" version = "5.0.1" @@ -775,6 +818,7 @@ dependencies = [ "tokio", "ureq 2.9.6", "url", + "zmq", ] [[package]] @@ -831,6 +875,12 @@ dependencies = [ "tar", ] +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "errno" version = "0.2.8" @@ -1022,6 +1072,18 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1194,7 +1256,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.12.3", +] + +[[package]] +name = "indexmap" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" +dependencies = [ + "equivalent", + "hashbrown 0.14.5", ] [[package]] @@ -1272,6 +1344,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "jwalk" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2735847566356cd2179a2a38264839308f7079fa96e6bd5a42d740460e003c56" +dependencies = [ + "crossbeam", + "rayon", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -2155,6 +2237,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb5b1b31579f3811bf615c144393417496f152e12ac8b7663bf664f4a815306d" +dependencies = [ + "serde", +] + [[package]] name = "sha1" version = "0.6.1" @@ -2382,6 +2473,19 @@ dependencies = [ "winapi 0.2.8", ] +[[package]] +name = "system-deps" +version = "6.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e535eb8dded36d55ec13eddacd30dec501792ff23a0b1682c38601b8cf2349" +dependencies = [ + "cfg-expr", + "heck", + "pkg-config", + "toml", + "version-compare", +] + [[package]] name = "tar" version = "0.4.40" @@ -2393,6 +2497,12 @@ dependencies = [ "xattr", ] +[[package]] +name = "target-lexicon" +version = "0.12.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" + [[package]] name = "tempfile" version = "3.10.1" @@ -2603,6 +2713,40 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "toml" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.22.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b072cee73c449a636ffd6f32bd8de3a9f7119139aff882f44943ce2986dc5cf" +dependencies = [ + "indexmap 2.5.0", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] + [[package]] name = "tower-service" version = "0.3.2" @@ -2735,6 +2879,12 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" +[[package]] +name = "version-compare" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "852e951cb7832cb45cb1169900d19760cfa39b82bc0ea9c0e5a14ae88411c98b" + [[package]] name = "version_check" version = "0.9.4" @@ -3066,6 +3216,15 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +[[package]] +name = "winnow" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a9bda4691f099d435ad181000724da8e5899daa10713c2d432552b9ccd3a6f" +dependencies = [ + "memchr", +] + [[package]] name = "xattr" version = "1.3.1" @@ -3077,6 +3236,16 @@ dependencies = [ "rustix", ] +[[package]] +name = "zeromq-src" +version = "0.2.6+4.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc120b771270365d5ed0dfb4baf1005f2243ae1ae83703265cb3504070f4160b" +dependencies = [ + "cc", + "dircpy", +] + [[package]] name = "zip" version = "0.6.6" @@ -3097,6 +3266,28 @@ dependencies = [ "zstd", ] +[[package]] +name = "zmq" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd3091dd571fb84a9b3e5e5c6a807d186c411c812c8618786c3c30e5349234e7" +dependencies = [ + "bitflags 1.3.2", + "libc", + "zmq-sys", +] + +[[package]] +name = "zmq-sys" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e8351dc72494b4d7f5652a681c33634063bbad58046c1689e75270908fdc864" +dependencies = [ + "libc", + "system-deps", + "zeromq-src", +] + [[package]] name = "zstd" version = "0.11.2+zstd.1.5.2" diff --git a/Cargo.toml b/Cargo.toml index ef8ed6fb1..a4fcd78b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,6 +57,7 @@ tokio = { version = "1", features = ["sync", "macros"] } # optional dependencies for electrum-discovery electrum-client = { version = "0.8", optional = true } +zmq = "0.10.0" [dev-dependencies] diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 4d4eb1c17..6f239aafd 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -15,7 +15,7 @@ use electrs::{ electrum::RPC as ElectrumRPC, errors::*, metrics::Metrics, - new_index::{precache, ChainQuery, FetchFrom, Indexer, Mempool, Query, Store}, + new_index::{precache, zmq, ChainQuery, FetchFrom, Indexer, Mempool, Query, Store}, rest, signal::Waiter, }; @@ -45,6 +45,10 @@ fn run_server(config: Arc) -> Result<()> { let metrics = Metrics::new(config.monitoring_addr); metrics.start(); + if let Some(zmq_addr) = config.zmq_addr.as_ref() { + zmq::start(&format!("tcp://{zmq_addr}"), None); + } + let daemon = Arc::new(Daemon::new( &config.daemon_dir, &config.blocks_dir, diff --git a/src/config.rs b/src/config.rs index 4a1066f17..17c94aaab 100644 --- a/src/config.rs +++ b/src/config.rs @@ -41,6 +41,7 @@ pub struct Config { pub electrum_txs_limit: usize, pub electrum_banner: String, pub electrum_rpc_logging: Option, + pub zmq_addr: Option, /// Enable compaction during initial sync /// @@ -208,6 +209,11 @@ impl Config { Arg::with_name("initial_sync_compaction") .long("initial-sync-compaction") .help("Perform compaction during initial sync (slower but less disk space required)") + ).arg( + Arg::with_name("zmq_addr") + .long("zmq-addr") + .help("Optional zmq socket address of the bitcoind daemon") + .takes_value(true), ); #[cfg(unix)] @@ -352,6 +358,9 @@ impl Config { .unwrap_or(&format!("127.0.0.1:{}", default_http_port)), "HTTP Server", ); + let zmq_addr: Option = m + .value_of("zmq_addr") + .map(|e| str_to_socketaddr(e, "ZMQ addr")); let http_socket_file: Option = m.value_of("http_socket_file").map(PathBuf::from); let monitoring_addr: SocketAddr = str_to_socketaddr( @@ -422,6 +431,7 @@ impl Config { cors: m.value_of("cors").map(|s| s.to_string()), precache_scripts: m.value_of("precache_scripts").map(|s| s.to_string()), initial_sync_compaction: m.is_present("initial_sync_compaction"), + zmq_addr, #[cfg(feature = "liquid")] parent_network, diff --git a/src/new_index/mod.rs b/src/new_index/mod.rs index 30c7854b1..09730b104 100644 --- a/src/new_index/mod.rs +++ b/src/new_index/mod.rs @@ -4,6 +4,7 @@ mod mempool; pub mod precache; mod query; pub mod schema; +pub mod zmq; pub use self::db::{DBRow, DB}; pub use self::fetch::{BlockEntry, FetchFrom}; diff --git a/src/new_index/zmq.rs b/src/new_index/zmq.rs new file mode 100644 index 000000000..5315b4264 --- /dev/null +++ b/src/new_index/zmq.rs @@ -0,0 +1,43 @@ +use std::sync::mpsc::Sender; + +use bitcoin::{hashes::Hash, BlockHash}; + +use crate::util::spawn_thread; + +pub fn start(url: &str, block_hash_notify: Option>) { + log::debug!("Starting ZMQ thread"); + let ctx = zmq::Context::new(); + let subscriber: zmq::Socket = ctx.socket(zmq::SUB).expect("failed creating subscriber"); + subscriber + .connect(url) + .expect("failed connecting subscriber"); + + // subscriber.set_subscribe(b"rawtx").unwrap(); + subscriber + .set_subscribe(b"hashblock") + .expect("failed subscribing to hashblock"); + + spawn_thread("zmq", move || loop { + match subscriber.recv_multipart(0) { + Ok(data) => match (data.get(0), data.get(1)) { + (Some(topic), Some(data)) => { + if &topic[..] == &[114, 97, 119, 116, 120] { + //rawtx + } else if &topic[..] == &[104, 97, 115, 104, 98, 108, 111, 99, 107] { + //hashblock + let mut reversed = data.to_vec(); + reversed.reverse(); + if let Ok(block_hash) = BlockHash::from_slice(&reversed[..]) { + log::debug!("New block from ZMQ: {block_hash}"); + if let Some(block_hash_notify) = block_hash_notify.as_ref() { + let _ = block_hash_notify.send(block_hash); + } + } + } + } + _ => (), + }, + Err(e) => log::warn!("recv_multipart error: {e:?}"), + } + }); +} diff --git a/tests/common.rs b/tests/common.rs index bac94d8a2..5819709bf 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -109,6 +109,7 @@ impl TestRunner { electrum_txs_limit: 100, electrum_banner: "".into(), electrum_rpc_logging: None, + zmq_addr: None, #[cfg(feature = "liquid")] asset_db_path: None, // XXX From 1e37f20d50dcf9cf683c8fddf86b8d6aaccb033e Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Tue, 22 Oct 2024 13:56:13 +0200 Subject: [PATCH 2/6] use select! instead of recv_deadline this commit aim to achieve the same behaviour as before but using the select! construct instead of recv_timeout because in the following commits we want to add a case to the select! --- src/signal.rs | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/src/signal.rs b/src/signal.rs index 9bc30d9e3..a36688181 100644 --- a/src/signal.rs +++ b/src/signal.rs @@ -1,5 +1,4 @@ -use crossbeam_channel as channel; -use crossbeam_channel::RecvTimeoutError; +use crossbeam_channel::{self as channel, after, select}; use std::thread; use std::time::{Duration, Instant}; @@ -36,24 +35,24 @@ impl Waiter { } pub fn wait(&self, duration: Duration, accept_sigusr: bool) -> Result<()> { - // Determine the deadline time based on the duration, so that it doesn't - // get pushed back when wait_deadline() recurses - self.wait_deadline(Instant::now() + duration, accept_sigusr) - } - - fn wait_deadline(&self, deadline: Instant, accept_sigusr: bool) -> Result<()> { - match self.receiver.recv_deadline(deadline) { - Ok(sig) if sig == SIGUSR1 => { - trace!("notified via SIGUSR1"); - if accept_sigusr { - Ok(()) - } else { - self.wait_deadline(deadline, accept_sigusr) + let start = Instant::now(); + select! { + recv(self.receiver) -> msg => { + match msg { + Ok(sig) if sig == SIGUSR1 => { + trace!("notified via SIGUSR1"); + if accept_sigusr { + Ok(()) + } else { + let wait_more = duration.saturating_sub(start.elapsed()); + self.wait(wait_more, accept_sigusr) + } + } + Ok(sig) => bail!(ErrorKind::Interrupt(sig)), + Err(_) => bail!("signal hook channel disconnected"), } - } - Ok(sig) => bail!(ErrorKind::Interrupt(sig)), - Err(RecvTimeoutError::Timeout) => Ok(()), - Err(RecvTimeoutError::Disconnected) => bail!("signal hook channel disconnected"), + }, + recv(after(duration)) -> _ => Ok(()), } } } From 75871658ce7f14c5a88d724ca9819f4a712c4445 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Tue, 22 Oct 2024 14:20:04 +0200 Subject: [PATCH 3/6] main loop listen also to block notify from zmq --- src/bin/electrs.rs | 4 +-- src/bin/tx-fingerprint-stats.rs | 2 +- src/new_index/zmq.rs | 3 +-- src/signal.rs | 44 ++++++++++++++++++++++++--------- tests/common.rs | 2 +- 5 files changed, 38 insertions(+), 17 deletions(-) diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 6f239aafd..1192cb227 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -41,12 +41,12 @@ fn fetch_from(config: &Config, store: &Store) -> FetchFrom { } fn run_server(config: Arc) -> Result<()> { - let signal = Waiter::start(); + let (block_hash_notify, signal) = Waiter::start(); let metrics = Metrics::new(config.monitoring_addr); metrics.start(); if let Some(zmq_addr) = config.zmq_addr.as_ref() { - zmq::start(&format!("tcp://{zmq_addr}"), None); + zmq::start(&format!("tcp://{zmq_addr}"), Some(block_hash_notify)); } let daemon = Arc::new(Daemon::new( diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index 94a3821ab..0a39eaf05 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -21,7 +21,7 @@ fn main() { util::has_prevout, }; - let signal = Waiter::start(); + let signal = Waiter::start().1; let config = Config::from_args(); let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config)); diff --git a/src/new_index/zmq.rs b/src/new_index/zmq.rs index 5315b4264..5c35c9b21 100644 --- a/src/new_index/zmq.rs +++ b/src/new_index/zmq.rs @@ -1,6 +1,5 @@ -use std::sync::mpsc::Sender; - use bitcoin::{hashes::Hash, BlockHash}; +use crossbeam_channel::Sender; use crate::util::spawn_thread; diff --git a/src/signal.rs b/src/signal.rs index a36688181..16e3e57d0 100644 --- a/src/signal.rs +++ b/src/signal.rs @@ -1,4 +1,5 @@ -use crossbeam_channel::{self as channel, after, select}; +use bitcoin::BlockHash; +use crossbeam_channel::{self as channel, after, select, Sender}; use std::thread; use std::time::{Duration, Instant}; @@ -9,6 +10,7 @@ use crate::errors::*; #[derive(Clone)] // so multiple threads could wait on signals pub struct Waiter { receiver: channel::Receiver, + zmq_receiver: channel::Receiver, } fn notify(signals: &[i32]) -> channel::Receiver { @@ -25,34 +27,54 @@ fn notify(signals: &[i32]) -> channel::Receiver { } impl Waiter { - pub fn start() -> Waiter { - Waiter { - receiver: notify(&[ - SIGINT, SIGTERM, - SIGUSR1, // allow external triggering (e.g. via bitcoind `blocknotify`) - ]), - } + pub fn start() -> (Sender, Waiter) { + let (block_hash_notify, block_hash_receive) = channel::bounded(1); + + ( + block_hash_notify, + Waiter { + receiver: notify(&[ + SIGINT, SIGTERM, + SIGUSR1, // allow external triggering (e.g. via bitcoind `blocknotify`) + ]), + zmq_receiver: block_hash_receive, + }, + ) } - pub fn wait(&self, duration: Duration, accept_sigusr: bool) -> Result<()> { + pub fn wait(&self, duration: Duration, accept_block_notification: bool) -> Result<()> { let start = Instant::now(); select! { recv(self.receiver) -> msg => { match msg { Ok(sig) if sig == SIGUSR1 => { trace!("notified via SIGUSR1"); - if accept_sigusr { + if accept_block_notification { Ok(()) } else { let wait_more = duration.saturating_sub(start.elapsed()); - self.wait(wait_more, accept_sigusr) + self.wait(wait_more, accept_block_notification) } } Ok(sig) => bail!(ErrorKind::Interrupt(sig)), Err(_) => bail!("signal hook channel disconnected"), } }, + recv(self.zmq_receiver) -> msg => { + match msg { + Ok(_) => { + if accept_block_notification { + Ok(()) + } else { + let wait_more = duration.saturating_sub(start.elapsed()); + self.wait(wait_more, accept_block_notification) + } + } + Err(_) => bail!("signal hook channel disconnected"), + } + }, recv(after(duration)) -> _ => Ok(()), + } } } diff --git a/tests/common.rs b/tests/common.rs index 5819709bf..66f50a794 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -124,7 +124,7 @@ impl TestRunner { //tor_proxy: Option, }); - let signal = Waiter::start(); + let signal = Waiter::start().1; let metrics = Metrics::new(rand_available_addr()); metrics.start(); From 974d78971fbfc9555e7a85fa32b2f91af6db8023 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Tue, 22 Oct 2024 18:26:33 +0200 Subject: [PATCH 4/6] zmq always accept a notifier --- src/bin/electrs.rs | 2 +- src/new_index/zmq.rs | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 1192cb227..e385061d6 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -46,7 +46,7 @@ fn run_server(config: Arc) -> Result<()> { metrics.start(); if let Some(zmq_addr) = config.zmq_addr.as_ref() { - zmq::start(&format!("tcp://{zmq_addr}"), Some(block_hash_notify)); + zmq::start(&format!("tcp://{zmq_addr}"), block_hash_notify); } let daemon = Arc::new(Daemon::new( diff --git a/src/new_index/zmq.rs b/src/new_index/zmq.rs index 5c35c9b21..73d5d7964 100644 --- a/src/new_index/zmq.rs +++ b/src/new_index/zmq.rs @@ -3,7 +3,7 @@ use crossbeam_channel::Sender; use crate::util::spawn_thread; -pub fn start(url: &str, block_hash_notify: Option>) { +pub fn start(url: &str, block_hash_notify: Sender) { log::debug!("Starting ZMQ thread"); let ctx = zmq::Context::new(); let subscriber: zmq::Socket = ctx.socket(zmq::SUB).expect("failed creating subscriber"); @@ -28,9 +28,7 @@ pub fn start(url: &str, block_hash_notify: Option>) { reversed.reverse(); if let Ok(block_hash) = BlockHash::from_slice(&reversed[..]) { log::debug!("New block from ZMQ: {block_hash}"); - if let Some(block_hash_notify) = block_hash_notify.as_ref() { - let _ = block_hash_notify.send(block_hash); - } + let _ = block_hash_notify.send(block_hash); } } } From 4edd741592ec00c3f1e33969ed26fc9f395f90e0 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Tue, 22 Oct 2024 18:27:29 +0200 Subject: [PATCH 5/6] Waiter accept receiver instead of returning sender --- src/bin/electrs.rs | 4 +++- src/bin/tx-fingerprint-stats.rs | 2 +- src/signal.rs | 23 +++++++++-------------- tests/common.rs | 2 +- 4 files changed, 14 insertions(+), 17 deletions(-) diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index e385061d6..59d3b5383 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -4,6 +4,7 @@ extern crate log; extern crate electrs; +use crossbeam_channel::{self as channel}; use error_chain::ChainedError; use std::process; use std::sync::{Arc, RwLock}; @@ -41,7 +42,8 @@ fn fetch_from(config: &Config, store: &Store) -> FetchFrom { } fn run_server(config: Arc) -> Result<()> { - let (block_hash_notify, signal) = Waiter::start(); + let (block_hash_notify, block_hash_receive) = channel::bounded(1); + let signal = Waiter::start(block_hash_receive); let metrics = Metrics::new(config.monitoring_addr); metrics.start(); diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index 0a39eaf05..4c7e8a394 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -21,7 +21,7 @@ fn main() { util::has_prevout, }; - let signal = Waiter::start().1; + let signal = Waiter::start(crossbeam_channel::never()); let config = Config::from_args(); let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config)); diff --git a/src/signal.rs b/src/signal.rs index 16e3e57d0..ee319c3f6 100644 --- a/src/signal.rs +++ b/src/signal.rs @@ -1,5 +1,5 @@ use bitcoin::BlockHash; -use crossbeam_channel::{self as channel, after, select, Sender}; +use crossbeam_channel::{self as channel, after, select}; use std::thread; use std::time::{Duration, Instant}; @@ -27,19 +27,14 @@ fn notify(signals: &[i32]) -> channel::Receiver { } impl Waiter { - pub fn start() -> (Sender, Waiter) { - let (block_hash_notify, block_hash_receive) = channel::bounded(1); - - ( - block_hash_notify, - Waiter { - receiver: notify(&[ - SIGINT, SIGTERM, - SIGUSR1, // allow external triggering (e.g. via bitcoind `blocknotify`) - ]), - zmq_receiver: block_hash_receive, - }, - ) + pub fn start(block_hash_receive: channel::Receiver) -> Waiter { + Waiter { + receiver: notify(&[ + SIGINT, SIGTERM, + SIGUSR1, // allow external triggering (e.g. via bitcoind `blocknotify`) + ]), + zmq_receiver: block_hash_receive, + } } pub fn wait(&self, duration: Duration, accept_block_notification: bool) -> Result<()> { diff --git a/tests/common.rs b/tests/common.rs index 66f50a794..e4a7e8015 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -124,7 +124,7 @@ impl TestRunner { //tor_proxy: Option, }); - let signal = Waiter::start().1; + let signal = Waiter::start(crossbeam_channel::never()); let metrics = Metrics::new(rand_available_addr()); metrics.start(); From a63c0634e741f1a696147f7b4ab5d80658ae2c2c Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Tue, 22 Oct 2024 18:28:15 +0200 Subject: [PATCH 6/6] rust format --- src/bin/tx-fingerprint-stats.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index 4c7e8a394..0a0399692 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -83,13 +83,15 @@ fn main() { //info!("{:?},{:?}", txid, blockid); - let prevouts = chain.lookup_txos( - tx.input - .iter() - .filter(|txin| has_prevout(txin)) - .map(|txin| txin.previous_output) - .collect(), - ).unwrap(); + let prevouts = chain + .lookup_txos( + tx.input + .iter() + .filter(|txin| has_prevout(txin)) + .map(|txin| txin.previous_output) + .collect(), + ) + .unwrap(); let total_out: u64 = tx.output.iter().map(|out| out.value.to_sat()).sum(); let small_out = tx