diff --git a/integration-tests/lib/mod.rs b/integration-tests/lib/mod.rs index 360250be9..cc4eaabe7 100644 --- a/integration-tests/lib/mod.rs +++ b/integration-tests/lib/mod.rs @@ -274,7 +274,11 @@ pub fn start_jdc( ); let ret = jd_client_sv2::JobDeclaratorClient::new(jd_client_proxy); let ret_clone = ret.clone(); - tokio::spawn(async move { ret_clone.start().await }); + tokio::spawn(async move { + if let Err(e) = ret_clone.start().await { + panic!("Integration test JDC failed to start: {e}"); + } + }); (ret, jdc_address, monitoring_address) } diff --git a/miner-apps/jd-client/src/lib/error.rs b/miner-apps/jd-client/src/lib/error.rs index 5a3d5cc7f..8bfcaeb31 100644 --- a/miner-apps/jd-client/src/lib/error.rs +++ b/miner-apps/jd-client/src/lib/error.rs @@ -57,6 +57,9 @@ pub struct Upstream; #[derive(Debug)] pub struct Downstream; +#[derive(Debug)] +pub struct JobDeclaratorClient; + #[derive(Debug)] pub struct JDCError { pub kind: JDCErrorKind, @@ -64,7 +67,7 @@ pub struct JDCError { _owner: PhantomData, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Action { Log, Disconnect(DownstreamId), @@ -84,12 +87,14 @@ impl CanDisconnect for ChannelManager {} impl CanFallback for Upstream {} impl CanFallback for JobDeclarator {} impl CanFallback for ChannelManager {} +impl CanFallback for JobDeclaratorClient {} impl CanShutdown for ChannelManager {} impl CanShutdown for TemplateProvider {} impl CanShutdown for Downstream {} impl CanShutdown for Upstream {} impl CanShutdown for JobDeclarator {} +impl CanShutdown for JobDeclaratorClient {} impl JDCError { pub fn log>(kind: E) -> Self { @@ -256,6 +261,8 @@ pub enum JDCErrorKind { InvalidKey, /// Upstream not found UpstreamNotFound, + /// JDC initialization error with details + InitializationError(String), } impl std::error::Error for JDCErrorKind {} @@ -395,6 +402,9 @@ impl fmt::Display for JDCErrorKind { CouldNotInitiateSystem => write!(f, "Could not initiate subsystem"), InvalidKey => write!(f, "Invalid key used during noise handshake"), UpstreamNotFound => write!(f, "Upstream not found"), + InitializationError(ref err) => { + write!(f, "Cannot initialize JD client: {err:?}") + } } } } diff --git a/miner-apps/jd-client/src/lib/mod.rs b/miner-apps/jd-client/src/lib/mod.rs index a87b80c62..c78bd6eec 100644 --- a/miner-apps/jd-client/src/lib/mod.rs +++ b/miner-apps/jd-client/src/lib/mod.rs @@ -22,7 +22,7 @@ use tracing::{debug, error, info, warn}; use crate::{ channel_manager::ChannelManager, config::JobDeclaratorClientConfig, - error::JDCErrorKind, + error::{Action, JDCError, JDCErrorKind, JDCResult}, jd_mode::JDMode, job_declarator::JobDeclarator, template_receiver::{ @@ -68,7 +68,7 @@ impl JobDeclaratorClient { } /// Starts the Job Declarator Client (JDC) main loop. - pub async fn start(&self) { + pub async fn start(&self) -> JDCResult<(), error::JobDeclaratorClient> { info!( "Job declarator client starting... setting up subsystems, User Identity: {}", self.config.user_identity() @@ -78,12 +78,17 @@ impl JobDeclaratorClient { let mut encoded_outputs = vec![]; let mode = JDMode::new(self.config.mode); - if let Err(e) = miner_coinbase_outputs.consensus_encode(&mut encoded_outputs) { - error!(error = ?e, "Invalid coinbase output in config"); + if miner_coinbase_outputs + .consensus_encode(&mut encoded_outputs) + .is_err() + { self.cancellation_token.cancel(); self.shutdown_notify.notify_waiters(); self.is_alive.store(false, Ordering::Relaxed); - return; + + return Err(JDCError::shutdown(JDCErrorKind::InitializationError( + "Invalid coinbase output in config".to_string(), + ))); } let mut fallback_coordinator = FallbackCoordinator::new(); @@ -123,11 +128,11 @@ impl JobDeclaratorClient { { Ok(channel_manager) => channel_manager, Err(e) => { - error!(error = ?e, "Failed to initialize channel manager"); self.cancellation_token.cancel(); self.shutdown_notify.notify_waiters(); self.is_alive.store(false, Ordering::Relaxed); - return; + + return Err(JDCError::shutdown(e.kind)); } }; @@ -207,11 +212,11 @@ impl JobDeclaratorClient { { Ok(template_receiver) => template_receiver, Err(e) => { - error!(error = ?e, "Failed to initialize SV2 template receiver"); self.cancellation_token.cancel(); self.shutdown_notify.notify_waiters(); self.is_alive.store(false, Ordering::Relaxed); - return; + + return Err(JDCError::shutdown(e.kind)); } }; @@ -242,13 +247,11 @@ impl JobDeclaratorClient { ) { Some(unix_socket_path) => unix_socket_path, None => { - error!( - "Could not determine Bitcoin data directory. Please set data_dir in config." - ); self.cancellation_token.cancel(); self.shutdown_notify.notify_waiters(); self.is_alive.store(false, Ordering::Relaxed); - return; + + return Err(JDCError::shutdown(JDCErrorKind::InitializationError("Could not determine Bitcoin data directory. Please set data_dir in config.".to_string()))); } }; @@ -645,6 +648,8 @@ impl JobDeclaratorClient { self.shutdown_notify.notify_waiters(); self.is_alive.store(false, Ordering::Relaxed); info!("JD Client shutdown complete."); + + Ok(()) } pub async fn shutdown(&self) { @@ -671,7 +676,7 @@ impl JobDeclaratorClient { fallback_coordinator: FallbackCoordinator, mode: JDMode, task_manager: Arc, - ) -> Result<(Upstream, JobDeclarator), JDCErrorKind> { + ) -> JDCResult<(Upstream, JobDeclarator), error::JobDeclaratorClient> { const MAX_RETRIES: usize = 3; let upstream_len = upstreams.len(); for (i, upstream_entry) in upstreams.iter_mut().enumerate() { @@ -689,7 +694,9 @@ impl JobDeclaratorClient { biased; _ = cancellation_token.cancelled() => { info!("Shutdown requested while waiting to initialize upstream, aborting retries"); - return Err(JDCErrorKind::CouldNotInitiateSystem); + return Err( + JDCError::shutdown(JDCErrorKind::CouldNotInitiateSystem) + ); } _ = tokio::time::sleep(Duration::from_secs(1)) => {} } @@ -706,7 +713,7 @@ impl JobDeclaratorClient { info!( "Shutdown requested before upstream connection attempt, aborting retries" ); - return Err(JDCErrorKind::CouldNotInitiateSystem); + return Err(JDCError::shutdown(JDCErrorKind::CouldNotInitiateSystem)); } info!("Connection attempt {}/{}...", attempt, MAX_RETRIES); @@ -736,11 +743,18 @@ impl JobDeclaratorClient { biased; _ = cancellation_token.cancelled() => { info!("Shutdown requested after upstream initialization failure, aborting retries"); - return Err(JDCErrorKind::CouldNotInitiateSystem); + return Err( + JDCError::shutdown(JDCErrorKind::CouldNotInitiateSystem) + ); } _ = tokio::time::sleep(Duration::from_secs(1)) => {} } + if e.action == Action::Shutdown { + info!("Encountered a shutdown error during upstream initialization, aborting retries"); + return Err(e); + } + warn!( "Attempt {}/{} failed for pool={}:{}, jds={}:{}: {:?}", attempt, @@ -767,7 +781,7 @@ impl JobDeclaratorClient { } tracing::error!("All upstreams failed after {} retries each", MAX_RETRIES); - Err(JDCErrorKind::CouldNotInitiateSystem) + Err(JDCError::shutdown(JDCErrorKind::CouldNotInitiateSystem)) } } @@ -785,7 +799,7 @@ async fn try_initialize_single( mode: JDMode, task_manager: Arc, config: &JobDeclaratorClientConfig, -) -> Result<(Upstream, JobDeclarator), JDCErrorKind> { +) -> JDCResult<(Upstream, JobDeclarator), error::JobDeclaratorClient> { info!("Upstream connection in-progress at initialize single"); let upstream = Upstream::new( upstream_entry, @@ -797,7 +811,10 @@ async fn try_initialize_single( config.required_extensions().to_vec(), ) .await - .map_err(|error| error.kind)?; + .map_err(|error| match error.action { + Action::Shutdown => JDCError::shutdown(error.kind), + _ => JDCError::fallback(error.kind), + })?; info!("Upstream connection done at initialize single"); @@ -811,7 +828,10 @@ async fn try_initialize_single( task_manager.clone(), ) .await - .map_err(|error| error.kind)?; + .map_err(|error| match error.action { + Action::Shutdown => JDCError::shutdown(error.kind), + _ => JDCError::fallback(error.kind), + })?; Ok((upstream, job_declarator)) } diff --git a/miner-apps/jd-client/src/main.rs b/miner-apps/jd-client/src/main.rs index e77d70e39..017279451 100644 --- a/miner-apps/jd-client/src/main.rs +++ b/miner-apps/jd-client/src/main.rs @@ -25,5 +25,8 @@ async fn inner_main() { }); init_logging(jdc_config.log_file()); - JobDeclaratorClient::new(jdc_config).start().await; + if let Err(e) = JobDeclaratorClient::new(jdc_config).start().await { + tracing::error!("Job Declarator Client failed to start: {e}"); + std::process::exit(1); + }; }