diff --git a/Cargo.lock b/Cargo.lock index 1671839de5..9996ce2c22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6059,6 +6059,7 @@ name = "zksync_prometheus_exporter" version = "1.0.0" dependencies = [ "anyhow", + "chrono", "futures 0.3.17", "metrics", "metrics-exporter-prometheus", @@ -6068,9 +6069,11 @@ dependencies = [ "tokio", "tracing", "vlog", + "zksync_crypto", "zksync_storage", "zksync_token_db_cache", "zksync_types", + "zksync_utils", ] [[package]] diff --git a/core/lib/prometheus_exporter/Cargo.toml b/core/lib/prometheus_exporter/Cargo.toml index 47b85b8aea..a98a562c4a 100644 --- a/core/lib/prometheus_exporter/Cargo.toml +++ b/core/lib/prometheus_exporter/Cargo.toml @@ -27,3 +27,8 @@ metrics = "0.17" metrics-exporter-prometheus = "0.6" metrics-macros = "0.4" metrics-util = "0.10" + +[dev-dependencies] +zksync_utils = { path = "../../lib/utils", version = "1.0" } +chrono = { version = "0.4", features = ["serde", "rustc-serialize"] } +zksync_crypto = { path = "../../lib/crypto", version = "1.0" } diff --git a/core/lib/prometheus_exporter/src/lib.rs b/core/lib/prometheus_exporter/src/lib.rs index 0ca832149e..4cdfac9b24 100644 --- a/core/lib/prometheus_exporter/src/lib.rs +++ b/core/lib/prometheus_exporter/src/lib.rs @@ -12,7 +12,7 @@ use zksync_storage::{ConnectionPool, QueryResult, StorageProcessor}; use zksync_token_db_cache::TokenDBCache; use zksync_types::aggregated_operations::AggregatedActionType::*; use zksync_types::block::IncompleteBlock; -use zksync_types::TokenId; +use zksync_types::{ExecutedOperations, TokenId}; const QUERY_INTERVAL: Duration = Duration::from_secs(30); @@ -63,12 +63,11 @@ async fn prometheus_exporter_iteration(connection_pool: ConnectionPool) -> Query } /// Extract volumes from block -fn get_volumes(block: &IncompleteBlock) -> HashMap { +fn get_volumes(txs: &[ExecutedOperations]) -> HashMap { let mut volumes: HashMap = HashMap::new(); // Iterator over tx amounts in the block. - let amounts_iter = block - .block_transactions + let amounts_iter = txs .iter() .filter(|executed_op| executed_op.is_successful()) // Only process successful operations. .filter_map(|executed_op| executed_op.get_executed_op()) // Obtain transaction. @@ -91,7 +90,7 @@ pub async fn calculate_volume_for_block( token_db_cache: &mut TokenDBCache, ) -> Result<(), anyhow::Error> { let start = Instant::now(); - let volumes = get_volumes(block); + let volumes = get_volumes(&block.block_transactions); for (token_id, amount) in volumes.into_iter() { if let Some(price) = storage .tokens_schema() @@ -99,17 +98,19 @@ pub async fn calculate_volume_for_block( .await? { let token = token_db_cache.get_token(storage, token_id).await?.unwrap(); + let usd_amount = token_amount_to_usd(amount, token.decimals, price.usd_price); let labels = vec![("token", token.symbol)]; - let usd_amount = Ratio::from(amount) - / BigUint::from(10u32).pow(u32::from(token.decimals)) - * price.usd_price; - metrics::increment_gauge!("txs_volume", usd_amount.to_f64().unwrap(), &labels); + metrics::increment_gauge!("txs_volumes", usd_amount.to_f64().unwrap(), &labels); } } metrics::histogram!("calculate_metric", start.elapsed(), "type" => "volume_for_block"); Ok(()) } +fn token_amount_to_usd(amount: BigUint, decimals: u8, usd_price: Ratio) -> Ratio { + Ratio::from(amount) / BigUint::from(10u32).pow(u32::from(decimals)) * usd_price +} + pub fn run_prometheus_exporter(port: u16) -> JoinHandle<()> { let addr = ([0, 0, 0, 0], port); let (recorder, exporter) = PrometheusBuilder::new() @@ -127,3 +128,143 @@ pub fn run_prometheus_exporter(port: u16) -> JoinHandle<()> { } }) } + +#[cfg(test)] +mod tests { + use crate::{get_volumes, token_amount_to_usd, BigUint, ToPrimitive, TokenId}; + use chrono::Utc; + use num::FromPrimitive; + use zksync_crypto::{ + priv_key_from_fs, + rand::{thread_rng, Rng}, + }; + use zksync_storage::BigDecimal; + use zksync_types::{ + AccountId, Address, ExecutedOperations, ExecutedTx, Nonce, Order, SignedZkSyncTx, Swap, + SwapOp, Transfer, TransferOp, ZkSyncOp, ZkSyncTx, + }; + use zksync_utils::big_decimal_to_ratio; + + #[test] + fn calculate_volume() { + let usdc_price = big_decimal_to_ratio(&BigDecimal::from_f64(1.032).unwrap()).unwrap(); + let usdc_amount = BigUint::from(65169500u32); + let usdc_decimals = 6; + let volume = token_amount_to_usd(usdc_amount, usdc_decimals, usdc_price); + assert!((67.254924 - volume.to_f64().unwrap()).abs() <= f64::EPSILON); + let eth_price = big_decimal_to_ratio(&BigDecimal::from_f64(3424.05).unwrap()).unwrap(); + let eth_amount = BigUint::from(87829590000000000u64); + let eth_decimals = 18; + let volume = token_amount_to_usd(eth_amount, eth_decimals, eth_price); + assert!((300.7329076395 - volume.to_f64().unwrap()).abs() <= f64::EPSILON); + } + + fn create_transfer(amount: u64, token: TokenId, success: bool) -> ExecutedOperations { + let correct_transfer = Transfer::new( + AccountId(0), + Default::default(), + Default::default(), + token, + BigUint::from(amount), + BigUint::from(10u64), + Nonce(0), + Default::default(), + None, + ); + let transfer_op = TransferOp { + tx: correct_transfer.clone(), + from: Default::default(), + to: Default::default(), + }; + + ExecutedOperations::Tx(Box::new(ExecutedTx { + signed_tx: SignedZkSyncTx::from(ZkSyncTx::Transfer(Box::new(correct_transfer))), + success, + op: Some(ZkSyncOp::Transfer(Box::new(transfer_op))), + fail_reason: None, + block_index: None, + created_at: Utc::now(), + batch_id: None, + })) + } + + fn create_swap( + amount: u64, + token_0: TokenId, + token_1: TokenId, + success: bool, + ) -> ExecutedOperations { + let rng = &mut thread_rng(); + let sk = priv_key_from_fs(rng.gen()); + let swap = Swap::new( + AccountId(0), + Default::default(), + Nonce(0), + ( + Order::new_signed( + AccountId(1), + Address::random(), + Nonce(0), + token_1, + token_0, + (BigUint::from(1u64), BigUint::from(1u64)), + BigUint::from(amount), + Default::default(), + &sk, + ) + .unwrap(), + Order::new_signed( + AccountId(1), + Address::random(), + Nonce(0), + token_0, + token_1, + (BigUint::from(1u64), BigUint::from(1u64)), + BigUint::from(amount), + Default::default(), + &sk, + ) + .unwrap(), + ), + (BigUint::from(amount), BigUint::from(amount)), + BigUint::from(10u64), + TokenId(0), + None, + ); + let swap_op = SwapOp { + tx: swap.clone(), + submitter: Default::default(), + accounts: (Default::default(), Default::default()), + recipients: (Default::default(), Default::default()), + }; + + ExecutedOperations::Tx(Box::new(ExecutedTx { + signed_tx: SignedZkSyncTx::from(ZkSyncTx::Swap(Box::new(swap))), + success, + op: Some(ZkSyncOp::Swap(Box::new(swap_op))), + fail_reason: None, + block_index: None, + created_at: Utc::now(), + batch_id: None, + })) + } + + #[test] + fn test_get_volumes() { + let txs = vec![ + create_transfer(100, TokenId(0), true), + create_transfer(200, TokenId(0), false), + create_transfer(10, TokenId(0), true), + create_transfer(10, TokenId(1), true), + create_transfer(10, TokenId(33), true), + create_swap(10, TokenId(0), TokenId(33), true), + ]; + let res = get_volumes(&txs); + let volume = res.get(&TokenId(0)).unwrap().clone(); + assert_eq!(volume, BigUint::from(120u64)); + let volume = res.get(&TokenId(1)).unwrap().clone(); + assert_eq!(volume, BigUint::from(10u64)); + let volume = res.get(&TokenId(33)).unwrap().clone(); + assert_eq!(volume, BigUint::from(20u64)); + } +} diff --git a/core/lib/types/src/operations/mod.rs b/core/lib/types/src/operations/mod.rs index 1b07f2aeed..c891236daf 100644 --- a/core/lib/types/src/operations/mod.rs +++ b/core/lib/types/src/operations/mod.rs @@ -82,8 +82,8 @@ impl ZkSyncOp { ZkSyncOp::ForcedExit(_) => None, ZkSyncOp::MintNFTOp(_) => None, ZkSyncOp::Swap(tx) => Some(vec![ - (tx.tx.orders.0.token_buy, tx.tx.amounts.0.clone()), - (tx.tx.orders.1.token_buy, tx.tx.amounts.1.clone()), + (tx.tx.orders.0.token_sell, tx.tx.amounts.0.clone()), + (tx.tx.orders.1.token_sell, tx.tx.amounts.1.clone()), ]), ZkSyncOp::Deposit(tx) => { Some(vec![(tx.priority_op.token, tx.priority_op.amount.clone())])