Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions integration-tests/tests/translator_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1613,3 +1613,143 @@ async fn translator_does_not_shutdown_on_missing_downstream_channel() {

assert!(TcpListener::bind(tproxy_addr).await.is_err());
}

/// This test verifies that the translator correctly handles miners with short worker names
/// that fit within the 32-byte UserIdentity TLV limit.
///
/// The user_identity field in TLV only contains the worker suffix (part after the '.')
/// from the mining.authorize message. With "shortuser.worker1", the TLV will contain "worker1".
#[tokio::test]
async fn translator_handles_worker_name_within_limit() {
start_tracing();
let (_tp, tp_addr) = start_template_provider(None, DifficultyLevel::Low);
let (_pool, pool_addr) = start_pool(sv2_tp_config(tp_addr), vec![], vec![]).await;
let (pool_translator_sniffer, pool_translator_sniffer_addr) =
start_sniffer("0", pool_addr, false, vec![], None);

// Start translator in non-aggregated mode (needed for user identity TLV)
let (_, tproxy_addr) =
start_sv2_translator(&[pool_translator_sniffer_addr], false, vec![], vec![], None).await;

// Use a normal username.worker format - worker suffix "worker1" is 7 bytes, well under 32
let worker_name = Some("shortuser.worker1".to_string());

let (_minerd_process, _minerd_addr) =
start_minerd(tproxy_addr, worker_name, Some("x".to_string()), false).await;

// Verify the translator can successfully:
// 1. Complete setup connection
pool_translator_sniffer
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)
.await;
pool_translator_sniffer
.wait_for_message_type(
MessageDirection::ToDownstream,
MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS,
)
.await;

// 2. Open a mining channel
pool_translator_sniffer
.wait_for_message_type(
MessageDirection::ToUpstream,
MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL,
)
.await;
pool_translator_sniffer
.wait_for_message_type(
MessageDirection::ToDownstream,
MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL_SUCCESS,
)
.await;

// 3. Receive a job
pool_translator_sniffer
.wait_for_message_type(
MessageDirection::ToDownstream,
MESSAGE_TYPE_NEW_EXTENDED_MINING_JOB,
)
.await;

// 4. Submit shares successfully
pool_translator_sniffer
.wait_for_message_type(
MessageDirection::ToUpstream,
MESSAGE_TYPE_SUBMIT_SHARES_EXTENDED,
)
.await;
}

/// This test verifies that the translator disconnects miners whose worker suffix
/// (part after '.') exceeds the 32-byte UserIdentity TLV limit.
///
/// The user_identity TLV field has a 32-byte limit. When the worker suffix extracted
/// from mining.authorize exceeds this limit, the translator should disconnect the
/// client at share submission time (when the TLV would be created).
#[tokio::test]
async fn translator_disconnects_on_worker_suffix_exceeding_32_bytes() {
start_tracing();
let (_tp, tp_addr) = start_template_provider(None, DifficultyLevel::Low);
let (_pool, pool_addr) = start_pool(sv2_tp_config(tp_addr), vec![], vec![]).await;
let (pool_translator_sniffer, pool_translator_sniffer_addr) =
start_sniffer("0", pool_addr, false, vec![], None);

// Start translator in non-aggregated mode (needed for user identity TLV)
let (_, tproxy_addr) =
start_sv2_translator(&[pool_translator_sniffer_addr], false, vec![], vec![], None).await;

// Use a short username with a very long worker suffix (42 chars, exceeds 32 byte limit)
// The worker suffix "very_long_worker_name_that_exceeds_limit" is what goes into the TLV
let long_worker_suffix = Some("shortuser.very_long_worker_name_that_exceeds_limit".to_string());

let (_minerd_process, _minerd_addr) = start_minerd(
tproxy_addr,
long_worker_suffix,
Some("x".to_string()),
false,
)
.await;

// The miner should be able to complete the handshake (authorize always succeeds)
pool_translator_sniffer
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)
.await;
pool_translator_sniffer
.wait_for_message_type(
MessageDirection::ToDownstream,
MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS,
)
.await;

pool_translator_sniffer
.wait_for_message_type(
MessageDirection::ToUpstream,
MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL,
)
.await;
pool_translator_sniffer
.wait_for_message_type(
MessageDirection::ToDownstream,
MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL_SUCCESS,
)
.await;

pool_translator_sniffer
.wait_for_message_type(
MessageDirection::ToDownstream,
MESSAGE_TYPE_NEW_EXTENDED_MINING_JOB,
)
.await;

// The miner will attempt to submit shares, but the translator should disconnect
// because the worker suffix exceeds 32 bytes and can't be encoded in the TLV.
// We wait a bit and verify NO share submission message arrives at the pool.
tokio::time::sleep(Duration::from_secs(5)).await;

pool_translator_sniffer
.assert_message_not_present(
MessageDirection::ToUpstream,
MESSAGE_TYPE_SUBMIT_SHARES_EXTENDED,
)
.await;
}
7 changes: 5 additions & 2 deletions miner-apps/translator/src/lib/sv1/downstream/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ pub struct DownstreamData {
pub version_rolling_mask: Option<HexU32Be>,
pub version_rolling_min_bit: Option<HexU32Be>,
pub last_job_version_field: Option<u32>,
pub authorized_worker_name: String,
/// The raw string received from SV1 mining.authorize (e.g., "username.worker1")
pub worker_name_from_authorize: String,
/// The worker suffix extracted from mining.authorize for TLV extension (e.g., "worker1" from "username.worker1")
/// If no '.' delimiter exists, contains the full authorize name.
pub user_identity: String,
pub target: Target,
pub hashrate: Option<Hashrate>,
Expand Down Expand Up @@ -56,7 +59,7 @@ impl DownstreamData {
version_rolling_mask: None,
version_rolling_min_bit: None,
last_job_version_field: None,
authorized_worker_name: String::new(),
worker_name_from_authorize: String::new(),
user_identity: String::new(),
target,
hashrate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ use stratum_apps::stratum_core::sv1_api::{
utils::{Extranonce, HexU32Be},
IsServer,
};
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};

use crate::{
error,
sv1::{downstream::SubmitShareWithChannelId, Sv1Server},
utils::validate_sv1_share,
};
Expand Down Expand Up @@ -88,6 +87,10 @@ impl IsServer<'static> for Sv1Server {
let downstream_id = client_id.expect("Downstream id should exist");
info!("Received mining.authorize from Sv1 downstream {downstream_id}");
debug!("Down: Handling mining.authorize: {:?}", request);

// Always accept authorization - the username portion from SV1 is not used upstream.
// The worker suffix (part after '.') will be used for TLV extension if negotiated,
// and length validation happens at share submission time.
true
}

