diff --git a/miner-apps/jd-client/src/lib/channel_manager/mod.rs b/miner-apps/jd-client/src/lib/channel_manager/mod.rs index 939e9731d..dbe7f1aa9 100644 --- a/miner-apps/jd-client/src/lib/channel_manager/mod.rs +++ b/miner-apps/jd-client/src/lib/channel_manager/mod.rs @@ -354,6 +354,16 @@ impl ChannelManager { Ok(channel_manager) } + /// Sets the negotiated extensions. + /// + /// This is used after upstream connection setup to store the extensions + /// that were successfully negotiated with the upstream server. + pub fn set_negotiated_extensions(&self, extensions: Vec) { + self.channel_manager_data.super_safe_lock(|data| { + data.negotiated_extensions = extensions; + }); + } + // Bootstraps a group channel with the given parameters. // Returns a `GroupChannel` if successful, otherwise returns `None`. // diff --git a/miner-apps/jd-client/src/lib/error.rs b/miner-apps/jd-client/src/lib/error.rs index cd245a3fa..2d77c3417 100644 --- a/miner-apps/jd-client/src/lib/error.rs +++ b/miner-apps/jd-client/src/lib/error.rs @@ -17,6 +17,7 @@ use std::{ marker::PhantomData, }; use stratum_apps::{ + extensions_negotiation::ExtensionNegotiationError, network_helpers, stratum_core::{ binary_sv2, bitcoin, @@ -224,9 +225,11 @@ pub enum JDCErrorKind { RequiredExtensionsNotSupported(Vec), /// Server requires extensions that the translator doesn't support ServerRequiresUnsupportedExtensions(Vec), - /// BitcoinCoreSv2TDP cancellation token activated - BitcoinCoreSv2TDPCancellationTokenActivated, - /// Failed to create BitcoinCoreSv2TDP tokio runtime + /// Extension negotiation timed out waiting for response + ExtensionNegotiationTimeout, + /// BitcoinCoreSv2 cancellation token activated + BitcoinCoreSv2CancellationTokenActivated, + /// Failed to create BitcoinCoreSv2 tokio runtime FailedToCreateBitcoinCoreTokioRuntime, /// Failed to send CoinbaseOutputConstraints message FailedToSendCoinbaseOutputConstraints, @@ -368,8 +371,11 @@ impl fmt::Display for JDCErrorKind { ServerRequiresUnsupportedExtensions(extensions) => { write!(f, "Server requires extensions that the translator doesn't support: {extensions:?}") } - BitcoinCoreSv2TDPCancellationTokenActivated => { - write!(f, "BitcoinCoreSv2TDP cancellation token activated") + ExtensionNegotiationTimeout => { + write!(f, "Extension negotiation timed out waiting for response") + } + BitcoinCoreSv2CancellationTokenActivated => { + write!(f, "BitcoinCoreSv2 cancellation token activated") } FailedToCreateBitcoinCoreTokioRuntime => { write!(f, "Failed to create BitcoinCoreSv2TDP tokio runtime") @@ -504,6 +510,20 @@ impl From for JDCErrorKind { } } +impl From for JDCErrorKind { + fn from(e: ExtensionNegotiationError) -> Self { + match e { + ExtensionNegotiationError::SendError => JDCErrorKind::ChannelErrorSender, + ExtensionNegotiationError::ReceiveError(_) => JDCErrorKind::UnexpectedMessage(0, 0), + ExtensionNegotiationError::Timeout => JDCErrorKind::ExtensionNegotiationTimeout, + ExtensionNegotiationError::UnexpectedMessage(ext, msg) => { + JDCErrorKind::UnexpectedMessage(ext, msg as u8) + } + ExtensionNegotiationError::HandlerError(_) => JDCErrorKind::UnexpectedMessage(0, 0), + } + } +} + impl HandlerErrorType for JDCError { fn parse_error(error: ParserError) -> Self { Self { diff --git a/miner-apps/jd-client/src/lib/mod.rs b/miner-apps/jd-client/src/lib/mod.rs index afe94a853..e2116c4ba 100644 --- a/miner-apps/jd-client/src/lib/mod.rs +++ b/miner-apps/jd-client/src/lib/mod.rs @@ -171,7 +171,7 @@ impl JobDeclaratorClient { }); } - let channel_manager_clone = channel_manager.clone(); + let mut channel_manager_clone = channel_manager.clone(); let mut bitcoin_core_sv2_join_handle: Option> = None; match self.config.template_provider_type().clone() { @@ -291,7 +291,8 @@ impl JobDeclaratorClient { .await { Ok((upstream, job_declarator)) => { - upstream + // Start upstream and wait for extension negotiation to complete. + let negotiated_extensions = upstream .start( self.config.min_supported_version(), self.config.max_supported_version(), @@ -299,9 +300,15 @@ impl JobDeclaratorClient { fallback_coordinator.clone(), status_sender.clone(), task_manager.clone(), + &mut channel_manager_clone, ) .await; + info!( + "Upstream extension negotiation complete. Negotiated extensions: {:?}", + negotiated_extensions + ); + job_declarator .start( self.cancellation_token.clone(), @@ -459,6 +466,7 @@ impl JobDeclaratorClient { fallback_coordinator.clone(), status_sender.clone(), task_manager.clone(), + &mut channel_manager, ) .await; diff --git a/miner-apps/jd-client/src/lib/template_receiver/bitcoin_core.rs b/miner-apps/jd-client/src/lib/template_receiver/bitcoin_core.rs index aa2171ccc..0116b4373 100644 --- a/miner-apps/jd-client/src/lib/template_receiver/bitcoin_core.rs +++ b/miner-apps/jd-client/src/lib/template_receiver/bitcoin_core.rs @@ -39,7 +39,7 @@ pub async fn connect_to_bitcoin_core( let status_sender = StatusSender::TemplateReceiver(status_sender_clone); handle_error( &status_sender, - JDCError::::shutdown(JDCErrorKind::BitcoinCoreSv2TDPCancellationTokenActivated), + JDCError::::shutdown(JDCErrorKind::BitcoinCoreSv2CancellationTokenActivated), ) .await; } diff --git a/miner-apps/jd-client/src/lib/upstream/mod.rs b/miner-apps/jd-client/src/lib/upstream/mod.rs index 34d3cb533..f25ca35d0 100644 --- a/miner-apps/jd-client/src/lib/upstream/mod.rs +++ b/miner-apps/jd-client/src/lib/upstream/mod.rs @@ -6,6 +6,7 @@ //! Responsibilities: //! - Establish a TCP + Noise encrypted connection to upstream //! - Perform `SetupConnection` handshake +//! - Negotiate extensions synchronously before returning //! - Forward SV2 mining messages between upstream and channel manager //! - Handle common messages from upstream @@ -15,12 +16,10 @@ use async_channel::{unbounded, Receiver, Sender}; use bitcoin_core_sv2::template_distribution_protocol::CancellationToken; use stratum_apps::{ custom_mutex::Mutex, + extensions_negotiation::negotiate_extensions, fallback_coordinator::FallbackCoordinator, network_helpers::{connect_with_noise, resolve_host}, - stratum_core::{ - binary_sv2::Seq064K, extensions_sv2::RequestExtensions, framing_sv2, - handlers_sv2::HandleCommonMessagesFromServerAsync, parsers_sv2::AnyMessage, - }, + stratum_core::{framing_sv2, handlers_sv2::HandleCommonMessagesFromServerAsync}, task_manager::TaskManager, utils::{ protocol_message_type::{protocol_message_type, MessageType}, @@ -31,6 +30,7 @@ use tokio::net::TcpStream; use tracing::{debug, error, info, warn}; use crate::{ + channel_manager::ChannelManager, error::{self, JDCError, JDCErrorKind, JDCResult}, io_task::spawn_io_tasks, status::{handle_error, Status, StatusSender}, @@ -151,11 +151,18 @@ impl Upstream { /// Perform `SetupConnection` handshake with upstream. /// /// Sends [`SetupConnection`] and awaits response. + /// If required extensions are configured, negotiates them synchronously + /// before returning. + /// + /// # Returns + /// * `Ok(Vec)` - The list of negotiated extensions (empty if none were requested) + /// * `Err(JDCError)` - Error during handshake or extension negotiation pub async fn setup_connection( &mut self, min_version: u16, max_version: u16, - ) -> JDCResult<(), error::Upstream> { + channel_manager: &mut ChannelManager, + ) -> JDCResult, error::Upstream> { info!("Upstream: initiating SV2 handshake..."); let setup_connection = get_setup_connection_message(min_version, max_version, &self.address) @@ -197,63 +204,49 @@ impl Upstream { .await?; // Send RequestExtensions after successful SetupConnection if there are required extensions + // and wait for the response before returning if !self.required_extensions.is_empty() { - self.send_request_extensions().await?; + let negotiated = self.negotiate_extensions(channel_manager).await?; + return Ok(negotiated); } - Ok(()) + Ok(vec![]) } - /// Send `RequestExtensions` message to upstream. - /// The supported extensions are stored for potential retry if the server requires additional - /// extensions. - async fn send_request_extensions(&mut self) -> JDCResult<(), error::Upstream> { - info!( - "Sending RequestExtensions to upstream with required extensions: {:?}", - self.required_extensions - ); - if self.required_extensions.is_empty() { - return Ok(()); - } - - let requested_extensions = - Seq064K::new(self.required_extensions.clone()).map_err(JDCError::shutdown)?; - - let request_extensions = RequestExtensions { - request_id: 0, - requested_extensions, - }; - - info!( - "Sending RequestExtensions to upstream with required extensions: {:?}", - self.required_extensions - ); - - let sv2_frame: Sv2Frame = AnyMessage::Extensions(request_extensions.into_static().into()) - .try_into() - .map_err(JDCError::shutdown)?; - - self.upstream_channel - .upstream_sender - .send(sv2_frame) - .await - .map_err(|e| { - error!(?e, "Failed to send RequestExtensions to upstream"); - JDCError::fallback(JDCErrorKind::ChannelErrorSender) - })?; - - info!("Sent RequestExtensions to upstream"); - Ok(()) + /// Sends RequestExtensions and waits for the response. + /// + /// Delegates to the shared [`stratum_apps::extensions_negotiation::negotiate_extensions`] function. + /// + /// # Returns + /// * `Ok(Vec)` - The list of successfully negotiated extensions + /// * `Err(JDCError)` - Extension negotiation failed + async fn negotiate_extensions( + &mut self, + channel_manager: &mut ChannelManager, + ) -> JDCResult, error::Upstream> { + negotiate_extensions( + self.required_extensions.clone(), + self.upstream_channel.upstream_sender.clone(), + self.upstream_channel.upstream_receiver.clone(), + self.upstream_channel.channel_manager_receiver.clone(), + channel_manager, + ) + .await + .map_err(|e| JDCError::fallback(JDCErrorKind::from(e))) } /// Start unified upstream loop. /// /// Responsibilities: - /// - Run `setup_connection` + /// - Run `setup_connection` (including extension negotiation) /// - Handle messages from upstream (pool) and channel manager /// - React to shutdown signals /// - /// This function spawns an async task and returns immediately. + /// This function spawns an async task and returns the negotiated extensions. + /// + /// # Returns + /// * `Vec` - The list of negotiated extensions (empty if none were requested or setup + /// failed) #[allow(clippy::too_many_arguments)] pub async fn start( mut self, @@ -263,13 +256,26 @@ impl Upstream { fallback_coordinator: FallbackCoordinator, status_sender: Sender, task_manager: Arc, - ) { + channel_manager: &mut ChannelManager, + ) -> Vec { let status_sender = StatusSender::Upstream(status_sender); - if let Err(e) = self.setup_connection(min_version, max_version).await { - error!(error = ?e, "Upstream: connection setup failed."); - return; - } + let negotiated_extensions = match self + .setup_connection(min_version, max_version, channel_manager) + .await + { + Ok(extensions) => { + info!( + "Upstream: extension negotiation complete. Extensions: {:?}", + extensions + ); + extensions + } + Err(e) => { + error!(error = ?e, "Upstream: connection setup failed."); + return vec![]; + } + }; task_manager.spawn(async move { // we just spawned a new task that's relevant to fallback coordination @@ -315,6 +321,8 @@ impl Upstream { // signal fallback coordinator that this task has completed its cleanup fallback_handler.done(); }); + + negotiated_extensions } // Handle incoming frames from upstream (pool). diff --git a/miner-apps/translator/src/lib/error.rs b/miner-apps/translator/src/lib/error.rs index 8854b2a44..e6a467ace 100644 --- a/miner-apps/translator/src/lib/error.rs +++ b/miner-apps/translator/src/lib/error.rs @@ -15,6 +15,7 @@ use std::{ sync::PoisonError, }; use stratum_apps::{ + extensions_negotiation::ExtensionNegotiationError, stratum_core::{ binary_sv2, channels_sv2::client::error::GroupChannelError, @@ -176,6 +177,8 @@ pub enum TproxyErrorKind { RequiredExtensionsNotSupported(Vec), /// Server requires extensions that the translator doesn't support ServerRequiresUnsupportedExtensions(Vec), + /// Extension negotiation timed out waiting for response + ExtensionNegotiationTimeout, /// Represents a generic channel send failure, described by a string. General(String), /// Error bubbling up from translator-core library @@ -256,6 +259,9 @@ impl fmt::Display for TproxyErrorKind { extensions ) } + ExtensionNegotiationTimeout => { + write!(f, "Extension negotiation timed out waiting for response") + } SV1Error => write!(f, "Sv1 error"), TranslatorCore(ref e) => write!(f, "Translator core error: {e:?}"), NetworkHelpersError(ref e) => write!(f, "Network helpers error: {e:?}"), @@ -396,6 +402,20 @@ impl HandlerErrorType for TproxyErrorKind { } } +impl From for TproxyErrorKind { + fn from(e: ExtensionNegotiationError) -> Self { + match e { + ExtensionNegotiationError::SendError => TproxyErrorKind::ChannelErrorSender, + ExtensionNegotiationError::ReceiveError(_) => TproxyErrorKind::UnexpectedMessage(0, 0), + ExtensionNegotiationError::Timeout => TproxyErrorKind::ExtensionNegotiationTimeout, + ExtensionNegotiationError::UnexpectedMessage(ext, msg) => { + TproxyErrorKind::UnexpectedMessage(ext, msg as u8) + } + ExtensionNegotiationError::HandlerError(_) => TproxyErrorKind::UnexpectedMessage(0, 0), + } + } +} + impl HandlerErrorType for TproxyError { fn parse_error(error: ParserError) -> Self { Self { diff --git a/miner-apps/translator/src/lib/mod.rs b/miner-apps/translator/src/lib/mod.rs index aa65bf50d..618436864 100644 --- a/miner-apps/translator/src/lib/mod.rs +++ b/miner-apps/translator/src/lib/mod.rs @@ -134,6 +134,20 @@ impl TranslatorSv2 { info!("Initializing upstream connection..."); + // Create ChannelManager BEFORE initialize_upstream so the HandleExtensionsFromServerAsync + // trait impl can store negotiated extensions in it during extension negotiation. + // The ChannelManager holds the CM-side channel ends; Upstream-side ends go to initialize_upstream. + let mut channel_manager_raw = ChannelManager::new( + channel_manager_to_upstream_sender, + upstream_to_channel_manager_receiver, + channel_manager_to_sv1_server_sender.clone(), + sv1_server_to_channel_manager_receiver, + status_sender.clone(), + self.config.supported_extensions.clone(), + self.config.required_extensions.clone(), + vec![], + ); + if let Err(e) = self .initialize_upstream( &mut upstream_addresses, @@ -145,6 +159,7 @@ impl TranslatorSv2 { task_manager.clone(), sv1_server.clone(), self.config.required_extensions.clone(), + &mut channel_manager_raw, ) .await { @@ -154,15 +169,8 @@ impl TranslatorSv2 { return; } - let mut channel_manager: Arc = Arc::new(ChannelManager::new( - channel_manager_to_upstream_sender, - upstream_to_channel_manager_receiver, - channel_manager_to_sv1_server_sender.clone(), - sv1_server_to_channel_manager_receiver, - status_sender.clone(), - self.config.supported_extensions.clone(), - self.config.required_extensions.clone(), - )); + // Wrap in Arc now that negotiation is complete and extensions are stored in channel_manager_raw + let mut channel_manager: Arc = Arc::new(channel_manager_raw); info!("Launching ChannelManager tasks..."); ChannelManager::run_channel_manager_tasks( @@ -285,7 +293,20 @@ impl TranslatorSv2 { self.config.clone(), )); - if let Err(e) = self.initialize_upstream( + // Create ChannelManager BEFORE initialize_upstream so the trait + // impl can store negotiated extensions in it during negotiation. + let mut channel_manager_raw = ChannelManager::new( + channel_manager_to_upstream_sender, + upstream_to_channel_manager_receiver, + channel_manager_to_sv1_server_sender, + sv1_server_to_channel_manager_receiver, + status_sender.clone(), + self.config.supported_extensions.clone(), + self.config.required_extensions.clone(), + vec![], + ); + + match self.initialize_upstream( &mut upstream_addresses, channel_manager_to_upstream_receiver, upstream_to_channel_manager_sender, @@ -295,22 +316,19 @@ impl TranslatorSv2 { task_manager.clone(), sv1_server.clone(), self.config.required_extensions.clone(), + &mut channel_manager_raw, ).await { - error!("Couldn't perform fallback, shutting system down: {e:?}"); - cancellation_token.cancel(); - break; + Ok(()) => { + info!("Upstream restarted successfully."); + channel_manager = Arc::new(channel_manager_raw); + } + Err(e) => { + error!("Couldn't perform fallback, shutting system down: {e:?}"); + cancellation_token.cancel(); + break; + } } - channel_manager = Arc::new(ChannelManager::new( - channel_manager_to_upstream_sender, - upstream_to_channel_manager_receiver, - channel_manager_to_sv1_server_sender, - sv1_server_to_channel_manager_receiver, - status_sender.clone(), - self.config.supported_extensions.clone(), - self.config.required_extensions.clone(), - )); - info!("Launching ChannelManager tasks..."); ChannelManager::run_channel_manager_tasks( channel_manager.clone(), @@ -424,6 +442,10 @@ impl TranslatorSv2 { /// `false` means "never tried", while `true` means "already connected or marked as /// malicious". Once an upstream is flagged we skip it on future loops /// to avoid hammering known-bad endpoints during failover. + /// + /// # Returns + /// * `Ok(())` - Upstream connected and extensions negotiated (stored in `channel_manager`) + /// * `Err(TproxyErrorKind)` - All upstreams failed #[allow(clippy::too_many_arguments)] pub async fn initialize_upstream( &self, @@ -436,11 +458,12 @@ impl TranslatorSv2 { task_manager: Arc, sv1_server_instance: Arc, required_extensions: Vec, + channel_manager: &mut ChannelManager, ) -> Result<(), TproxyErrorKind> { const MAX_RETRIES: usize = 3; let upstream_len = upstreams.len(); for (i, upstream_entry) in upstreams.iter_mut().enumerate() { - // Skip upstreams already marked as malicious. We’ve previously failed or + // Skip upstreams already marked as malicious. We've previously failed or // blacklisted them, so no need to warn or attempt reconnecting again. if upstream_entry.tried_or_flagged { debug!( @@ -469,11 +492,14 @@ impl TranslatorSv2 { status_sender.clone(), task_manager.clone(), required_extensions.clone(), + channel_manager, ) .await { Ok(()) => { - // starting sv1 server instance + info!("Extension negotiation complete."); + + // Now that extensions are negotiated, start the SV1 server if let Err(e) = sv1_server_instance .clone() .start( @@ -525,6 +551,7 @@ async fn try_initialize_upstream( status_sender: Sender, task_manager: Arc, required_extensions: Vec, + channel_manager: &mut ChannelManager, ) -> Result<(), TproxyErrorKind> { let upstream = Upstream::new( upstream_addr, @@ -543,9 +570,11 @@ async fn try_initialize_upstream( fallback_coordinator, status_sender, task_manager, + channel_manager, ) - .await?; - Ok(()) + .await + .map(|_| ()) + .map_err(|e| e.kind) } /// Defines the operational mode for Translator Proxy. diff --git a/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs b/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs index 4a47a265a..d87ed6d86 100644 --- a/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs +++ b/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs @@ -12,6 +12,33 @@ use crate::{ utils::{validate_sv1_share, AGGREGATED_CHANNEL_ID}, }; +/// Maximum length for user identity in bytes. +/// This is a protocol limit from the UserIdentity TLV in SV2 extensions. +const MAX_USER_IDENTITY_BYTES: usize = 32; + +/// Truncates a string to a maximum byte length, respecting UTF-8 character boundaries. +/// +/// If the input string exceeds `max_bytes`, it is truncated at the last valid +/// UTF-8 character boundary before or at `max_bytes`. +/// +/// # Arguments +/// * `s` - The input string to potentially truncate +/// * `max_bytes` - Maximum number of bytes allowed +/// +/// # Returns +/// A string slice that is at most `max_bytes` long +fn truncate_to_bytes(s: &str, max_bytes: usize) -> &str { + if s.len() <= max_bytes { + return s; + } + // Find the last valid UTF-8 char boundary at or before max_bytes + let mut end = max_bytes; + while end > 0 && !s.is_char_boundary(end) { + end -= 1; + } + &s[..end] +} + // Implements `IsServer` for `Sv1Server` to handle the Sv1 messages. #[hotpath::measure_all] impl IsServer<'static> for Sv1Server { @@ -192,6 +219,9 @@ impl IsServer<'static> for Sv1Server { } /// Authorizes a Downstream role. + /// + /// If the username exceeds 32 bytes (the protocol limit for UserIdentity TLV), + /// it will be truncated and a warning will be logged. fn authorize(&mut self, client_id: Option, name: &str) { let downstream_id = client_id.expect("Downstream id should exist"); let downstream = self @@ -200,11 +230,29 @@ impl IsServer<'static> for Sv1Server { .expect("Downstream should exist"); let is_authorized = self.is_authorized(client_id, name); + + // Truncate user_identity if it exceeds the protocol limit (32 bytes) + let user_identity = if name.len() > MAX_USER_IDENTITY_BYTES { + let truncated = truncate_to_bytes(name, MAX_USER_IDENTITY_BYTES); + warn!( + "Downstream {}: Username '{}' exceeds {} bytes ({} bytes), truncating to '{}'. \ + Consider using a shorter username for full visibility on the pool dashboard.", + downstream_id, + name, + MAX_USER_IDENTITY_BYTES, + name.len(), + truncated + ); + truncated.to_string() + } else { + name.to_string() + }; + downstream.downstream_data.super_safe_lock(|data| { if !is_authorized { data.authorized_worker_name = name.to_string(); } - data.user_identity = name.to_string(); + data.user_identity = user_identity.clone(); debug!( "Down: Set user_identity to '{}' for downstream {}", data.user_identity, downstream_id diff --git a/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs b/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs index 72e951aeb..85088dacb 100644 --- a/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs +++ b/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs @@ -108,6 +108,8 @@ impl ChannelManager { /// by server) /// * `required_extensions` - Extensions that the translator requires (must be supported by /// server) + /// * `negotiated_extensions` - Extensions that were successfully negotiated with the upstream + /// server during setup_connection /// /// # Returns /// A new ChannelManager instance ready to handle message routing @@ -120,6 +122,7 @@ impl ChannelManager { status_sender: Sender, supported_extensions: Vec, required_extensions: Vec, + negotiated_extensions: Vec, ) -> Self { let channel_state = ChannelState::new( upstream_sender, @@ -137,12 +140,22 @@ impl ChannelManager { extended_channels: Arc::new(DashMap::new()), group_channels: Arc::new(DashMap::new()), share_sequence_counters: Arc::new(DashMap::new()), - negotiated_extensions: Arc::new(Mutex::new(Vec::new())), + negotiated_extensions: Arc::new(Mutex::new(negotiated_extensions)), extranonce_factories: Arc::new(DashMap::new()), aggregated_channel_state: AtomicAggregatedState::new(AggregatedState::NoChannel), } } + /// Sets the negotiated extensions. + /// + /// This is used after upstream fallback to update the negotiated extensions + /// with the new upstream server. + pub fn set_negotiated_extensions(&self, extensions: Vec) { + self.negotiated_extensions.super_safe_lock(|data| { + *data = extensions; + }); + } + /// Spawns and runs the main channel manager task loop. /// /// This method creates an async task that handles all message routing for the @@ -807,6 +820,7 @@ mod tests { status_sender, vec![], vec![], + vec![], // negotiated_extensions ) } diff --git a/miner-apps/translator/src/lib/sv2/upstream/upstream.rs b/miner-apps/translator/src/lib/sv2/upstream/upstream.rs index b91ee9da4..32b5410bf 100644 --- a/miner-apps/translator/src/lib/sv2/upstream/upstream.rs +++ b/miner-apps/translator/src/lib/sv2/upstream/upstream.rs @@ -2,18 +2,17 @@ use crate::{ error::{self, TproxyError, TproxyErrorKind, TproxyResult}, io_task::spawn_io_tasks, status::{handle_error, Status, StatusSender}, - sv2::upstream::channel::UpstreamChannelState, + sv2::{channel_manager::ChannelManager, upstream::channel::UpstreamChannelState}, utils::UpstreamEntry, }; use async_channel::{unbounded, Receiver, Sender}; use std::{net::SocketAddr, sync::Arc}; use stratum_apps::{ + extensions_negotiation::negotiate_extensions, fallback_coordinator::FallbackCoordinator, network_helpers::{self, connect_with_noise, resolve_host}, stratum_core::{ - binary_sv2::Seq064K, common_messages_sv2::{Protocol, SetupConnection}, - extensions_sv2::RequestExtensions, handlers_sv2::HandleCommonMessagesFromServerAsync, parsers_sv2::{AnyMessage, Mining}, }, @@ -175,36 +174,48 @@ impl Upstream { /// /// This method: /// - Completes the SV2 handshake with the upstream server + /// - Negotiates extensions synchronously (waits for response) /// - Spawns the main message processing task /// - Handles graceful shutdown coordination /// /// The method will first attempt to complete the SV2 setup connection /// handshake. If successful, it spawns a task to handle bidirectional /// message flow between the channel manager and upstream server. + /// + /// # Returns + /// * `Ok(Vec)` - Successfully started, returns negotiated extensions + /// * `Err(TproxyError)` - Failed to start or negotiate extensions pub async fn start( mut self, cancellation_token: CancellationToken, fallback_coordinator: FallbackCoordinator, status_sender: Sender, task_manager: Arc, - ) -> TproxyResult<(), error::Upstream> { + channel_manager: &mut ChannelManager, + ) -> TproxyResult, error::Upstream> { let fallback_token: CancellationToken = fallback_coordinator.token(); + let negotiated_extensions; - // wait for connection setup or cancellation signal + // Wait for connection setup or cancellation signal tokio::select! { - result = self.setup_connection() => { - if let Err(e) = result { - error!("Upstream: failed to set up SV2 connection: {e:?}"); - return Err(e); + result = self.setup_connection(channel_manager) => { + match result { + Ok(extensions) => { + negotiated_extensions = extensions; + } + Err(e) => { + error!("Upstream: failed to set up SV2 connection: {e:?}"); + return Err(e); + } } } _ = cancellation_token.cancelled() => { info!("Upstream: shutdown signal received during connection setup."); - return Ok(()); + return Ok(vec![]); } _ = fallback_token.cancelled() => { info!("Upstream: fallback signal received during connection setup."); - return Ok(()); + return Ok(vec![]); } } @@ -218,7 +229,7 @@ impl Upstream { task_manager, )?; - Ok(()) + Ok(negotiated_extensions) } /// Performs the SV2 handshake setup with the upstream server. @@ -227,10 +238,19 @@ impl Upstream { /// - Creating and sending a SetupConnection message /// - Waiting for the handshake response /// - Validating and processing the response + /// - Sending RequestExtensions if required extensions are configured + /// - **Waiting for RequestExtensionsSuccess/Error response** before returning /// /// The handshake establishes the protocol version, capabilities, and /// other connection parameters needed for SV2 communication. - pub async fn setup_connection(&mut self) -> TproxyResult<(), error::Upstream> { + /// + /// # Returns + /// * `Ok(Vec)` - The list of negotiated extensions (empty if none were requested) + /// * `Err(TproxyError)` - Error during handshake or extension negotiation + pub async fn setup_connection( + &mut self, + channel_manager: &mut ChannelManager, + ) -> TproxyResult, error::Upstream> { debug!("Upstream: initiating SV2 handshake..."); // Build SetupConnection message let setup_conn_msg = Self::get_setup_connection_message(2, 2, &self.address, false) @@ -277,32 +297,35 @@ impl Upstream { debug!("Upstream: handshake completed successfully."); // Send RequestExtensions message if there are any required extensions + // and wait for the response before returning if !self.required_extensions.is_empty() { - let require_extensions = RequestExtensions { - request_id: 1, - requested_extensions: Seq064K::new(self.required_extensions.clone()).unwrap(), - }; - - let sv2_frame: Sv2Frame = - AnyMessage::Extensions(require_extensions.into_static().into()) - .try_into() - .map_err(TproxyError::shutdown)?; - - info!( - "Sending RequestExtensions message to upstream: {:?}", - sv2_frame - ); - - self.upstream_channel_state - .upstream_sender - .send(sv2_frame) - .await - .map_err(|e| { - error!("Failed to send message to upstream: {:?}", e); - TproxyError::fallback(TproxyErrorKind::ChannelErrorSender) - })?; + let negotiated = self.negotiate_extensions(channel_manager).await?; + return Ok(negotiated); } - Ok(()) + + Ok(vec![]) + } + + /// Sends RequestExtensions and waits for the response. + /// + /// Delegates to the shared [`stratum_apps::extensions_negotiation::negotiate_extensions`] function. + /// + /// # Returns + /// * `Ok(Vec)` - The list of successfully negotiated extensions + /// * `Err(TproxyError)` - Extension negotiation failed + async fn negotiate_extensions( + &mut self, + channel_manager: &mut ChannelManager, + ) -> TproxyResult, error::Upstream> { + negotiate_extensions( + self.required_extensions.clone(), + self.upstream_channel_state.upstream_sender.clone(), + self.upstream_channel_state.upstream_receiver.clone(), + self.upstream_channel_state.channel_manager_receiver.clone(), + channel_manager, + ) + .await + .map_err(|e| TproxyError::fallback(TproxyErrorKind::from(e))) } /// Processes incoming messages from the upstream SV2 server. diff --git a/stratum-apps/src/extensions_negotiation.rs b/stratum-apps/src/extensions_negotiation.rs new file mode 100644 index 000000000..996d0d76b --- /dev/null +++ b/stratum-apps/src/extensions_negotiation.rs @@ -0,0 +1,123 @@ +//! Extension negotiation utilities shared by JDC and Translator +//! +//! This module provides the shared logic for negotiating SV2 extensions with +//! an upstream server, handling the RequestExtensions/Response flow. + +use async_channel::{Receiver, Sender}; +use std::time::Duration; +use tracing::{error, info}; + +use stratum_core::{ + binary_sv2::Seq064K, codec_sv2::StandardSv2Frame, extensions_sv2::RequestExtensions, + handlers_sv2::HandleExtensionsFromServerAsync, parsers_sv2::AnyMessage, +}; + +use crate::utils::types::Message; + +const EXTENSION_NEGOTIATION_TIMEOUT_SECS: u64 = 30; + +#[derive(Debug)] +pub enum ExtensionNegotiationError { + SendError, + ReceiveError(async_channel::RecvError), + Timeout, + UnexpectedMessage(u16, u16), + HandlerError(String), +} + +impl std::fmt::Display for ExtensionNegotiationError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ExtensionNegotiationError::SendError => write!(f, "Failed to send RequestExtensions"), + ExtensionNegotiationError::ReceiveError(e) => { + write!(f, "Failed to receive extension response: {}", e) + } + ExtensionNegotiationError::Timeout => write!(f, "Extension negotiation timed out"), + ExtensionNegotiationError::UnexpectedMessage(ext, msg) => { + write!(f, "Unexpected message: ext={}, msg={}", ext, msg) + } + ExtensionNegotiationError::HandlerError(s) => { + write!(f, "Handler error: {}", s) + } + } + } +} + +impl std::error::Error for ExtensionNegotiationError {} + +pub async fn negotiate_extensions( + required_extensions: Vec, + upstream_sender: Sender>, + upstream_receiver: Receiver>, + channel_manager_receiver: Receiver>, + channel_manager: &mut CM, +) -> Result, ExtensionNegotiationError> +where + CM: HandleExtensionsFromServerAsync + Send, + E: std::fmt::Debug, +{ + let requested_extensions = Seq064K::new(required_extensions.clone()).map_err(|e| { + ExtensionNegotiationError::HandlerError(format!("Failed to create Seq064K: {:?}", e)) + })?; + + let request_extensions = RequestExtensions { + request_id: 0, + requested_extensions, + }; + + let sv2_frame: StandardSv2Frame = + AnyMessage::Extensions(request_extensions.into_static().into()) + .try_into() + .map_err(|e| { + ExtensionNegotiationError::HandlerError(format!("Failed to frame: {:?}", e)) + })?; + + info!( + "Sending RequestExtensions to upstream with required extensions: {:?}", + required_extensions + ); + + upstream_sender + .send(sv2_frame) + .await + .map_err(|_| ExtensionNegotiationError::SendError)?; + + loop { + let mut response = tokio::time::timeout( + Duration::from_secs(EXTENSION_NEGOTIATION_TIMEOUT_SECS), + upstream_receiver.recv(), + ) + .await + .map_err(|_| { + error!( + "Extension negotiation timed out after {} seconds", + EXTENSION_NEGOTIATION_TIMEOUT_SECS + ); + ExtensionNegotiationError::Timeout + })? + .map_err(ExtensionNegotiationError::ReceiveError)?; + + let header = response.get_header().ok_or_else(|| { + error!("Extension response frame missing header"); + ExtensionNegotiationError::UnexpectedMessage(0, 0) + })?; + + channel_manager + .handle_extensions_message_frame_from_server(None, header, response.payload()) + .await + .map_err(|e| ExtensionNegotiationError::HandlerError(format!("{:?}", e)))?; + + if let Ok(retry_frame) = channel_manager_receiver.try_recv() { + info!("Forwarding retry RequestExtensions to upstream pool..."); + upstream_sender + .send(retry_frame) + .await + .map_err(|_| ExtensionNegotiationError::SendError)?; + continue; + } + + return channel_manager + .get_negotiated_extensions_with_server(None) + .map_err(|e| ExtensionNegotiationError::HandlerError(format!("{:?}", e))); + } +} diff --git a/stratum-apps/src/lib.rs b/stratum-apps/src/lib.rs index 308a9e917..a6bc84c0d 100644 --- a/stratum-apps/src/lib.rs +++ b/stratum-apps/src/lib.rs @@ -78,3 +78,7 @@ pub mod coinbase_output_constraints; /// Fallback coordinator pub mod fallback_coordinator; + +/// Extension negotiation utilities shared by JDC and Translator +pub mod extensions_negotiation; +pub use extensions_negotiation::{negotiate_extensions, ExtensionNegotiationError};