diff --git a/.gitignore b/.gitignore index 3b5ac06cb..e469a0747 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,8 @@ aider_stdlib_map.md +# Local tools and notes +.local-tools/ + ### Rust ### # Generated by Cargo # will have compiled files and executables diff --git a/apps/freenet-ping/Cargo.lock b/apps/freenet-ping/Cargo.lock index 711f612a5..d4b1fcdf7 100644 --- a/apps/freenet-ping/Cargo.lock +++ b/apps/freenet-ping/Cargo.lock @@ -1247,7 +1247,7 @@ dependencies = [ [[package]] name = "freenet" -version = "0.1.5" +version = "0.1.6" dependencies = [ "aes-gcm", "ahash", @@ -1371,9 +1371,9 @@ dependencies = [ [[package]] name = "freenet-stdlib" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ea1db7c5c1820d7583d21bafeead27e87f9d0d3590dcd7291f5e73e01202634" +checksum = "567c4a988271b76a6cb22d3324f4588ae65e07627d07f4b3b56fdffd0d489b95" dependencies = [ "bincode", "blake3", diff --git a/apps/freenet-ping/test-prod-logs/config/gateways.toml b/apps/freenet-ping/test-prod-logs/config/gateways.toml new file mode 100644 index 000000000..4a563d0e9 --- /dev/null +++ b/apps/freenet-ping/test-prod-logs/config/gateways.toml @@ -0,0 +1,11 @@ +[[gateways]] +public_key = "/home/ian/code/freenet/freenet-core/main/apps/freenet-ping/test-prod-logs/data/secrets/public.vega.gw.pem" + +[gateways.address] +hostname = "vega.locut.us:31337" + +[[gateways]] +public_key = "/home/ian/code/freenet/freenet-core/main/apps/freenet-ping/test-prod-logs/data/secrets/public.ziggy.gw.pem" + +[gateways.address] +hostname = "technic.locut.us:31337" diff --git a/apps/freenet-ping/test-prod-logs/data/_EVENT_LOG b/apps/freenet-ping/test-prod-logs/data/_EVENT_LOG new file mode 100644 index 000000000..d99752950 Binary files /dev/null and b/apps/freenet-ping/test-prod-logs/data/_EVENT_LOG differ diff --git a/apps/freenet-ping/test-prod-logs/data/_EVENT_LOG_LOCAL b/apps/freenet-ping/test-prod-logs/data/_EVENT_LOG_LOCAL new file mode 100644 index 000000000..e69de29bb diff --git a/apps/freenet-ping/test-prod-logs/data/contracts/KEY_DATA b/apps/freenet-ping/test-prod-logs/data/contracts/KEY_DATA new file mode 100644 index 000000000..cdeb3c7d9 Binary files /dev/null and b/apps/freenet-ping/test-prod-logs/data/contracts/KEY_DATA differ diff --git a/apps/freenet-ping/test-prod-logs/data/contracts/KEY_DATA.tmp b/apps/freenet-ping/test-prod-logs/data/contracts/KEY_DATA.tmp new file mode 100644 index 000000000..e69de29bb diff --git a/apps/freenet-ping/test-prod-logs/data/db/db b/apps/freenet-ping/test-prod-logs/data/db/db new file mode 100644 index 000000000..7a38f4136 Binary files /dev/null and b/apps/freenet-ping/test-prod-logs/data/db/db differ diff --git a/apps/freenet-ping/test-prod-logs/data/delegates/KEY_DATA b/apps/freenet-ping/test-prod-logs/data/delegates/KEY_DATA new file mode 100644 index 000000000..e69de29bb diff --git a/apps/freenet-ping/test-prod-logs/data/delegates/KEY_DATA.tmp b/apps/freenet-ping/test-prod-logs/data/delegates/KEY_DATA.tmp new file mode 100644 index 000000000..e69de29bb diff --git a/apps/freenet-ping/test-prod-logs/data/secrets/KEY_DATA b/apps/freenet-ping/test-prod-logs/data/secrets/KEY_DATA new file mode 100644 index 000000000..e69de29bb diff --git a/apps/freenet-ping/test-prod-logs/data/secrets/KEY_DATA.tmp b/apps/freenet-ping/test-prod-logs/data/secrets/KEY_DATA.tmp new file mode 100644 index 000000000..e69de29bb diff --git a/apps/freenet-ping/test-prod-logs/data/secrets/public.vega.gw.pem b/apps/freenet-ping/test-prod-logs/data/secrets/public.vega.gw.pem new file mode 100644 index 000000000..b2318426d --- /dev/null +++ b/apps/freenet-ping/test-prod-logs/data/secrets/public.vega.gw.pem @@ -0,0 +1,14 @@ +-----BEGIN PUBLIC KEY----- +MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA3ZECtEbAcxfvnpweDHiV +OtHvHtyYE5omuNWn3AHyNHPyFMRWd/LAF4l7Q0WunjL4uAXUws1U2OBroW+Rw91C +753kUtecaCelU2f4dLOfz7hIcbFL+A9w4Qzwc3bLUqTKmfASmqVX9qXP1SE4ZAKQ +VxgS/9dVNBEo6wvLkrCNvq2LVQvWslEJb+0QKKbUPrFmYVnweiNGCmP6dqL37u5d +C1jsuE72TaJ7FSj+hWFja3gl6o+Wz/Bw+9qbByUOchD7LtQ//9zl4Xb9lHjyUJ4V ++Tmhg2EiKy3EFEUWILB391cLR3YSzS6B4kw/PptZ/L/TSWUsZH33QiGnAw56jNBW +cFw3TIlQu3PvZWcttyD2dDO6tqZDusI2YHh8hyAb9Z5N8px1jlfHPpK6qHEqhB9N +XjrkD2lTuRbB55UurZ5L2ajvysIjqOOtYDPcLRZHk8YjvxrYmMIk17GKE/6YI01h ++QcqcYxcBCBv0ME9WL3ohKvWTziHVNfrcWYebISHhpc4F/CHbaj2O4bs4I9a9aXV +INlmFHVprCl7UYI7lb4BHWu0tYxsy3oZeLG7ARWffRsz3L2aRHbtqgthSu6X7yYL +YB/qWjUmIE0cAszAULCjbynplZNtwJT324+77DPD85+N9nTrS+Po8T2xJ8pCweu+ +5LUJQ0Jh+QPJeIQ2TzA7RSUCAwEAAQ== +-----END PUBLIC KEY----- \ No newline at end of file diff --git a/apps/freenet-ping/test-prod-logs/data/secrets/public.ziggy.gw.pem b/apps/freenet-ping/test-prod-logs/data/secrets/public.ziggy.gw.pem new file mode 100644 index 000000000..0470db69d --- /dev/null +++ b/apps/freenet-ping/test-prod-logs/data/secrets/public.ziggy.gw.pem @@ -0,0 +1,14 @@ +-----BEGIN PUBLIC KEY----- +MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEArpQ/xgxqczEpsNaKtBdh +MmhiocOjfNimI6rD5cTYfMAsahKS0ElY6leXzNBgsZYLGxYPKqkrODPxpttejD9v +fdcOaN1QNW+Rl85BSf5DlBxeBdC2NTMeadC207RZOdD12pdEdIz1W1nS/PGlDqRq +Kh2LPSmuT9Kqxzk1PMO7HuIgMvQVpLBJiFnQMWJ4+meHINtI8vOkbYjLqoE2yrSK +GSapQ/zptsjzzB4Jd0Rm/OV5qT7SF14yhv+XVhPy2gw/lJmhcIDw2rCEBe7hPbDX +E+p9jCgGMi0HWN8QeSDTapY9hlFy4dzXW5xK0ySrGUAp1kcPDdwqA2npa3oJlVDZ +TstAMDt3RhdfrrwfmdQefvygBUIAlAk69y1FvNGEOO9TcikQjDaJdfQSP9V0qTEV +BsKNpkX7VHgG1i8Cc6c7JlDyeHSB+3TXSfO9esYcAFxjwx1SbyInHBMpYG/iYoX+ +jGqQLJVMV6EasQ5xgryW2jTD4pBjkcrlgz0KKUlCg0cMTafaOpTrnaTHjmWYIP2c +KKxVR2Iu+cPdNPvFeywr2mF2Bz7I4cRxJOVhwupDH7XaWf8gfMMvIlsr2f2ChQzc +gRCVduKEeMp1bcuKisSbJnFIluVOgJ3MZJi7RS+0W31dkePWtJYP0eXgdlHtDVV6 +scCTSgvaL0QFtu7+5ds0qqkCAwEAAQ== +-----END PUBLIC KEY----- \ No newline at end of file diff --git a/crates/core/src/transport/connection_handler.rs b/crates/core/src/transport/connection_handler.rs index b7bb90c90..a5e8dbbce 100644 --- a/crates/core/src/transport/connection_handler.rs +++ b/crates/core/src/transport/connection_handler.rs @@ -646,6 +646,7 @@ impl UdpPacketsListener { inbound_symmetric_key: inbound_key, inbound_symmetric_key_bytes: inbound_key_bytes, my_address: None, + transport_secret_key: secret, }; let inbound_conn = InboundRemoteConnection { @@ -842,6 +843,8 @@ impl UdpPacketsListener { inbound_symmetric_key_bytes: inbound_sym_key_bytes, my_address: Some(my_address), + transport_secret_key: transport_secret_key + .clone(), }, InboundRemoteConnection { inbound_packet_sender: inbound_sender, @@ -907,6 +910,7 @@ impl UdpPacketsListener { inbound_symmetric_key: inbound_sym_key, inbound_symmetric_key_bytes: inbound_sym_key_bytes, my_address: None, + transport_secret_key: transport_secret_key.clone(), }, InboundRemoteConnection { inbound_packet_sender: inbound_sender, diff --git a/crates/core/src/transport/peer_connection.rs b/crates/core/src/transport/peer_connection.rs index 4c7127ad5..2a43c062c 100644 --- a/crates/core/src/transport/peer_connection.rs +++ b/crates/core/src/transport/peer_connection.rs @@ -5,6 +5,7 @@ use std::time::Duration; use std::{collections::HashMap, time::Instant}; use crate::transport::connection_handler::NAT_TRAVERSAL_MAX_ATTEMPTS; +use crate::transport::crypto::TransportSecretKey; use crate::transport::packet_data::UnknownEncryption; use crate::transport::sent_packet_tracker::MESSAGE_CONFIRMATION_TIMEOUT; use aes_gcm::Aes128Gcm; @@ -47,6 +48,7 @@ pub(crate) struct RemoteConnection { pub(super) inbound_symmetric_key: Aes128Gcm, pub(super) inbound_symmetric_key_bytes: [u8; 16], pub(super) my_address: Option, + pub(super) transport_secret_key: TransportSecretKey, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -144,9 +146,11 @@ impl PeerConnection { outbound_symmetric_key: Aes128Gcm, inbound_symmetric_key: Aes128Gcm, ) -> PeerConnectionMock { + use crate::transport::crypto::TransportKeypair; use parking_lot::Mutex; let (outbound_packets, outbound_packets_recv) = mpsc::channel(1); let (inbound_packet_sender, inbound_packet_recv) = mpsc::channel(1); + let keypair = TransportKeypair::new(); let remote = RemoteConnection { outbound_packets, outbound_symmetric_key, @@ -157,6 +161,7 @@ impl PeerConnection { inbound_symmetric_key, inbound_symmetric_key_bytes: [1; 16], my_address: Some(my_address), + transport_secret_key: keypair.secret, }; ( Self::new(remote), @@ -172,9 +177,11 @@ impl PeerConnection { outbound_symmetric_key: Aes128Gcm, inbound_symmetric_key: Aes128Gcm, ) -> RemoteConnectionMock { + use crate::transport::crypto::TransportKeypair; use parking_lot::Mutex; let (outbound_packets, outbound_packets_recv) = mpsc::channel(1); let (inbound_packet_sender, inbound_packet_recv) = mpsc::channel(1); + let keypair = TransportKeypair::new(); ( RemoteConnection { outbound_packets, @@ -186,6 +193,7 @@ impl PeerConnection { inbound_symmetric_key, inbound_symmetric_key_bytes: [1; 16], my_address: Some(my_address), + transport_secret_key: keypair.secret, }, inbound_packet_sender, outbound_packets_recv, @@ -215,14 +223,8 @@ impl PeerConnection { // listen for incoming messages or receipts or wait until is time to do anything else again let mut resend_check = Some(tokio::time::sleep(tokio::time::Duration::from_millis(10))); - #[cfg(debug_assertions)] - const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(2); - #[cfg(not(debug_assertions))] - const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(20); - #[cfg(debug_assertions)] - const KILL_CONNECTION_AFTER: Duration = Duration::from_secs(6); - #[cfg(not(debug_assertions))] - const KILL_CONNECTION_AFTER: Duration = Duration::from_secs(60); + const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10); + const KILL_CONNECTION_AFTER: Duration = Duration::from_secs(30); let mut keep_alive = tokio::time::interval(KEEP_ALIVE_INTERVAL); keep_alive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -236,14 +238,85 @@ impl PeerConnection { inbound = self.remote_conn.inbound_packet_recv.recv() => { let packet_data = inbound.ok_or(TransportError::ConnectionClosed(self.remote_addr()))?; last_received = std::time::Instant::now(); + + // Debug logging for 256-byte packets + if packet_data.data().len() == 256 { + tracing::warn!( + remote = ?self.remote_conn.remote_addr, + packet_bytes = ?&packet_data.data()[..32], // First 32 bytes + packet_len = packet_data.data().len(), + "Received 256-byte packet" + ); + } + let Ok(decrypted) = packet_data.try_decrypt_sym(&self.remote_conn.inbound_symmetric_key).inspect_err(|error| { tracing::warn!( %error, remote = ?self.remote_conn.remote_addr, inbound_key = ?self.remote_conn.inbound_symmetric_key_bytes, + packet_len = packet_data.data().len(), + packet_first_bytes = ?&packet_data.data()[..std::cmp::min(32, packet_data.data().len())], "Failed to decrypt packet, might be an intro packet or a partial packet" ); }) else { + // Check if this is a 256-byte RSA intro packet + if packet_data.data().len() == 256 { + tracing::info!( + remote = ?self.remote_conn.remote_addr, + "Attempting to decrypt potential RSA intro packet" + ); + + // Try to decrypt as RSA intro packet + match self.remote_conn.transport_secret_key.decrypt(packet_data.data()) { + Ok(_decrypted_intro) => { + tracing::info!( + remote = ?self.remote_conn.remote_addr, + "Successfully decrypted RSA intro packet, sending ACK" + ); + + // Send ACK response for intro packet + let ack_packet = SymmetricMessage::ack_ok( + &self.remote_conn.outbound_symmetric_key, + self.remote_conn.inbound_symmetric_key_bytes, + self.remote_conn.remote_addr, + ); + + if let Ok(ack) = ack_packet { + if let Err(send_err) = self.remote_conn + .outbound_packets + .send((self.remote_conn.remote_addr, ack.data().into())) + .await + { + tracing::warn!( + remote = ?self.remote_conn.remote_addr, + error = ?send_err, + "Failed to send ACK for intro packet" + ); + } else { + tracing::info!( + remote = ?self.remote_conn.remote_addr, + "Successfully sent ACK for intro packet" + ); + } + } else { + tracing::warn!( + remote = ?self.remote_conn.remote_addr, + "Failed to create ACK packet for intro" + ); + } + + // Continue to next packet + continue; + } + Err(rsa_err) => { + tracing::debug!( + remote = ?self.remote_conn.remote_addr, + error = ?rsa_err, + "256-byte packet is not a valid RSA intro packet" + ); + } + } + } let now = Instant::now(); if let Some(first_failure_time) = self.first_failure_time { if now.duration_since(first_failure_time) <= FAILURE_TIME_WINDOW { diff --git a/crates/core/tests/operations.rs b/crates/core/tests/operations.rs index 85e2080f7..ec43d7343 100644 --- a/crates/core/tests/operations.rs +++ b/crates/core/tests/operations.rs @@ -2022,3 +2022,418 @@ async fn test_delegate_request() -> TestResult { Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_gateway_packet_size_change_after_60s() -> TestResult { + freenet::config::set_logger(Some(LevelFilter::DEBUG), None); + + // Load test contract + const TEST_CONTRACT: &str = "test-contract-integration"; + let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?; + let contract_key = contract.key(); + + // Create initial state + let initial_state = test_utils::create_empty_todo_list(); + let wrapped_state = WrappedState::from(initial_state); + + // Create network sockets + let network_socket_gw1 = TcpListener::bind("127.0.0.1:0")?; + let network_socket_gw2 = TcpListener::bind("127.0.0.1:0")?; + let ws_api_port_socket_client = TcpListener::bind("127.0.0.1:0")?; + let ws_api_port_socket_gw1 = TcpListener::bind("127.0.0.1:0")?; + let ws_api_port_socket_gw2 = TcpListener::bind("127.0.0.1:0")?; + + // Configure first gateway node + let (config_gw1, preset_cfg_gw1, config_gw1_info) = { + let (cfg, preset) = base_node_test_config( + true, + vec![], + Some(network_socket_gw1.local_addr()?.port()), + ws_api_port_socket_gw1.local_addr()?.port(), + ) + .await?; + let public_port = cfg.network_api.public_port.unwrap(); + let path = preset.temp_dir.path().to_path_buf(); + (cfg, preset, gw_config(public_port, &path)?) + }; + + // Configure second gateway node (connects to first gateway) + let (config_gw2, preset_cfg_gw2, config_gw2_info) = { + let (cfg, preset) = base_node_test_config( + true, + vec![serde_json::to_string(&config_gw1_info)?], // Connect to gateway 1 + Some(network_socket_gw2.local_addr()?.port()), + ws_api_port_socket_gw2.local_addr()?.port(), + ) + .await?; + let public_port = cfg.network_api.public_port.unwrap(); + let path = preset.temp_dir.path().to_path_buf(); + (cfg, preset, gw_config(public_port, &path)?) + }; + + // Configure client node (connects via gateway 2) + let (config_client, preset_cfg_client) = base_node_test_config( + false, + vec![serde_json::to_string(&config_gw2_info)?], + None, + ws_api_port_socket_client.local_addr()?.port(), + ) + .await?; + let ws_api_port_client = config_client.ws_api.ws_api_port.unwrap(); + + // Log data directories + tracing::info!( + "Client node data dir: {:?}", + preset_cfg_client.temp_dir.path() + ); + tracing::info!( + "Gateway 1 node data dir: {:?}", + preset_cfg_gw1.temp_dir.path() + ); + tracing::info!( + "Gateway 2 node data dir: {:?}", + preset_cfg_gw2.temp_dir.path() + ); + + // Free ports + std::mem::drop(ws_api_port_socket_client); + std::mem::drop(network_socket_gw1); + std::mem::drop(network_socket_gw2); + std::mem::drop(ws_api_port_socket_gw1); + std::mem::drop(ws_api_port_socket_gw2); + + // Start gateway 1 node + let node_gw1 = async { + let config = config_gw1.build().await?; + let node = NodeConfig::new(config.clone()) + .await? + .build(serve_gateway(config.ws_api).await) + .await?; + node.run().await + } + .boxed_local(); + + // Start gateway 2 node (connects to gateway 1) + let node_gw2 = async { + let config = config_gw2.build().await?; + let node = NodeConfig::new(config.clone()) + .await? + .build(serve_gateway(config.ws_api).await) + .await?; + node.run().await + } + .boxed_local(); + + // Start client node + let node_client = async move { + let config = config_client.build().await?; + let node = NodeConfig::new(config.clone()) + .await? + .build(serve_gateway(config.ws_api).await) + .await?; + node.run().await + } + .boxed_local(); + + let test = tokio::time::timeout(Duration::from_secs(180), async { + // Wait for nodes to start (gateways need to connect to each other) + tokio::time::sleep(Duration::from_secs(20)).await; + + // Connect to client node + let uri = format!( + "ws://127.0.0.1:{}/v1/contract/command?encodingProtocol=native", + ws_api_port_client + ); + let (stream, _) = connect_async(&uri).await?; + let mut client = WebApi::start(stream); + + // Put contract + make_put(&mut client, wrapped_state.clone(), contract.clone(), false).await?; + + // Wait for put response + let resp = tokio::time::timeout(Duration::from_secs(30), client.recv()).await; + match resp { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => { + assert_eq!(key, contract_key); + tracing::info!("Successfully put contract"); + } + _ => { + bail!("Failed to put contract"); + } + } + + // Now keep the connection alive for 90 seconds, sending periodic GET requests + tracing::info!("Starting packet size change test - monitoring for 75 seconds"); + let start_time = std::time::Instant::now(); + let mut get_count = 0; + let mut error_count = 0; + + while start_time.elapsed() < Duration::from_secs(75) { + // Send a GET request every 5 seconds for more frequent monitoring + tokio::time::sleep(Duration::from_secs(5)).await; + get_count += 1; + + let elapsed = start_time.elapsed(); + tracing::info!("Sending GET request #{} at {:?}", get_count, elapsed); + + // Log if we're past the 60-second mark where errors typically start + if elapsed > Duration::from_secs(60) { + tracing::warn!("Past 60-second mark - monitoring for packet size changes"); + } + + make_get(&mut client, contract_key, false, false).await?; + + // Try to receive response with a shorter timeout + match tokio::time::timeout(Duration::from_secs(10), client.recv()).await { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { + key, + .. + }))) => { + assert_eq!(key, contract_key); + tracing::info!("GET request #{} succeeded", get_count); + } + Ok(Ok(other)) => { + tracing::warn!( + "GET request #{} unexpected response: {:?}", + get_count, + other + ); + error_count += 1; + } + Ok(Err(e)) => { + tracing::error!("GET request #{} error: {}", get_count, e); + error_count += 1; + } + Err(_) => { + tracing::error!("GET request #{} timed out", get_count); + error_count += 1; + } + } + } + + tracing::info!( + "Long-running test completed: {} GET requests, {} errors", + get_count, + error_count + ); + + // The test passes if we don't crash with decryption errors + // In production, decryption errors would cause the connection to fail + if error_count > get_count / 2 { + bail!("Too many errors during long-running connection test"); + } + + Ok::<_, anyhow::Error>(()) + }); + + // Wait for test completion or node failures + select! { + gw1 = node_gw1 => { + let Err(e) = gw1; + return Err(anyhow!("Gateway 1 node failed: {}", e).into()) + } + gw2 = node_gw2 => { + let Err(e) = gw2; + return Err(anyhow!("Gateway 2 node failed: {}", e).into()) + } + client = node_client => { + let Err(e) = client; + return Err(anyhow!("Client node failed: {}", e).into()) + } + r = test => { + r??; + tokio::time::sleep(Duration::from_secs(3)).await; + } + } + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_production_decryption_error_scenario() -> TestResult { + freenet::config::set_logger(Some(LevelFilter::DEBUG), None); + + // This test attempts to reproduce the exact production scenario: + // 1. Client connects to gateway (vega) + // 2. Connection works fine for ~60 seconds with 48-byte packets + // 3. After 60 seconds, 256-byte packets arrive that fail to decrypt + + const TEST_CONTRACT: &str = "test-contract-integration"; + let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?; + let contract_key = contract.key(); + + let initial_state = test_utils::create_empty_todo_list(); + let wrapped_state = WrappedState::from(initial_state); + + // Create sockets + let network_socket_gw = TcpListener::bind("127.0.0.1:0")?; + let ws_api_port_socket_client = TcpListener::bind("127.0.0.1:0")?; + let ws_api_port_socket_gw = TcpListener::bind("127.0.0.1:0")?; + + // Configure gateway (simulating vega) + let (config_gw, preset_cfg_gw, config_gw_info) = { + let (cfg, preset) = base_node_test_config( + true, + vec![], + Some(network_socket_gw.local_addr()?.port()), + ws_api_port_socket_gw.local_addr()?.port(), + ) + .await?; + let public_port = cfg.network_api.public_port.unwrap(); + let path = preset.temp_dir.path().to_path_buf(); + (cfg, preset, gw_config(public_port, &path)?) + }; + + // Configure client node + let (config_client, preset_cfg_client) = base_node_test_config( + false, + vec![serde_json::to_string(&config_gw_info)?], + None, + ws_api_port_socket_client.local_addr()?.port(), + ) + .await?; + let ws_api_port_client = config_client.ws_api.ws_api_port.unwrap(); + + tracing::info!( + "Client node data dir: {:?}", + preset_cfg_client.temp_dir.path() + ); + tracing::info!("Gateway node data dir: {:?}", preset_cfg_gw.temp_dir.path()); + + // Free ports + std::mem::drop(ws_api_port_socket_client); + std::mem::drop(network_socket_gw); + std::mem::drop(ws_api_port_socket_gw); + + // Start nodes + let node_gw = async { + let config = config_gw.build().await?; + let node = NodeConfig::new(config.clone()) + .await? + .build(serve_gateway(config.ws_api).await) + .await?; + node.run().await + } + .boxed_local(); + + let node_client = async move { + let config = config_client.build().await?; + let node = NodeConfig::new(config.clone()) + .await? + .build(serve_gateway(config.ws_api).await) + .await?; + node.run().await + } + .boxed_local(); + + let test = tokio::time::timeout(Duration::from_secs(90), async { + // Wait for nodes to start + tokio::time::sleep(Duration::from_secs(15)).await; + + // Connect to client node + let uri = format!( + "ws://127.0.0.1:{}/v1/contract/command?encodingProtocol=native", + ws_api_port_client + ); + let (stream, _) = connect_async(&uri).await?; + let mut client = WebApi::start(stream); + + // Put contract + make_put(&mut client, wrapped_state.clone(), contract.clone(), false).await?; + + // Wait for put response + let resp = tokio::time::timeout(Duration::from_secs(30), client.recv()).await; + match resp { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => { + assert_eq!(key, contract_key); + tracing::info!("Successfully put contract"); + } + _ => { + bail!("Failed to put contract"); + } + } + + // Monitor connection for 75 seconds + tracing::info!("Starting production scenario simulation - monitoring for 75 seconds"); + let start_time = std::time::Instant::now(); + let mut last_success_time = start_time; + let mut error_count = 0; + let mut success_count = 0; + + while start_time.elapsed() < Duration::from_secs(75) { + tokio::time::sleep(Duration::from_secs(3)).await; + + let elapsed = start_time.elapsed(); + + // Try a GET request + make_get(&mut client, contract_key, false, false).await?; + + match tokio::time::timeout(Duration::from_secs(5), client.recv()).await { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { + key, + .. + }))) => { + assert_eq!(key, contract_key); + success_count += 1; + last_success_time = std::time::Instant::now(); + tracing::info!( + "GET succeeded at {:?} (success #{})", + elapsed, + success_count + ); + } + Ok(Ok(other)) => { + error_count += 1; + tracing::error!("GET unexpected response at {:?}: {:?}", elapsed, other); + } + Ok(Err(e)) => { + error_count += 1; + tracing::error!("GET error at {:?}: {}", elapsed, e); + } + Err(_) => { + error_count += 1; + tracing::error!("GET timeout at {:?}", elapsed); + } + } + + // Log status around the critical 60-second mark + if elapsed > Duration::from_secs(58) && elapsed < Duration::from_secs(65) { + tracing::warn!( + "Critical period - elapsed: {:?}, errors: {}, last success: {:?} ago", + elapsed, + error_count, + std::time::Instant::now().duration_since(last_success_time) + ); + } + } + + tracing::info!( + "Test completed: {} successes, {} errors", + success_count, + error_count + ); + + // In production, all requests fail after ~60 seconds + // For now, we just log the results to see if we can reproduce the pattern + + Ok::<_, anyhow::Error>(()) + }); + + // Wait for test completion or node failures + select! { + gw = node_gw => { + let Err(e) = gw; + return Err(anyhow!("Gateway node failed: {}", e).into()) + } + client = node_client => { + let Err(e) = client; + return Err(anyhow!("Client node failed: {}", e).into()) + } + r = test => { + r??; + tokio::time::sleep(Duration::from_secs(3)).await; + } + } + + Ok(()) +}