Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion integration-tests/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,11 @@ pub fn start_jdc(
);
let ret = jd_client_sv2::JobDeclaratorClient::new(jd_client_proxy);
let ret_clone = ret.clone();
tokio::spawn(async move { ret_clone.start().await });
tokio::spawn(async move {
if let Err(e) = ret_clone.start().await {
panic!("Integration test JDC failed to start: {e}");
}
});
(ret, jdc_address, monitoring_address)
}

Expand Down
12 changes: 11 additions & 1 deletion miner-apps/jd-client/src/lib/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,17 @@ pub struct Upstream;
#[derive(Debug)]
pub struct Downstream;

#[derive(Debug)]
pub struct JobDeclaratorClient;

#[derive(Debug)]
pub struct JDCError<Owner> {
pub kind: JDCErrorKind,
pub action: Action,
_owner: PhantomData<Owner>,
}

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
Log,
Disconnect(DownstreamId),
Expand All @@ -84,12 +87,14 @@ impl CanDisconnect for ChannelManager {}
impl CanFallback for Upstream {}
impl CanFallback for JobDeclarator {}
impl CanFallback for ChannelManager {}
impl CanFallback for JobDeclaratorClient {}

impl CanShutdown for ChannelManager {}
impl CanShutdown for TemplateProvider {}
impl CanShutdown for Downstream {}
impl CanShutdown for Upstream {}
impl CanShutdown for JobDeclarator {}
impl CanShutdown for JobDeclaratorClient {}

