|
47 | 47 | solana_client::{ |
48 | 48 | connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE}, |
49 | 49 | rpc_client::RpcClient, |
| 50 | + tpu_connection::TpuConnection, |
50 | 51 | }, |
51 | 52 | solana_core::serve_repair::RepairProtocol, |
52 | 53 | solana_dos::cli::*, |
|
67 | 68 | }, |
68 | 69 | solana_streamer::socket::SocketAddrSpace, |
69 | 70 | std::{ |
| 71 | + cmp::min, |
70 | 72 | net::{SocketAddr, UdpSocket}, |
71 | 73 | process::exit, |
72 | 74 | sync::Arc, |
@@ -233,6 +235,8 @@ impl TransactionGenerator { |
233 | 235 | } |
234 | 236 | } |
235 | 237 |
|
| 238 | +const SEND_BATCH_MAX_SIZE: usize = 1 << 10; |
| 239 | + |
236 | 240 | fn get_target( |
237 | 241 | nodes: &[ContactInfo], |
238 | 242 | mode: Mode, |
@@ -385,9 +389,8 @@ fn run_dos_transactions<T: 'static + BenchTpsClient + Send + Sync>( |
385 | 389 | iterations: usize, |
386 | 390 | client: Option<Arc<T>>, |
387 | 391 | transaction_params: TransactionParams, |
| 392 | + tpu_use_quic: bool, |
388 | 393 | ) { |
389 | | - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); |
390 | | - |
391 | 394 | // Number of payers is the number of generating threads, for now it is 1 |
392 | 395 | // Later, we will create a new payer for each thread since Keypair is not clonable |
393 | 396 | let payers: Vec<Option<Keypair>> = |
@@ -417,34 +420,45 @@ fn run_dos_transactions<T: 'static + BenchTpsClient + Send + Sync>( |
417 | 420 |
|
418 | 421 | let mut transaction_generator = TransactionGenerator::new(transaction_params); |
419 | 422 |
|
| 423 | + //let connection_cache_stats = Arc::new(ConnectionCacheStats::default()); |
| 424 | + //let udp_client = UdpTpuConnection::new(target, connection_cache_stats); |
| 425 | + |
| 426 | + let connection_cache = ConnectionCache::new(tpu_use_quic, DEFAULT_TPU_CONNECTION_POOL_SIZE); |
| 427 | + let connection = connection_cache.get_connection(&target); |
| 428 | + |
420 | 429 | let mut count = 0; |
421 | 430 | let mut total_count = 0; |
422 | 431 | let mut error_count = 0; |
423 | 432 | let mut last_log = Instant::now(); |
| 433 | + |
424 | 434 | loop { |
425 | | - let chunk_keypairs = if generate_keypairs { |
426 | | - let permutation = it.next(); |
427 | | - if permutation.is_none() { |
428 | | - // if ran out of permutations, regenerate keys |
429 | | - keypairs_flat.iter_mut().for_each(|v| *v = Keypair::new()); |
430 | | - info!("Regenerate keypairs"); |
431 | | - continue; |
432 | | - } |
433 | | - let permutation = permutation.unwrap(); |
434 | | - Some(apply_permutation(permutation, &keypairs_flat)) |
435 | | - } else { |
436 | | - None |
437 | | - }; |
| 435 | + let send_batch_size = min(iterations - total_count, SEND_BATCH_MAX_SIZE); |
| 436 | + let mut data = Vec::<Vec<u8>>::with_capacity(SEND_BATCH_MAX_SIZE); |
| 437 | + for _ in 0..send_batch_size { |
| 438 | + let chunk_keypairs = if generate_keypairs { |
| 439 | + let mut permutation = it.next(); |
| 440 | + if permutation.is_none() { |
| 441 | + // if ran out of permutations, regenerate keys |
| 442 | + keypairs_flat.iter_mut().for_each(|v| *v = Keypair::new()); |
| 443 | + info!("Regenerate keypairs"); |
| 444 | + permutation = it.next(); |
| 445 | + } |
| 446 | + let permutation = permutation.unwrap(); |
| 447 | + Some(apply_permutation(permutation, &keypairs_flat)) |
| 448 | + } else { |
| 449 | + None |
| 450 | + }; |
| 451 | + let tx = transaction_generator.generate(payer, chunk_keypairs, client.as_ref()); |
| 452 | + data.push(bincode::serialize(&tx).unwrap()); |
| 453 | + } |
438 | 454 |
|
439 | | - let tx = transaction_generator.generate(payer, chunk_keypairs, client.as_ref()); |
| 455 | + let res = connection.send_wire_transaction_batch_async(data); |
440 | 456 |
|
441 | | - let data = bincode::serialize(&tx).unwrap(); |
442 | | - let res = socket.send_to(&data, target); |
443 | 457 | if res.is_err() { |
444 | | - error_count += 1; |
| 458 | + error_count += send_batch_size; |
445 | 459 | } |
446 | | - count += 1; |
447 | | - total_count += 1; |
| 460 | + count += send_batch_size; |
| 461 | + total_count += send_batch_size; |
448 | 462 | if last_log.elapsed().as_millis() > SAMPLE_PERIOD_MS as u128 { |
449 | 463 | info!( |
450 | 464 | "count: {}, errors: {}, rps: {}", |
@@ -485,7 +499,13 @@ fn run_dos<T: 'static + BenchTpsClient + Send + Sync>( |
485 | 499 | { |
486 | 500 | let target = target.expect("should have target"); |
487 | 501 | info!("Targeting {}", target); |
488 | | - run_dos_transactions(target, iterations, client, params.transaction_params); |
| 502 | + run_dos_transactions( |
| 503 | + target, |
| 504 | + iterations, |
| 505 | + client, |
| 506 | + params.transaction_params, |
| 507 | + params.tpu_use_quic, |
| 508 | + ); |
489 | 509 | } else { |
490 | 510 | let target = target.expect("should have target"); |
491 | 511 | info!("Targeting {}", target); |
|
0 commit comments