Expand Down Expand Up @@ -188,10 +191,14 @@ impl IsServer<'static> for Sv1Server {
.expect("Downstream should exist");
downstream
.downstream_data
.super_safe_lock(|data| data.authorized_worker_name == *name)
.super_safe_lock(|data| data.worker_name_from_authorize == *name)
}

/// Authorizes a Downstream role.
///
/// Stores the full authorize name in `worker_name_from_authorize` and extracts the
/// worker suffix (part after the first '.') into `user_identity` for TLV extension use.
/// If no '.' delimiter exists, the full name is used as the user_identity.
fn authorize(&mut self, client_id: Option<usize>, name: &str) {
let downstream_id = client_id.expect("Downstream id should exist");
let downstream = self
Expand All @@ -200,14 +207,22 @@ impl IsServer<'static> for Sv1Server {
.expect("Downstream should exist");

let is_authorized = self.is_authorized(client_id, name);

// Extract worker suffix: part after first '.', or full name if no '.'
let user_identity = if let Some(dot_pos) = name.find('.') {
name[dot_pos + 1..].to_string()
} else {
name.to_string()
};

downstream.downstream_data.super_safe_lock(|data| {
if !is_authorized {
data.authorized_worker_name = name.to_string();
data.worker_name_from_authorize = name.to_string();
}
data.user_identity = name.to_string();
data.user_identity = user_identity.clone();
debug!(
"Down: Set user_identity to '{}' for downstream {}",
data.user_identity, downstream_id
"Down: Set user_identity to '{}' (from '{}') for downstream {}",
data.user_identity, name, downstream_id
);
});
}
Expand Down
32 changes: 19 additions & 13 deletions miner-apps/translator/src/lib/sv1/sv1_server/sv1_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,18 +453,28 @@ impl Sv1Server {
.map_err(|_| TproxyError::shutdown(TproxyErrorKind::SV1Error))?;

// Only add TLV fields with user identity in non-aggregated mode
// user_identity contains the worker suffix from mining.authorize (e.g., "worker1" from "user.worker1")
let tlv_fields = if !self.config.aggregate_channels {
let user_identity_string = self
.downstreams
.get(&message.downstream_id)
.unwrap()
.downstream_data
.super_safe_lock(|d| d.user_identity.clone());
UserIdentity::new(&user_identity_string)
.unwrap()
.to_tlv()
.ok()
.map(|tlv| vec![tlv])
match UserIdentity::new(&user_identity_string) {
Ok(ui) => ui.to_tlv().ok().map(|tlv| vec![tlv]),
Err(e) => {
// Worker suffix exceeds 32 byte limit - disconnect the client
error!(
"User identity '{}' exceeds 32 byte limit for TLV: {} - disconnecting downstream {}",
user_identity_string, e, message.downstream_id
);
return Err(TproxyError::disconnect(
TproxyErrorKind::SV1Error,
message.downstream_id,
));
}
}
} else {
None
};
Expand Down Expand Up @@ -720,7 +730,7 @@ impl Sv1Server {
downstream_id: DownstreamId,
) -> TproxyResult<(), error::Sv1Server> {
let config = &self.config.downstream_difficulty_config;
let downstream = self.downstreams.get(&downstream_id).unwrap();
let _downstream = self.downstreams.get(&downstream_id).unwrap();

let hashrate = config.min_individual_miner_hashrate as f64;
let shares_per_min = config.shares_per_minute as f64;
Expand All @@ -742,17 +752,13 @@ impl Sv1Server {
}
});

// Build channel identity from config (not from downstream's user_identity which is for TLV)
let miner_id = self.miner_counter.fetch_add(1, Ordering::SeqCst) + 1;
let user_identity = format!("{}.miner{}", self.config.user_identity, miner_id);

downstream
.downstream_data
.safe_lock(|d| d.user_identity = user_identity.clone())
.map_err(TproxyError::shutdown)?;
let channel_identity = format!("{}.miner{}", self.config.user_identity, miner_id);

if let Ok(open_channel_msg) = build_sv2_open_extended_mining_channel(
request_id,
user_identity.clone(),
channel_identity,
hashrate as Hashrate,
max_target,
min_extranonce_size,
Expand Down
2 changes: 1 addition & 1 deletion miner-apps/translator/src/lib/sv1_monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ fn downstream_to_sv1_client_info(downstream: &Downstream) -> Option<Sv1ClientInf
.safe_lock(|dd| Sv1ClientInfo {
client_id: dd.downstream_id,
channel_id: dd.channel_id,
authorized_worker_name: dd.authorized_worker_name.clone(),
authorized_worker_name: dd.worker_name_from_authorize.clone(),
user_identity: dd.user_identity.clone(),
target_hex: hex::encode(dd.target.to_be_bytes()),
hashrate: dd.hashrate,
Expand Down
Loading