impl<O> JDCError<O> {
pub fn log<E: Into<JDCErrorKind>>(kind: E) -> Self {
Expand Down Expand Up @@ -256,6 +261,8 @@ pub enum JDCErrorKind {
InvalidKey,
/// Upstream not found
UpstreamNotFound,
/// JDC initialization error with details
InitializationError(String),
}

impl std::error::Error for JDCErrorKind {}
Expand Down Expand Up @@ -395,6 +402,9 @@ impl fmt::Display for JDCErrorKind {
CouldNotInitiateSystem => write!(f, "Could not initiate subsystem"),
InvalidKey => write!(f, "Invalid key used during noise handshake"),
UpstreamNotFound => write!(f, "Upstream not found"),
InitializationError(ref err) => {
write!(f, "Cannot initialize JD client: {err:?}")
}
}
}
}
Expand Down
62 changes: 41 additions & 21 deletions miner-apps/jd-client/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tracing::{debug, error, info, warn};
use crate::{
channel_manager::ChannelManager,
config::JobDeclaratorClientConfig,
error::JDCErrorKind,
error::{Action, JDCError, JDCErrorKind, JDCResult},
jd_mode::JDMode,
job_declarator::JobDeclarator,
template_receiver::{
Expand Down Expand Up @@ -68,7 +68,7 @@ impl JobDeclaratorClient {
}

/// Starts the Job Declarator Client (JDC) main loop.
pub async fn start(&self) {
pub async fn start(&self) -> JDCResult<(), error::JobDeclaratorClient> {
info!(
"Job declarator client starting... setting up subsystems, User Identity: {}",
self.config.user_identity()
Expand All @@ -78,12 +78,17 @@ impl JobDeclaratorClient {
let mut encoded_outputs = vec![];
let mode = JDMode::new(self.config.mode);

if let Err(e) = miner_coinbase_outputs.consensus_encode(&mut encoded_outputs) {
error!(error = ?e, "Invalid coinbase output in config");
if miner_coinbase_outputs
.consensus_encode(&mut encoded_outputs)
.is_err()
{
self.cancellation_token.cancel();
self.shutdown_notify.notify_waiters();
self.is_alive.store(false, Ordering::Relaxed);
return;

return Err(JDCError::shutdown(JDCErrorKind::InitializationError(
"Invalid coinbase output in config".to_string(),
)));
}

let mut fallback_coordinator = FallbackCoordinator::new();
Expand Down Expand Up @@ -123,11 +128,11 @@ impl JobDeclaratorClient {
{
Ok(channel_manager) => channel_manager,
Err(e) => {
error!(error = ?e, "Failed to initialize channel manager");
self.cancellation_token.cancel();
self.shutdown_notify.notify_waiters();
self.is_alive.store(false, Ordering::Relaxed);
return;

return Err(JDCError::shutdown(e.kind));
}
};

Expand Down Expand Up @@ -207,11 +212,11 @@ impl JobDeclaratorClient {
{
Ok(template_receiver) => template_receiver,
Err(e) => {
error!(error = ?e, "Failed to initialize SV2 template receiver");
self.cancellation_token.cancel();
self.shutdown_notify.notify_waiters();
self.is_alive.store(false, Ordering::Relaxed);
return;

return Err(JDCError::shutdown(e.kind));
}
};

Expand Down Expand Up @@ -242,13 +247,11 @@ impl JobDeclaratorClient {
) {
Some(unix_socket_path) => unix_socket_path,
None => {
error!(
"Could not determine Bitcoin data directory. Please set data_dir in config."
);
self.cancellation_token.cancel();
self.shutdown_notify.notify_waiters();
self.is_alive.store(false, Ordering::Relaxed);
return;

return Err(JDCError::shutdown(JDCErrorKind::InitializationError("Could not determine Bitcoin data directory. Please set data_dir in config.".to_string())));
}
};

Expand Down Expand Up @@ -645,6 +648,8 @@ impl JobDeclaratorClient {
self.shutdown_notify.notify_waiters();
self.is_alive.store(false, Ordering::Relaxed);
info!("JD Client shutdown complete.");

Ok(())
}

pub async fn shutdown(&self) {
Expand All @@ -671,7 +676,7 @@ impl JobDeclaratorClient {
fallback_coordinator: FallbackCoordinator,
mode: JDMode,
task_manager: Arc<TaskManager>,
) -> Result<(Upstream, JobDeclarator), JDCErrorKind> {
) -> JDCResult<(Upstream, JobDeclarator), error::JobDeclaratorClient> {
const MAX_RETRIES: usize = 3;
let upstream_len = upstreams.len();
for (i, upstream_entry) in upstreams.iter_mut().enumerate() {
Expand All @@ -689,7 +694,9 @@ impl JobDeclaratorClient {
biased;
_ = cancellation_token.cancelled() => {
info!("Shutdown requested while waiting to initialize upstream, aborting retries");
return Err(JDCErrorKind::CouldNotInitiateSystem);
return Err(
JDCError::shutdown(JDCErrorKind::CouldNotInitiateSystem)
);
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {}
}
Expand All @@ -706,7 +713,7 @@ impl JobDeclaratorClient {
info!(
"Shutdown requested before upstream connection attempt, aborting retries"
);
return Err(JDCErrorKind::CouldNotInitiateSystem);
return Err(JDCError::shutdown(JDCErrorKind::CouldNotInitiateSystem));
}

info!("Connection attempt {}/{}...", attempt, MAX_RETRIES);
Expand Down Expand Up @@ -736,11 +743,18 @@ impl JobDeclaratorClient {
biased;
_ = cancellation_token.cancelled() => {
info!("Shutdown requested after upstream initialization failure, aborting retries");
return Err(JDCErrorKind::CouldNotInitiateSystem);
return Err(
JDCError::shutdown(JDCErrorKind::CouldNotInitiateSystem)
);
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {}
}

if e.action == Action::Shutdown {
info!("Encountered a shutdown error during upstream initialization, aborting retries");
return Err(e);
}

warn!(
"Attempt {}/{} failed for pool={}:{}, jds={}:{}: {:?}",
attempt,
Expand All @@ -767,7 +781,7 @@ impl JobDeclaratorClient {
}

tracing::error!("All upstreams failed after {} retries each", MAX_RETRIES);
Err(JDCErrorKind::CouldNotInitiateSystem)
Err(JDCError::shutdown(JDCErrorKind::CouldNotInitiateSystem))
}
}

Expand All @@ -785,7 +799,7 @@ async fn try_initialize_single(
mode: JDMode,
task_manager: Arc<TaskManager>,
config: &JobDeclaratorClientConfig,
) -> Result<(Upstream, JobDeclarator), JDCErrorKind> {
) -> JDCResult<(Upstream, JobDeclarator), error::JobDeclaratorClient> {
info!("Upstream connection in-progress at initialize single");
let upstream = Upstream::new(
upstream_entry,
Expand All @@ -797,7 +811,10 @@ async fn try_initialize_single(
config.required_extensions().to_vec(),
)
.await
.map_err(|error| error.kind)?;
.map_err(|error| match error.action {
Action::Shutdown => JDCError::shutdown(error.kind),
_ => JDCError::fallback(error.kind),
})?;

info!("Upstream connection done at initialize single");

Expand All @@ -811,7 +828,10 @@ async fn try_initialize_single(
task_manager.clone(),
)
.await
.map_err(|error| error.kind)?;
.map_err(|error| match error.action {
Action::Shutdown => JDCError::shutdown(error.kind),
_ => JDCError::fallback(error.kind),
})?;

Ok((upstream, job_declarator))
}
Expand Down
5 changes: 4 additions & 1 deletion miner-apps/jd-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,8 @@ async fn inner_main() {
});

init_logging(jdc_config.log_file());
JobDeclaratorClient::new(jdc_config).start().await;
if let Err(e) = JobDeclaratorClient::new(jdc_config).start().await {
tracing::error!("Job Declarator Client failed to start: {e}");
std::process::exit(1);
};
}