diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a1c31d5a50..415c07ed46 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -107,7 +107,8 @@ jobs:
       - name: run-services
         run: |
-          ci_run zk server &>server.log &
+          ci_run zk server core &>server.log &
+          ci_run zk server api &>api.log &
           ci_run sleep 10
           ci_run zk dummy-prover run &>dummy_prover.log &
           ci_run sleep 100
@@ -133,6 +134,7 @@ jobs:
         if: always()
         run: |
           ci_run cat server.log
+          ci_run cat api.log
           ci_run cat dummy_prover.log
 
   circuit-tests:
diff --git a/core/bin/block_revert/src/main.rs b/core/bin/block_revert/src/main.rs
index 644241adfd..6dff9d098d 100644
--- a/core/bin/block_revert/src/main.rs
+++ b/core/bin/block_revert/src/main.rs
@@ -7,7 +7,7 @@ use web3::{
     contract::Options,
     types::{TransactionReceipt, U256, U64},
 };
-use zksync_config::ZkSyncConfig;
+use zksync_config::{ContractsConfig, ETHClientConfig, ETHSenderConfig};
 use zksync_eth_client::EthereumGateway;
 use zksync_storage::StorageProcessor;
 use zksync_types::{aggregated_operations::stored_block_info, block::Block, BlockNumber, H256};
@@ -226,18 +226,25 @@ struct Opt {
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
     let opt = Opt::from_args();
-    let mut config = ZkSyncConfig::from_env();
     let key_without_prefix = opt
         .operator_private_key
         .strip_prefix("0x")
         .unwrap_or_else(|| opt.operator_private_key.as_str());
 
-    config.eth_sender.sender.operator_private_key =
+    let contracts = ContractsConfig::from_env();
+    let eth_client_config = ETHClientConfig::from_env();
+    let mut eth_sender_config = ETHSenderConfig::from_env();
+
+    eth_sender_config.sender.operator_private_key =
         H256::from_str(key_without_prefix).expect("Cannot deserialize private key");
 
     let mut storage = StorageProcessor::establish_connection().await?;
-    let client = EthereumGateway::from_config(&config);
+    let client = EthereumGateway::from_config(
+        &eth_client_config,
+        &eth_sender_config,
+        contracts.contract_addr,
+    );
 
     let last_commited_block = storage
         .chain()
diff --git a/core/bin/server/Cargo.toml b/core/bin/server/Cargo.toml
index 63d1143082..183e9bf77f 100644
--- a/core/bin/server/Cargo.toml
+++ b/core/bin/server/Cargo.toml
@@ -22,21 +22,21 @@ zksync_prometheus_exporter = { path = "../../lib/prometheus_exporter", version =
 zksync_config = { path = "../../lib/config", version = "1.0" }
 zksync_storage = { path = "../../lib/storage", version = "1.0" }
 zksync_gateway_watcher = { path = "../../lib/gateway_watcher", version = "1.0" }
+zksync_utils = { path = "../../lib/utils", version = "1.0" }
+zksync_types = { path = "../../lib/types", version = "1.0" }
 
 anyhow = "1.0"
 structopt = "0.3.20"
 ctrlc = { version = "3.1", features = ["termination"] }
 futures = "0.3"
 tokio = { version = "1", features = ["full"] }
+serde = "1.0.90"
 vlog = { path = "../../lib/vlog", version = "1.0" }
 
 [dev-dependencies]
 zksync_crypto = { path = "../../lib/crypto", version = "1.0" }
-zksync_types = { path = "../../lib/types", version = "1.0" }
 zksync_prover = { path = "../prover", version = "1.0" }
-zksync_utils = { path = "../../lib/utils", version = "1.0" }
 num = { version = "0.3.1", features = ["serde"] }
-serde = "1.0.90"
 serde_json = "1.0.0"
diff --git a/core/bin/server/src/main.rs b/core/bin/server/src/main.rs
index fae24abcf3..d536140797 100644
--- a/core/bin/server/src/main.rs
+++ b/core/bin/server/src/main.rs
@@ -1,16 +1,28 @@
 use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};
 use std::cell::RefCell;
+use std::str::FromStr;
+
 use structopt::StructOpt;
-use zksync_api::run_api;
+
+use serde::{Deserialize, Serialize};
+
+use zksync_api::fee_ticker::{run_ticker_task, TickerRequest};
 use zksync_core::{genesis_init, run_core, wait_for_tasks};
 use zksync_eth_client::EthereumGateway;
-use zksync_eth_sender::run_eth_sender;
 use zksync_forced_exit_requests::run_forced_exit_requests_actors;
 use zksync_gateway_watcher::run_gateway_watcher_if_multiplexed;
-use zksync_prometheus_exporter::run_prometheus_exporter;
 use zksync_witness_generator::run_prover_server;
-use zksync_config::ZkSyncConfig;
+use tokio::task::JoinHandle;
+use zksync_config::{
+    configs::api::{
+        CommonApiConfig, JsonRpcConfig, PrivateApiConfig, ProverApiConfig, RestApiConfig,
+        Web3Config,
+    },
+    ChainConfig, ContractsConfig, DBConfig, ETHClientConfig, ETHSenderConfig, ETHWatchConfig,
+    ForcedExitRequestsConfig, GatewayWatcherConfig, ProverConfig, TickerConfig, ZkSyncConfig,
+};
+use zksync_core::rejected_tx_cleaner::run_rejected_tx_cleaner;
 use zksync_storage::ConnectionPool;
 
 #[derive(Debug, Clone, Copy)]
@@ -19,18 +31,94 @@ pub enum ServerCommand {
     Launch,
 }
 
+#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
+pub enum Component {
+    // Api components
+    RestApi,
+    Web3Api,
+    RpcApi,
+    RpcWebSocketApi,
+
+    // Core components
+    EthSender,
+    Core,
+    WitnessGenerator,
+    ForcedExit,
+
+    // Additional components
+    Prometheus,
+    RejectedTaskCleaner,
+}
+
+impl FromStr for Component {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "rest-api" => Ok(Component::RestApi),
+            "web3-api" => Ok(Component::Web3Api),
+            "rpc-api" => Ok(Component::RpcApi),
+            "rpc-websocket-api" => Ok(Component::RpcWebSocketApi),
+            "eth-sender" => Ok(Component::EthSender),
+            "witness-generator" => Ok(Component::WitnessGenerator),
+            "forced-exit" => Ok(Component::ForcedExit),
+            "prometheus" => Ok(Component::Prometheus),
+            "core" => Ok(Component::Core),
+            "rejected-task-cleaner" => Ok(Component::RejectedTaskCleaner),
+            other => Err(format!("{} is not a valid component name", other)),
+        }
+    }
+}
+
+#[derive(Debug)]
+struct ComponentsToRun(Vec<Component>);
+
+impl Default for ComponentsToRun {
+    fn default() -> Self {
+        Self(vec![
+            Component::RestApi,
+            Component::Web3Api,
+            Component::RpcApi,
+            Component::RpcWebSocketApi,
+            Component::EthSender,
+            Component::WitnessGenerator,
+            Component::ForcedExit,
+            Component::Prometheus,
+            Component::Core,
+            Component::RejectedTaskCleaner,
+        ])
+    }
+}
+
+impl FromStr for ComponentsToRun {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        Ok(Self(
+            s.split(',')
+                .map(|x| Component::from_str(x.trim()))
+                .collect::<Result<Vec<Component>, String>>()?,
+        ))
+    }
+}
+
 #[derive(StructOpt)]
 #[structopt(name = "zkSync operator node", author = "Matter Labs")]
 struct Opt {
     /// Generate genesis block for the first contract deployment
     #[structopt(long)]
     genesis: bool,
+    /// Comma-separated list of components to launch
+    #[structopt(
+        long,
+        default_value = "rest-api,web3-api,rpc-api,rpc-websocket-api,eth-sender,witness-generator,forced-exit,prometheus,core,rejected-task-cleaner"
+    )]
+    components: ComponentsToRun,
 }
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
     let opt = Opt::from_args();
-    let config = ZkSyncConfig::from_env();
     let mut _sentry_guard = None;
     let server_mode = if opt.genesis {
         ServerCommand::Genesis
@@ -41,6 +129,7 @@ async fn main() -> anyhow::Result<()> {
     if let ServerCommand::Genesis = server_mode {
         vlog::info!("Performing the server genesis initialization",);
+        let config = ChainConfig::from_env();
         genesis_init(&config).await;
 
         return Ok(());
     }
@@ -48,13 +137,129 @@ async fn main() -> anyhow::Result<()> {
     // It's a `ServerCommand::Launch`, perform the usual routine.
     vlog::info!("Running the zkSync server");
 
-    let connection_pool = ConnectionPool::new(None);
-    let eth_gateway = EthereumGateway::from_config(&config);
+    run_server(&opt.components).await;
 
-    let gateway_watcher_task_opt = run_gateway_watcher_if_multiplexed(eth_gateway.clone(), &config);
+    Ok(())
+}
 
-    // Handle Ctrl+C
+async fn run_server(components: &ComponentsToRun) {
+    let connection_pool = ConnectionPool::new(None);
     let (stop_signal_sender, mut stop_signal_receiver) = mpsc::channel(256);
+    let channel_size = 32768;
+
+    let mut tasks = vec![];
+
+    if components.0.contains(&Component::Web3Api) {
+        // Run web3 api
+        tasks.push(zksync_api::api_server::web3::start_rpc_server(
+            connection_pool.clone(),
+            &Web3Config::from_env(),
+        ));
+    }
+
+    if components.0.iter().any(|c| {
+        matches!(
+            c,
+            Component::RpcWebSocketApi | Component::RpcApi | Component::RestApi
+        )
+    }) {
+        // Create gateway
+        let eth_gateway = create_eth_gateway();
+
+        let eth_watch_config = ETHWatchConfig::from_env();
+        let gateway_watcher_config = GatewayWatcherConfig::from_env();
+
+        // Run eth multiplexer
+        if let Some(task) =
+            run_gateway_watcher_if_multiplexed(eth_gateway.clone(), &gateway_watcher_config)
+        {
+            tasks.push(task);
+        }
+
+        // Run ticker
+        let (task, ticker_request_sender) = run_ticker(connection_pool.clone(), channel_size);
+        tasks.push(task);
+
+        // Run signer
+        let (sign_check_sender, sign_check_receiver) = mpsc::channel(channel_size);
+        tasks.push(zksync_api::signature_checker::start_sign_checker(
+            eth_gateway,
+            sign_check_receiver,
+        ));
+
+        let private_config = PrivateApiConfig::from_env();
+        let contracts_config = ContractsConfig::from_env();
+        let common_config = CommonApiConfig::from_env();
+        let chain_config = ChainConfig::from_env();
+
+        if components.0.contains(&Component::RpcWebSocketApi) {
+            tasks.push(zksync_api::api_server::rpc_subscriptions::start_ws_server(
+                connection_pool.clone(),
+                sign_check_sender.clone(),
+                ticker_request_sender.clone(),
+                &common_config,
+                &JsonRpcConfig::from_env(),
+                chain_config.state_keeper.miniblock_iteration_interval(),
+                private_config.url.clone(),
+                eth_watch_config.confirmations_for_eth_event,
+            ));
+        }
+
+        if components.0.contains(&Component::RpcApi) {
+            tasks.push(zksync_api::api_server::rpc_server::start_rpc_server(
+                connection_pool.clone(),
+                sign_check_sender.clone(),
+                ticker_request_sender.clone(),
+                &JsonRpcConfig::from_env(),
+                &common_config,
+                private_config.url.clone(),
+                eth_watch_config.confirmations_for_eth_event,
+            ));
+        }
+
+        if components.0.contains(&Component::RestApi) {
+            zksync_api::api_server::rest::start_server_thread_detached(
+                connection_pool.clone(),
+                RestApiConfig::from_env().bind_addr(),
+                contracts_config.contract_addr,
+                ticker_request_sender,
+                sign_check_sender,
+                private_config.url,
+            );
+        }
+    }
+
+    if components.0.contains(&Component::EthSender) {
+        tasks.push(run_eth_sender(connection_pool.clone()))
+    }
+
+    if components.0.contains(&Component::Core) {
+        let eth_gateway = create_eth_gateway();
+
+        tasks.append(
+            &mut run_core(
+                connection_pool.clone(),
+                &ZkSyncConfig::from_env(),
+                eth_gateway.clone(),
+            )
+            .await
+            .unwrap(),
+        );
+    }
+
+    if components.0.contains(&Component::WitnessGenerator) {
+        tasks.push(run_witness_generator(connection_pool.clone()))
+    }
+
+    if components.0.contains(&Component::ForcedExit) {
+        tasks.push(run_forced_exit(connection_pool.clone()));
+    }
+
+    if components.0.contains(&Component::RejectedTaskCleaner) {
+        let config = DBConfig::from_env();
+        tasks.push(run_rejected_tx_cleaner(&config, connection_pool));
+    }
+
     {
         let stop_signal_sender = RefCell::new(stop_signal_sender.clone());
         ctrlc::set_handler(move || {
@@ -64,69 +269,80 @@ async fn main() -> anyhow::Result<()> {
         .expect("Error setting Ctrl+C handler");
     }
 
-    // Run prometheus data exporter.
-    let (prometheus_task_handle, counter_task_handle) =
-        run_prometheus_exporter(connection_pool.clone(), config.api.prometheus.port, true);
-
-    // Run core actors.
-    vlog::info!("Starting the Core actors");
-    let core_task_handles = run_core(
-        connection_pool.clone(),
-        stop_signal_sender.clone(),
-        eth_gateway.clone(),
-        &config,
-    )
-    .await
-    .expect("Unable to start Core actors");
-
-    // Run API actors.
-    vlog::info!("Starting the API server actors");
-    let api_task_handle = run_api(
-        connection_pool.clone(),
-        stop_signal_sender.clone(),
-        eth_gateway.clone(),
-        &config,
-    );
-
-    // Run Ethereum sender actors.
-    vlog::info!("Starting the Ethereum sender actors");
-    let eth_sender_task_handle =
-        run_eth_sender(connection_pool.clone(), eth_gateway.clone(), config.clone());
-
-    // Run prover server & witness generator.
-    vlog::info!("Starting the Prover server actors");
-    let database = zksync_witness_generator::database::Database::new(connection_pool.clone());
-    run_prover_server(database, stop_signal_sender, ZkSyncConfig::from_env());
-
-    vlog::info!("Starting the ForcedExitRequests actors");
-    let forced_exit_requests_task_handle = run_forced_exit_requests_actors(connection_pool, config);
-
     tokio::select! {
-        _ = async { wait_for_tasks(core_task_handles).await } => {
-            // We don't need to do anything here, since Core actors will panic upon future resolving.
-        },
-        _ = async { api_task_handle.await } => {
-            panic!("API server actors aren't supposed to finish their execution")
-        },
-        _ = async { gateway_watcher_task_opt.unwrap().await }, if gateway_watcher_task_opt.is_some() => {
-            panic!("Gateway Watcher actors aren't supposed to finish their execution")
-        }
-        _ = async { eth_sender_task_handle.await } => {
-            panic!("Ethereum Sender actors aren't supposed to finish their execution")
-        },
-        _ = async { prometheus_task_handle.await } => {
-            panic!("Prometheus exporter actors aren't supposed to finish their execution")
-        },
-        _ = async { counter_task_handle.unwrap().await } => {
-            panic!("Operation counting actor is not supposed to finish its execution")
-        },
-        _ = async { forced_exit_requests_task_handle.await } => {
-            panic!("ForcedExitRequests actor is not supposed to finish its execution")
+        _ = async { wait_for_tasks(tasks).await } => {
+            panic!("One of the actors is not supposed to finish its execution")
         },
         _ = async { stop_signal_receiver.next().await } => {
             vlog::warn!("Stop signal received, shutting down");
         }
     };
+}
 
-    Ok(())
+pub fn run_forced_exit(connection_pool: ConnectionPool) -> JoinHandle<()> {
+    vlog::info!("Starting the ForcedExitRequests actors");
+    let config = ForcedExitRequestsConfig::from_env();
+    let common_config = CommonApiConfig::from_env();
+    let private_api_config = PrivateApiConfig::from_env();
+    let contract_config = ContractsConfig::from_env();
+    let eth_client_config = ETHClientConfig::from_env();
+
+    run_forced_exit_requests_actors(
+        connection_pool,
+        private_api_config.url,
+        config,
+        common_config,
+        contract_config,
+        eth_client_config.web3_url(),
+    )
+}
+
+pub fn run_witness_generator(connection_pool: ConnectionPool) -> JoinHandle<()> {
+    vlog::info!("Starting the Prover server actors");
+    let prover_api_config = ProverApiConfig::from_env();
+    let prover_config = ProverConfig::from_env();
+    let database = zksync_witness_generator::database::Database::new(connection_pool);
+    run_prover_server(database, prover_api_config, prover_config)
+}
+
+pub fn run_eth_sender(connection_pool: ConnectionPool) -> JoinHandle<()> {
+    vlog::info!("Starting the Ethereum sender actors");
+    let eth_client_config = ETHClientConfig::from_env();
+    let eth_sender_config = ETHSenderConfig::from_env();
+    let contracts = ContractsConfig::from_env();
+    let eth_gateway = EthereumGateway::from_config(
+        &eth_client_config,
+        &eth_sender_config,
+        contracts.contract_addr,
+    );
+
+    zksync_eth_sender::run_eth_sender(connection_pool, eth_gateway, eth_sender_config)
+}
+
+pub fn run_ticker(
+    connection_pool: ConnectionPool,
+    channel_size: usize,
+) -> (JoinHandle<()>, mpsc::Sender<TickerRequest>) {
+    vlog::info!("Starting Ticker actors");
+    let (ticker_request_sender, ticker_request_receiver) = mpsc::channel(channel_size);
+    let chain_config = ChainConfig::from_env();
+    let ticker_config = TickerConfig::from_env();
+    let task = run_ticker_task(
+        connection_pool,
+        ticker_request_receiver,
+        &ticker_config,
+        chain_config.max_blocks_to_aggregate(),
+    );
+    (task, ticker_request_sender)
+}
+
+pub fn create_eth_gateway() -> EthereumGateway {
+    let eth_client_config = ETHClientConfig::from_env();
+    let eth_sender_config = ETHSenderConfig::from_env();
+    let contracts = ContractsConfig::from_env();
+    EthereumGateway::from_config(
+        &eth_client_config,
+        &eth_sender_config,
+        contracts.contract_addr,
+    )
 }
diff --git a/core/bin/zksync_api/src/api_server/admin_server.rs b/core/bin/zksync_api/src/api_server/admin_server.rs
deleted file mode 100644
index 852f9cfadf..0000000000
--- a/core/bin/zksync_api/src/api_server/admin_server.rs
+++ /dev/null
@@ -1,185 +0,0 @@
-// Built-in deps
-use std::net::SocketAddr;
-use std::thread;
-
-// External uses
-use actix_web::dev::ServiceRequest;
-use actix_web::{web, App, HttpResponse, HttpServer};
-use actix_web_httpauth::extractors::{
-    bearer::{BearerAuth, Config},
-    AuthenticationError,
-};
-use actix_web_httpauth::middleware::HttpAuthentication;
-use futures::channel::mpsc;
-use jsonwebtoken::errors::Error as JwtError;
-use jsonwebtoken::{decode, DecodingKey, Validation};
-use serde::{Deserialize, Serialize};
-
-// Local uses
-use zksync_storage::ConnectionPool;
-use zksync_types::{tokens, Address, TokenId, TokenKind};
-use zksync_utils::panic_notify::ThreadPanicNotify;
-
-#[derive(Debug, Serialize, Deserialize)]
-struct PayloadAuthToken {
-    /// Subject (whom auth token refers to).
-    sub: String,
-    /// Expiration time (as UTC timestamp).
-    exp: usize,
-}
-
-#[derive(Debug, Clone)]
-struct AppState {
-    secret_auth: String,
-    connection_pool: ConnectionPool,
-}
-
-impl AppState {
-    async fn access_storage(&self) -> actix_web::Result<zksync_storage::StorageProcessor<'_>> {
-        self.connection_pool.access_storage().await.map_err(|e| {
-            vlog::warn!("Failed to access storage: {}", e);
-            actix_web::error::ErrorInternalServerError(e)
-        })
-    }
-}
-
-/// Token that contains information to add to the server
-#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
-struct AddTokenRequest {
-    /// id is used for tx signature and serialization
-    /// is optional because when adding the server will assign the next available ID
-    pub id: Option<TokenId>,
-    /// Contract address of ERC20 token or Address::zero() for "ETH"
-    pub address: Address,
-    /// Token symbol (e.g. "ETH" or "USDC")
-    pub symbol: String,
-    /// Token precision (e.g. 18 for "ETH" so "1.0" ETH = 10e18 as U256 number)
-    pub decimals: u8,
-}
-
-struct AuthTokenValidator<'a> {
-    decoding_key: DecodingKey<'a>,
-}
-
-impl<'a> AuthTokenValidator<'a> {
-    fn new(secret: &'a str) -> Self {
-        Self {
-            decoding_key: DecodingKey::from_secret(secret.as_ref()),
-        }
-    }
-
-    /// Validate JsonWebToken
-    fn validate_auth_token(&self, token: &str) -> Result<(), JwtError> {
-        decode::<PayloadAuthToken>(token, &self.decoding_key, &Validation::default())?;
-
-        Ok(())
-    }
-
-    async fn validator(
-        &self,
-        req: ServiceRequest,
-        credentials: BearerAuth,
-    ) -> actix_web::Result<ServiceRequest> {
-        let config = req.app_data::<Config>().cloned().unwrap_or_default();
-
-        self.validate_auth_token(credentials.token())
-            .map_err(|_| AuthenticationError::from(config))?;
-
-        Ok(req)
-    }
-}
-
-async fn add_token(
-    data: web::Data<AppState>,
-    token_request: web::Json<AddTokenRequest>,
-) -> actix_web::Result<HttpResponse> {
-    let mut storage = data.access_storage().await?;
-
-    // if id is None then set it to next available ID from server.
- let id = match token_request.id { - Some(id) => id, - None => { - let last_token_id = storage - .tokens_schema() - .get_max_token_id() - .await - .map_err(|e| { - vlog::warn!( - "failed get number of token from database in progress request: {}", - e - ); - actix_web::error::ErrorInternalServerError("storage layer error") - })?; - let next_available_id = last_token_id + 1; - - TokenId(next_available_id) - } - }; - - let token = tokens::Token::new( - id, - token_request.address, - &token_request.symbol, - token_request.decimals, - TokenKind::ERC20, - ); - - storage - .tokens_schema() - .store_or_update_token(token.clone()) - .await - .map_err(|e| { - vlog::warn!("failed add token to database in progress request: {}", e); - actix_web::error::ErrorInternalServerError("storage layer error") - })?; - - Ok(HttpResponse::Ok().json(token)) -} - -async fn run_server(app_state: AppState, bind_to: SocketAddr) { - HttpServer::new(move || { - let auth = HttpAuthentication::bearer(move |req, credentials| async { - let secret_auth = req - .app_data::>() - .expect("failed get AppState upon receipt of the authentication token") - .secret_auth - .clone(); - AuthTokenValidator::new(&secret_auth) - .validator(req, credentials) - .await - }); - - App::new() - .wrap(auth) - .app_data(web::Data::new(app_state.clone())) - .route("/tokens", web::post().to(add_token)) - }) - .workers(1) - .bind(&bind_to) - .expect("failed to bind") - .run() - .await - .expect("failed to run endpoint server"); -} - -pub fn start_admin_server( - bind_to: SocketAddr, - secret_auth: String, - connection_pool: zksync_storage::ConnectionPool, - panic_notify: mpsc::Sender, -) { - thread::Builder::new() - .name("admin_server".to_string()) - .spawn(move || { - let _panic_sentinel = ThreadPanicNotify(panic_notify.clone()); - actix_rt::System::new().block_on(async move { - let app_state = AppState { - secret_auth, - connection_pool, - }; - - run_server(app_state, bind_to).await; - }); - }) - .expect("failed to start endpoint server"); -} diff --git a/core/bin/zksync_api/src/api_server/forced_exit_checker.rs b/core/bin/zksync_api/src/api_server/forced_exit_checker.rs index 032525a3f7..4f030bec52 100644 --- a/core/bin/zksync_api/src/api_server/forced_exit_checker.rs +++ b/core/bin/zksync_api/src/api_server/forced_exit_checker.rs @@ -1,5 +1,5 @@ use crate::api_server::tx_sender::SubmitError; -use zksync_config::ZkSyncConfig; + use zksync_storage::StorageProcessor; use zksync_types::Address; @@ -29,10 +29,9 @@ pub struct ForcedExitChecker { } impl ForcedExitChecker { - pub fn new(config: &ZkSyncConfig) -> Self { - let forced_exit_minimum_account_age = chrono::Duration::seconds( - config.api.common.forced_exit_minimum_account_age_secs as i64, - ); + pub fn new(forced_exit_minimum_account_age_secs: u64) -> Self { + let forced_exit_minimum_account_age = + chrono::Duration::seconds(forced_exit_minimum_account_age_secs as i64); Self { forced_exit_minimum_account_age, diff --git a/core/bin/zksync_api/src/api_server/mod.rs b/core/bin/zksync_api/src/api_server/mod.rs index aa31d25a0f..dcae7edb58 100644 --- a/core/bin/zksync_api/src/api_server/mod.rs +++ b/core/bin/zksync_api/src/api_server/mod.rs @@ -4,77 +4,14 @@ //! `mod rpc_server` - JSON rpc via HTTP (for request reply functions) //! 
`mod rpc_subscriptions` - JSON rpc via WebSocket (for request reply functions and subscriptions) -// External uses -use futures::channel::mpsc; -// Workspace uses -use zksync_config::ZkSyncConfig; -use zksync_eth_client::EthereumGateway; -use zksync_storage::ConnectionPool; -// Local uses -use crate::fee_ticker::TickerRequest; -use crate::signature_checker; - -mod admin_server; mod event_notify; pub mod forced_exit_checker; mod helpers; -mod rest; +pub mod rest; pub mod rpc_server; -mod rpc_subscriptions; +pub mod rpc_subscriptions; mod tx_sender; -mod web3; +pub mod web3; /// Amount of threads used by each server to serve requests. const THREADS_PER_SERVER: usize = 128; - -#[allow(clippy::too_many_arguments)] -pub fn start_api_server( - connection_pool: ConnectionPool, - panic_notify: mpsc::Sender, - ticker_request_sender: mpsc::Sender, - eth_gateway: EthereumGateway, - config: &ZkSyncConfig, -) { - let (sign_check_sender, sign_check_receiver) = mpsc::channel(32768); - - signature_checker::start_sign_checker_detached( - eth_gateway, - sign_check_receiver, - panic_notify.clone(), - ); - - rest::start_server_thread_detached( - connection_pool.clone(), - config.api.rest.bind_addr(), - config.contracts.contract_addr, - panic_notify.clone(), - ticker_request_sender.clone(), - sign_check_sender.clone(), - config.clone(), - ); - - rpc_subscriptions::start_ws_server( - connection_pool.clone(), - sign_check_sender.clone(), - ticker_request_sender.clone(), - panic_notify.clone(), - config, - ); - - admin_server::start_admin_server( - config.api.admin.bind_addr(), - config.api.admin.secret_auth.clone(), - connection_pool.clone(), - panic_notify.clone(), - ); - - rpc_server::start_rpc_server( - connection_pool.clone(), - sign_check_sender, - ticker_request_sender, - panic_notify.clone(), - config, - ); - - web3::start_rpc_server(connection_pool, panic_notify, config); -} diff --git a/core/bin/zksync_api/src/api_server/rest/forced_exit_requests/mod.rs b/core/bin/zksync_api/src/api_server/rest/forced_exit_requests/mod.rs index 7cdce86cb0..de3f75accd 100644 --- a/core/bin/zksync_api/src/api_server/rest/forced_exit_requests/mod.rs +++ b/core/bin/zksync_api/src/api_server/rest/forced_exit_requests/mod.rs @@ -3,23 +3,30 @@ use actix_web::{web, Scope}; // Workspace uses pub use zksync_api_client::rest::client::{Client, ClientError}; -use zksync_config::ZkSyncConfig; +use zksync_config::ForcedExitRequestsConfig; use zksync_storage::ConnectionPool; // Local uses use crate::api_server::forced_exit_checker::ForcedExitChecker; use error::ApiError; +use ethabi::Address; mod error; mod v01; pub type JsonResult = std::result::Result, ApiError>; -pub(crate) fn api_scope(connection_pool: ConnectionPool, config: &ZkSyncConfig) -> Scope { - let fe_age_checker = ForcedExitChecker::new(config); +pub(crate) fn api_scope( + connection_pool: ConnectionPool, + forced_exit_minimum_account_age_secs: u64, + config: &ForcedExitRequestsConfig, + contract: Address, +) -> Scope { + let fe_age_checker = ForcedExitChecker::new(forced_exit_minimum_account_age_secs); web::scope("/api/forced_exit_requests").service(v01::api_scope( connection_pool, config, + contract, Box::new(fe_age_checker), )) } diff --git a/core/bin/zksync_api/src/api_server/rest/forced_exit_requests/v01.rs b/core/bin/zksync_api/src/api_server/rest/forced_exit_requests/v01.rs index 24d7d27ac5..99bcf6baf3 100644 --- a/core/bin/zksync_api/src/api_server/rest/forced_exit_requests/v01.rs +++ b/core/bin/zksync_api/src/api_server/rest/forced_exit_requests/v01.rs @@ -19,7 
+19,7 @@ pub use zksync_api_client::rest::forced_exit_requests::{ }; use zksync_api_client::rest::forced_exit_requests::ConfigInfo; -use zksync_config::ZkSyncConfig; +use zksync_config::ForcedExitRequestsConfig; use zksync_storage::ConnectionPool; use zksync_types::{ forced_exit_requests::{ @@ -51,21 +51,22 @@ pub struct ApiForcedExitRequestsData { impl ApiForcedExitRequestsData { fn new( connection_pool: ConnectionPool, - config: &ZkSyncConfig, + config: &ForcedExitRequestsConfig, + contract: Address, forced_exit_checker: Box, ) -> Self { Self { connection_pool, forced_exit_checker, - is_enabled: config.forced_exit_requests.enabled, - price_per_token: config.forced_exit_requests.price_per_token, - max_tokens_per_request: config.forced_exit_requests.max_tokens_per_request, - recomended_tx_interval_millisecs: config.forced_exit_requests.recomended_tx_interval, - max_tx_interval_millisecs: config.forced_exit_requests.max_tx_interval, - forced_exit_contract_address: config.contracts.forced_exit_addr, - digits_in_id: config.forced_exit_requests.digits_in_id, - wait_confirmations: config.forced_exit_requests.wait_confirmations, + is_enabled: config.enabled, + price_per_token: config.price_per_token, + max_tokens_per_request: config.max_tokens_per_request, + recomended_tx_interval_millisecs: config.recomended_tx_interval, + max_tx_interval_millisecs: config.max_tx_interval, + forced_exit_contract_address: contract, + digits_in_id: config.digits_in_id, + wait_confirmations: config.wait_confirmations, } } } @@ -221,17 +222,18 @@ pub async fn check_account_eligibility( pub fn api_scope( connection_pool: ConnectionPool, - config: &ZkSyncConfig, + config: &ForcedExitRequestsConfig, + contract: Address, fe_checker: Box, ) -> Scope { - let data = ApiForcedExitRequestsData::new(connection_pool, config, fe_checker); + let data = ApiForcedExitRequestsData::new(connection_pool, config, contract, fe_checker); // `enabled` endpoint should always be there let scope = web::scope("v0.1") .app_data(web::Data::new(data)) .route("status", web::get().to(get_status)); - if config.forced_exit_requests.enabled { + if config.enabled { scope .route("/submit", web::post().to(submit_request)) .route("/requests/{id}", web::get().to(get_request_by_id)) @@ -252,7 +254,7 @@ mod tests { use num::BigUint; use zksync_api_client::rest::client::Client; - use zksync_config::ForcedExitRequestsConfig; + use zksync_config::{ForcedExitRequestsConfig, ZkSyncConfig}; use zksync_storage::ConnectionPool; use zksync_types::{Address, TokenId}; @@ -277,7 +279,8 @@ mod tests { move |cfg| { api_scope( cfg.pool.clone(), - &cfg.config, + &cfg.config.forced_exit_requests, + cfg.config.contracts.forced_exit_addr, Box::new(DummyForcedExitChecker {}), ) }, diff --git a/core/bin/zksync_api/src/api_server/rest/mod.rs b/core/bin/zksync_api/src/api_server/rest/mod.rs index a2535da0da..660df138e7 100644 --- a/core/bin/zksync_api/src/api_server/rest/mod.rs +++ b/core/bin/zksync_api/src/api_server/rest/mod.rs @@ -5,12 +5,14 @@ use std::net::SocketAddr; use zksync_storage::ConnectionPool; use zksync_types::H160; -use zksync_utils::panic_notify::ThreadPanicNotify; +use zksync_utils::panic_notify::{spawn_panic_handler, ThreadPanicNotify}; use self::v01::api_decl::ApiV01; use crate::{fee_ticker::TickerRequest, signature_checker::VerifySignatureRequest}; use super::tx_sender::TxSender; + +use tokio::task::JoinHandle; use zksync_config::ZkSyncConfig; mod forced_exit_requests; @@ -27,15 +29,24 @@ async fn start_server( HttpServer::new(move || { let api_v01 = 
api_v01.clone(); - let forced_exit_requests_api_scope = - forced_exit_requests::api_scope(api_v01.connection_pool.clone(), &api_v01.config); + let forced_exit_requests_api_scope = forced_exit_requests::api_scope( + api_v01.connection_pool.clone(), + api_v01 + .config + .api + .common + .forced_exit_minimum_account_age_secs, + &api_v01.config.forced_exit_requests, + api_v01.config.contracts.forced_exit_addr, + ); let api_v02_scope = { let tx_sender = TxSender::new( api_v01.connection_pool.clone(), sign_verifier.clone(), fee_ticker.clone(), - &api_v01.config, + &api_v01.config.api.common, + api_v01.config.api.private.url.clone(), ); v02::api_scope(tx_sender, &api_v01.config) }; @@ -68,26 +79,31 @@ async fn start_server( /// Start HTTP REST API #[allow(clippy::too_many_arguments)] -pub(super) fn start_server_thread_detached( +pub fn start_server_thread_detached( connection_pool: ConnectionPool, listen_addr: SocketAddr, contract_address: H160, - panic_notify: mpsc::Sender, fee_ticker: mpsc::Sender, sign_verifier: mpsc::Sender, - config: ZkSyncConfig, -) { + private_url: String, +) -> JoinHandle<()> { + let (handler, panic_sender) = spawn_panic_handler(); + std::thread::Builder::new() .name("actix-rest-api".to_string()) .spawn(move || { - let _panic_sentinel = ThreadPanicNotify(panic_notify.clone()); + let _panic_sentinel = ThreadPanicNotify(panic_sender.clone()); actix_rt::System::new().block_on(async move { - let api_v01 = ApiV01::new(connection_pool, contract_address, config.clone()); - api_v01.spawn_network_status_updater(panic_notify); + // TODO remove this config ZKS-815 + let config = ZkSyncConfig::from_env(); + + let api_v01 = ApiV01::new(connection_pool, contract_address, private_url, config); + api_v01.spawn_network_status_updater(panic_sender); start_server(api_v01, fee_ticker, sign_verifier, listen_addr).await; }); }) .expect("Api server thread"); + handler } diff --git a/core/bin/zksync_api/src/api_server/rest/v01/api_decl.rs b/core/bin/zksync_api/src/api_server/rest/v01/api_decl.rs index 7f2551a250..3e7eec29b8 100644 --- a/core/bin/zksync_api/src/api_server/rest/v01/api_decl.rs +++ b/core/bin/zksync_api/src/api_server/rest/v01/api_decl.rs @@ -11,6 +11,7 @@ use actix_web::error::InternalError; use actix_web::{web, HttpResponse, Result as ActixResult}; use futures::channel::mpsc; use zksync_api_types::PriorityOpLookupQuery; + use zksync_config::ZkSyncConfig; use zksync_storage::{ chain::{ @@ -40,9 +41,10 @@ impl ApiV01 { pub fn new( connection_pool: ConnectionPool, contract_address: H160, + private_url: String, config: ZkSyncConfig, ) -> Self { - let api_client = CoreApiClient::new(config.api.private.url.clone()); + let api_client = CoreApiClient::new(private_url); Self { caches: Caches::new(config.api.common.caches_size), connection_pool, diff --git a/core/bin/zksync_api/src/api_server/rest/v02/fee.rs b/core/bin/zksync_api/src/api_server/rest/v02/fee.rs index 1628e423f2..0af87ae26e 100644 --- a/core/bin/zksync_api/src/api_server/rest/v02/fee.rs +++ b/core/bin/zksync_api/src/api_server/rest/v02/fee.rs @@ -114,13 +114,15 @@ mod tests { net: cfg.config.chain.eth.network, api_version: ApiVersion::V02, }; + let (client, server) = cfg.start_server( move |cfg: &TestServerConfig| { api_scope(TxSender::new( cfg.pool.clone(), dummy_sign_verifier(), dummy_fee_ticker(&[]), - &cfg.config, + &cfg.config.api.common, + cfg.config.api.private.url.clone(), )) }, Some(shared_data), diff --git a/core/bin/zksync_api/src/api_server/rest/v02/transaction.rs 
b/core/bin/zksync_api/src/api_server/rest/v02/transaction.rs index ffe1efbf7a..821725a2c6 100644 --- a/core/bin/zksync_api/src/api_server/rest/v02/transaction.rs +++ b/core/bin/zksync_api/src/api_server/rest/v02/transaction.rs @@ -281,7 +281,7 @@ mod tests { cfg.pool.clone(), dummy_sign_verifier(), dummy_fee_ticker(&[]), - &cfg.config, + &cfg.config.api.common, )) }, Some(shared_data), diff --git a/core/bin/zksync_api/src/api_server/rpc_server/mod.rs b/core/bin/zksync_api/src/api_server/rpc_server/mod.rs index 29309f6329..006efbd3fe 100644 --- a/core/bin/zksync_api/src/api_server/rpc_server/mod.rs +++ b/core/bin/zksync_api/src/api_server/rpc_server/mod.rs @@ -10,7 +10,7 @@ use jsonrpc_core::{Error, IoHandler, MetaIoHandler, Metadata, Middleware, Result use jsonrpc_http_server::ServerBuilder; // Workspace uses -use zksync_config::ZkSyncConfig; + use zksync_storage::{ chain::{ block::records::StorageBlockDetails, operations::records::StoredExecutedPriorityOperation, @@ -27,7 +27,7 @@ use crate::{ utils::shared_lru_cache::AsyncLruCache, }; use bigdecimal::BigDecimal; -use zksync_utils::panic_notify::ThreadPanicNotify; +use zksync_utils::panic_notify::{spawn_panic_handler, ThreadPanicNotify}; pub mod error; mod rpc_impl; @@ -37,6 +37,8 @@ pub mod types; pub use self::rpc_trait::Rpc; use self::types::*; use super::tx_sender::TxSender; +use tokio::task::JoinHandle; +use zksync_config::configs::api::{CommonApiConfig, JsonRpcConfig}; #[derive(Clone)] pub struct RpcApp { @@ -56,19 +58,21 @@ impl RpcApp { connection_pool: ConnectionPool, sign_verify_request_sender: mpsc::Sender, ticker_request_sender: mpsc::Sender, - config: &ZkSyncConfig, + config: &CommonApiConfig, + private_url: String, + confirmations_for_eth_event: u64, ) -> Self { let runtime_handle = tokio::runtime::Handle::try_current() .expect("RpcApp must be created from the context of Tokio Runtime"); - let api_requests_caches_size = config.api.common.caches_size; - let confirmations_for_eth_event = config.eth_watch.confirmations_for_eth_event; + let api_requests_caches_size = config.caches_size; let tx_sender = TxSender::new( connection_pool, sign_verify_request_sender, ticker_request_sender, config, + private_url, ); RpcApp { @@ -360,19 +364,24 @@ pub fn start_rpc_server( connection_pool: ConnectionPool, sign_verify_request_sender: mpsc::Sender, ticker_request_sender: mpsc::Sender, - panic_notify: mpsc::Sender, - config: &ZkSyncConfig, -) { - let addr = config.api.json_rpc.http_bind_addr(); - + config: &JsonRpcConfig, + common_api_config: &CommonApiConfig, + private_url: String, + confirmations_for_eth_event: u64, +) -> JoinHandle<()> { + let addr = config.http_bind_addr(); let rpc_app = RpcApp::new( connection_pool, sign_verify_request_sender, ticker_request_sender, - config, + common_api_config, + private_url, + confirmations_for_eth_event, ); + + let (handler, panic_sender) = spawn_panic_handler(); std::thread::spawn(move || { - let _panic_sentinel = ThreadPanicNotify(panic_notify); + let _panic_sentinel = ThreadPanicNotify(panic_sender); let mut io = IoHandler::new(); rpc_app.extend(&mut io); @@ -382,6 +391,7 @@ pub fn start_rpc_server( .unwrap(); server.wait(); }); + handler } #[cfg(test)] diff --git a/core/bin/zksync_api/src/api_server/rpc_subscriptions.rs b/core/bin/zksync_api/src/api_server/rpc_subscriptions.rs index 296f01a7e4..cee497f667 100644 --- a/core/bin/zksync_api/src/api_server/rpc_subscriptions.rs +++ b/core/bin/zksync_api/src/api_server/rpc_subscriptions.rs @@ -18,8 +18,10 @@ use crate::{ 
api_server::rpc_server::types::{ETHOpInfoResp, ResponseAccountState, TransactionInfoResp}, signature_checker::VerifySignatureRequest, }; -use zksync_config::ZkSyncConfig; -use zksync_utils::panic_notify::ThreadPanicNotify; +use std::time::Duration; +use zksync_config::configs::api::{CommonApiConfig, JsonRpcConfig}; + +use tokio::task::JoinHandle; #[rpc] pub trait RpcPubSub { @@ -178,30 +180,33 @@ pub fn start_ws_server( db_pool: ConnectionPool, sign_verify_request_sender: mpsc::Sender, ticker_request_sender: mpsc::Sender, - panic_notify: mpsc::Sender, - config: &ZkSyncConfig, -) { - let addr = config.api.json_rpc.ws_bind_addr(); + common_config: &CommonApiConfig, + config: &JsonRpcConfig, + miniblock_iteration_interval: Duration, + private_url: String, + confirmations_for_eth_event: u64, +) -> JoinHandle<()> { + let addr = config.ws_bind_addr(); let (event_sub_sender, event_sub_receiver) = mpsc::channel(2048); start_sub_notifier( db_pool.clone(), event_sub_receiver, - config.api.common.caches_size, - config.chain.state_keeper.miniblock_iteration_interval(), + common_config.caches_size, + miniblock_iteration_interval, ); let req_rpc_app = super::rpc_server::RpcApp::new( db_pool, sign_verify_request_sender, ticker_request_sender, - config, + common_config, + private_url, + confirmations_for_eth_event, ); - std::thread::spawn(move || { - let _panic_sentinel = ThreadPanicNotify(panic_notify); - + tokio::spawn(async move { let mut io = PubSubHandler::new(MetaIoHandler::default()); req_rpc_app.extend(&mut io); @@ -219,5 +224,5 @@ pub fn start_ws_server( .expect("Unable to start RPC ws server"); server.wait().expect("rpc ws server start"); - }); + }) } diff --git a/core/bin/zksync_api/src/api_server/tx_sender.rs b/core/bin/zksync_api/src/api_server/tx_sender.rs index 9897b7b243..9ebcf05fc9 100644 --- a/core/bin/zksync_api/src/api_server/tx_sender.rs +++ b/core/bin/zksync_api/src/api_server/tx_sender.rs @@ -24,7 +24,7 @@ use zksync_api_types::{ v02::transaction::{SubmitBatchResponse, Toggle2FA, Toggle2FAResponse, TxHashSerializeWrapper}, TxWithSignature, }; -use zksync_config::ZkSyncConfig; + use zksync_storage::{chain::account::records::EthAccountType, ConnectionPool}; use zksync_types::{ tx::{ @@ -47,6 +47,7 @@ use crate::{ tx_error::{Toggle2FAError, TxAddError}, utils::{block_details_cache::BlockDetailsCache, token_db_cache::TokenDBCache}, }; +use zksync_config::configs::api::CommonApiConfig; const VALIDNESS_INTERVAL_MINUTES: i64 = 40; @@ -131,9 +132,10 @@ impl TxSender { connection_pool: ConnectionPool, sign_verify_request_sender: mpsc::Sender, ticker_request_sender: mpsc::Sender, - config: &ZkSyncConfig, + config: &CommonApiConfig, + private_url: String, ) -> Self { - let core_api_client = CoreApiClient::new(config.api.private.url.clone()); + let core_api_client = CoreApiClient::new(private_url); Self::with_client( core_api_client, @@ -149,12 +151,11 @@ impl TxSender { connection_pool: ConnectionPool, sign_verify_request_sender: mpsc::Sender, ticker_request_sender: mpsc::Sender, - config: &ZkSyncConfig, + config: &CommonApiConfig, ) -> Self { let max_number_of_transactions_per_batch = - config.api.common.max_number_of_transactions_per_batch as usize; - let max_number_of_authors_per_batch = - config.api.common.max_number_of_authors_per_batch as usize; + config.max_number_of_transactions_per_batch as usize; + let max_number_of_authors_per_batch = config.max_number_of_authors_per_batch as usize; Self { core_api_client, @@ -162,11 +163,13 @@ impl TxSender { sign_verify_requests: 
sign_verify_request_sender, ticker_requests: ticker_request_sender, tokens: TokenDBCache::new(), - forced_exit_checker: ForcedExitChecker::new(config), - enforce_pubkey_change_fee: config.api.common.enforce_pubkey_change_fee, - blocks: BlockDetailsCache::new(config.api.common.caches_size), + forced_exit_checker: ForcedExitChecker::new( + config.forced_exit_minimum_account_age_secs, + ), + enforce_pubkey_change_fee: config.enforce_pubkey_change_fee, + blocks: BlockDetailsCache::new(config.caches_size), - fee_free_accounts: HashSet::from_iter(config.api.common.fee_free_accounts.clone()), + fee_free_accounts: HashSet::from_iter(config.fee_free_accounts.clone()), max_number_of_transactions_per_batch, max_number_of_authors_per_batch, } diff --git a/core/bin/zksync_api/src/api_server/web3/mod.rs b/core/bin/zksync_api/src/api_server/web3/mod.rs index 964f1c7f77..898ca6b219 100644 --- a/core/bin/zksync_api/src/api_server/web3/mod.rs +++ b/core/bin/zksync_api/src/api_server/web3/mod.rs @@ -1,15 +1,18 @@ // Built-in uses // External uses -use futures::channel::mpsc; + use jsonrpc_core::{Error, IoHandler, MetaIoHandler, Metadata, Middleware, Result}; use jsonrpc_http_server::ServerBuilder; // Workspace uses -use zksync_config::ZkSyncConfig; + use zksync_storage::{ConnectionPool, StorageProcessor}; -use zksync_utils::panic_notify::ThreadPanicNotify; +use zksync_utils::panic_notify::{spawn_panic_handler, ThreadPanicNotify}; // Local uses use self::{calls::CallsHelper, logs::LogsHelper, rpc_trait::Web3Rpc}; +use tokio::task::JoinHandle; +use zksync_config::configs::api::Web3Config; + mod calls; mod converter; mod logs; @@ -28,12 +31,12 @@ pub struct Web3RpcApp { connection_pool: ConnectionPool, logs_helper: LogsHelper, calls_helper: CallsHelper, - chain_id: u8, max_block_range: u32, + chain_id: u32, } impl Web3RpcApp { - pub fn new(connection_pool: ConnectionPool, config: &ZkSyncConfig) -> Self { + pub fn new(connection_pool: ConnectionPool, config: &Web3Config) -> Self { let runtime_handle = tokio::runtime::Handle::try_current() .expect("Web3RpcApp must be created from the context of Tokio Runtime"); Web3RpcApp { @@ -41,8 +44,8 @@ impl Web3RpcApp { connection_pool, logs_helper: LogsHelper::new(), calls_helper: CallsHelper::new(), - chain_id: config.eth_client.chain_id, - max_block_range: config.api.web3.max_block_range, + max_block_range: config.max_block_range, + chain_id: config.chain_id, } } @@ -60,14 +63,16 @@ impl Web3RpcApp { pub fn start_rpc_server( connection_pool: ConnectionPool, - panic_notify: mpsc::Sender, - config: &ZkSyncConfig, -) { - let addr = config.api.web3.bind_addr(); + web3_config: &Web3Config, +) -> JoinHandle<()> { + let addr = web3_config.bind_addr(); + + let rpc_app = Web3RpcApp::new(connection_pool, web3_config); + let (handler, panic_sender) = spawn_panic_handler(); - let rpc_app = Web3RpcApp::new(connection_pool, config); std::thread::spawn(move || { - let _panic_sentinel = ThreadPanicNotify(panic_notify); + let _panic_sentinel = ThreadPanicNotify(panic_sender); + let mut io = IoHandler::new(); rpc_app.extend(&mut io); @@ -77,4 +82,5 @@ pub fn start_rpc_server( .unwrap(); server.wait(); }); + handler } diff --git a/core/bin/zksync_api/src/api_server/web3/rpc_trait.rs b/core/bin/zksync_api/src/api_server/web3/rpc_trait.rs index 93ced72832..a6d7bd0913 100644 --- a/core/bin/zksync_api/src/api_server/web3/rpc_trait.rs +++ b/core/bin/zksync_api/src/api_server/web3/rpc_trait.rs @@ -21,12 +21,12 @@ macro_rules! 
spawn { #[rpc] pub trait Web3Rpc { - #[rpc(name = "web3_clientVersion", returns = "String")] - fn web3_client_version(&self) -> Result; - #[rpc(name = "net_version", returns = "String")] fn net_version(&self) -> Result; + #[rpc(name = "web3_clientVersion", returns = "String")] + fn web3_client_version(&self) -> Result; + #[rpc(name = "eth_protocolVersion", returns = "String")] fn protocol_version(&self) -> Result; @@ -97,14 +97,14 @@ pub trait Web3Rpc { } impl Web3Rpc for Web3RpcApp { - fn web3_client_version(&self) -> Result { - Ok(String::from("zkSync")) - } - fn net_version(&self) -> Result { Ok(self.chain_id.to_string()) } + fn web3_client_version(&self) -> Result { + Ok(String::from("zkSync")) + } + fn protocol_version(&self) -> Result { Ok(String::from("0")) } diff --git a/core/bin/zksync_api/src/api_server/web3/tests.rs b/core/bin/zksync_api/src/api_server/web3/tests.rs index 0f11013108..61abbc1ad2 100644 --- a/core/bin/zksync_api/src/api_server/web3/tests.rs +++ b/core/bin/zksync_api/src/api_server/web3/tests.rs @@ -8,7 +8,7 @@ use jsonrpc_core_client::{RawClient, RpcError, RpcResult}; use num::BigUint; use serde_json::{Map, Value}; // Workspace uses -use zksync_config::ZkSyncConfig; + use zksync_storage::{chain::operations_ext::records::Web3TxReceipt, ConnectionPool}; use zksync_test_account::ZkSyncAccount; use zksync_types::{ @@ -24,12 +24,13 @@ use super::{ Web3RpcApp, NFT_FACTORY_ADDRESS, ZKSYNC_PROXY_ADDRESS, }; use crate::api_server::rest::v02::test_utils::TestServerConfig; +use zksync_config::configs::api::Web3Config; async fn local_client() -> anyhow::Result<(RawClient, impl Future>)> { let cfg = TestServerConfig::default(); cfg.fill_database().await?; - let rpc_app = Web3RpcApp::new(cfg.pool, &cfg.config); + let rpc_app = Web3RpcApp::new(cfg.pool, &cfg.config.api.web3); let mut io = IoHandler::new(); rpc_app.extend(&mut io); @@ -86,8 +87,8 @@ async fn static_methods() -> anyhow::Result<()> { (gas_price, accounts, get_uncle_count_by_block_hash, get_uncle_count_by_block_number, _), ) = fut.await; assert_eq!(web3_client_version.unwrap().as_str().unwrap(), "zkSync"); - assert_eq!(net_version.unwrap().as_str().unwrap(), "9"); assert_eq!(protocol_version.unwrap().as_str().unwrap(), "0"); + assert_eq!(net_version.unwrap().as_str().unwrap(), "240"); assert!(!mining.unwrap().as_bool().unwrap()); assert_eq!(hashrate.unwrap().as_str().unwrap(), "0x0"); assert_eq!(gas_price.unwrap().as_str().unwrap(), "0x0"); @@ -464,7 +465,7 @@ async fn get_block() -> anyhow::Result<()> { async fn create_logs() -> anyhow::Result<()> { let cfg = TestServerConfig::default(); cfg.fill_database().await?; - let rpc_app = Web3RpcApp::new(cfg.pool, &cfg.config); + let rpc_app = Web3RpcApp::new(cfg.pool, &cfg.config.api.web3); let from_account_id = AccountId(3); let from_account = ZkSyncAccount::rand_with_seed([1, 2, 3, 4]); @@ -852,7 +853,7 @@ async fn get_transaction_receipt() -> anyhow::Result<()> { .web3_receipt_by_hash(&tx_hash) .await? .unwrap(); - let rpc_app = Web3RpcApp::new(pool.clone(), &ZkSyncConfig::from_env()); + let rpc_app = Web3RpcApp::new(pool.clone(), &Web3Config::from_env()); rpc_app.tx_receipt(&mut storage, receipt).await? 
     };
 
     assert_eq!(
@@ -871,7 +872,7 @@ async fn get_transaction_receipt() -> anyhow::Result<()> {
 )]
 async fn get_logs() -> anyhow::Result<()> {
     let pool = ConnectionPool::new(Some(1));
-    let rpc_app = Web3RpcApp::new(pool.clone(), &ZkSyncConfig::from_env());
+    let rpc_app = Web3RpcApp::new(pool.clone(), &Web3Config::from_env());
 
     // Checks that it returns error if `fromBlock` is greater than `toBlock`.
     let fut = {
@@ -896,9 +897,12 @@ async fn get_logs() -> anyhow::Result<()> {
     // Checks that it returns error if block range is too big.
     let fut = {
         let (client, server) = {
-            let mut config = ZkSyncConfig::from_env();
-            config.api.web3.max_block_range = 3;
-
+            let config = Web3Config {
+                port: 0,
+                url: "".to_string(),
+                max_block_range: 3,
+                chain_id: 9,
+            };
             let rpc_app = Web3RpcApp::new(pool.clone(), &config);
             let mut io = IoHandler::new();
             rpc_app.extend(&mut io);
diff --git a/core/bin/zksync_api/src/fee_ticker/mod.rs b/core/bin/zksync_api/src/fee_ticker/mod.rs
index c050457607..374fee77c3 100644
--- a/core/bin/zksync_api/src/fee_ticker/mod.rs
+++ b/core/bin/zksync_api/src/fee_ticker/mod.rs
@@ -27,7 +27,7 @@ use tokio::task::JoinHandle;
 use tokio::time::Instant;
 // Workspace deps
 use zksync_balancer::{Balancer, BuildBalancedItem};
-use zksync_config::{configs::ticker::TokenPriceSource, ZkSyncConfig};
+use zksync_config::configs::ticker::TokenPriceSource;
 use zksync_storage::ConnectionPool;
 use zksync_types::{
     tokens::ChangePubKeyFeeTypeArg, tx::ChangePubKeyType, Address, BatchFee, ChangePubKeyOp, Fee,
@@ -253,40 +253,38 @@ impl
 pub fn run_ticker_task(
     db_pool: ConnectionPool,
     tricker_requests: Receiver<TickerRequest>,
-    config: &ZkSyncConfig,
+    config: &zksync_config::TickerConfig,
+    max_blocks_to_aggregate: u32,
 ) -> JoinHandle<()> {
     let ticker_config = TickerConfig {
         zkp_cost_chunk_usd: Ratio::from_integer(BigUint::from(10u32).pow(3u32)).inv(),
-        gas_cost_tx: GasOperationsCost::from_constants(config.ticker.fast_processing_coeff),
+        gas_cost_tx: GasOperationsCost::from_constants(config.fast_processing_coeff),
         tokens_risk_factors: HashMap::new(),
         scale_fee_coefficient: Ratio::new(
-            BigUint::from(config.ticker.scale_fee_percent),
+            BigUint::from(config.scale_fee_percent),
             BigUint::from(100u32),
         ),
-        max_blocks_to_aggregate: std::cmp::max(
-            config.chain.state_keeper.max_aggregated_blocks_to_commit,
-            config.chain.state_keeper.max_aggregated_blocks_to_execute,
-        ) as u32,
+        max_blocks_to_aggregate,
     };
     let cache = (db_pool.clone(), TokenDBCache::new());
-    let watcher = UniswapTokenWatcher::new(config.ticker.uniswap_url.clone());
+    let watcher = UniswapTokenWatcher::new(config.uniswap_url.clone());
     let validator = FeeTokenValidator::new(
         cache.clone(),
-        chrono::Duration::seconds(config.ticker.available_liquidity_seconds as i64),
-        BigDecimal::try_from(config.ticker.liquidity_volume).expect("Valid f64 for decimal"),
-        HashSet::from_iter(config.ticker.unconditionally_valid_tokens.clone()),
+        chrono::Duration::seconds(config.available_liquidity_seconds as i64),
+        BigDecimal::try_from(config.liquidity_volume).expect("Valid f64 for decimal"),
+        HashSet::from_iter(config.unconditionally_valid_tokens.clone()),
         watcher.clone(),
     );
     let updater = MarketUpdater::new(cache, watcher);
-    tokio::spawn(updater.keep_updated(config.ticker.token_market_update_time));
+    tokio::spawn(updater.keep_updated(config.token_market_update_time));
 
     let client = reqwest::ClientBuilder::new()
         .timeout(CONNECTION_TIMEOUT)
         .connect_timeout(CONNECTION_TIMEOUT)
         .build()
         .expect("Failed to build reqwest::Client");
-    let (price_source, base_url) = config.ticker.price_source();
+    let (price_source, base_url) = config.price_source();
     match price_source {
         TokenPriceSource::CoinMarketCap => {
             let token_price_api =
@@ -332,7 +330,7 @@ pub fn run_ticker_task(
             validator,
         },
         tricker_requests,
-        config.ticker.number_of_ticker_actors,
+        config.number_of_ticker_actors,
         TICKER_CHANNEL_SIZE,
     );
diff --git a/core/bin/zksync_api/src/lib.rs b/core/bin/zksync_api/src/lib.rs
index 147e2a3a2e..fafa774658 100644
--- a/core/bin/zksync_api/src/lib.rs
+++ b/core/bin/zksync_api/src/lib.rs
@@ -1,11 +1,5 @@
 #![recursion_limit = "256"]
 
-use crate::{api_server::start_api_server, fee_ticker::run_ticker_task};
-use futures::channel::mpsc;
-use zksync_config::ZkSyncConfig;
-use zksync_eth_client::EthereumGateway;
-use zksync_storage::ConnectionPool;
-
 pub mod api_server;
 pub mod core_api_client;
 pub mod eth_checker;
@@ -13,26 +7,3 @@ pub mod fee_ticker;
 pub mod signature_checker;
 pub mod tx_error;
 pub mod utils;
-
-/// Runs the application actors.
-pub fn run_api(
-    connection_pool: ConnectionPool,
-    panic_notify: mpsc::Sender<bool>,
-    eth_gateway: EthereumGateway,
-    config: &ZkSyncConfig,
-) -> tokio::task::JoinHandle<()> {
-    let channel_size = 32768;
-    let (ticker_request_sender, ticker_request_receiver) = mpsc::channel(channel_size);
-
-    let ticker_task = run_ticker_task(connection_pool.clone(), ticker_request_receiver, config);
-
-    start_api_server(
-        connection_pool,
-        panic_notify,
-        ticker_request_sender,
-        eth_gateway,
-        config,
-    );
-
-    ticker_task
-}
diff --git a/core/bin/zksync_api/src/main.rs b/core/bin/zksync_api/src/main.rs
deleted file mode 100644
index 638bfe6a0e..0000000000
--- a/core/bin/zksync_api/src/main.rs
+++ /dev/null
@@ -1,51 +0,0 @@
-use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};
-use std::cell::RefCell;
-use zksync_api::run_api;
-use zksync_config::ZkSyncConfig;
-use zksync_eth_client::EthereumGateway;
-use zksync_gateway_watcher::run_gateway_watcher_if_multiplexed;
-use zksync_prometheus_exporter::run_prometheus_exporter;
-use zksync_storage::ConnectionPool;
-
-#[tokio::main]
-async fn main() -> anyhow::Result<()> {
-    let _sentry_guard = vlog::init();
-    // handle ctrl+c
-    let config = ZkSyncConfig::from_env();
-    let (stop_signal_sender, mut stop_signal_receiver) = mpsc::channel(256);
-    {
-        let stop_signal_sender = RefCell::new(stop_signal_sender.clone());
-        ctrlc::set_handler(move || {
-            let mut sender = stop_signal_sender.borrow_mut();
-            block_on(sender.send(true)).expect("Ctrl+C signal send");
-        })
-        .expect("Error setting Ctrl+C handler");
-    }
-    let connection_pool = ConnectionPool::new(None);
-    let eth_gateway = EthereumGateway::from_config(&config);
-
-    // Run prometheus data exporter.
-    let (prometheus_task_handle, _) =
-        run_prometheus_exporter(connection_pool.clone(), config.api.prometheus.port, false);
-
-    let gateway_watcher_task_opt = run_gateway_watcher_if_multiplexed(eth_gateway.clone(), &config);
-
-    let task_handle = run_api(connection_pool, stop_signal_sender, eth_gateway, &config);
-
-    tokio::select! {
-        _ = async { task_handle.await } => {
-            panic!("API server actors aren't supposed to finish their execution")
-        },
-        _ = async { gateway_watcher_task_opt.unwrap().await }, if gateway_watcher_task_opt.is_some() => {
-            panic!("Gateway Watcher actors aren't supposed to finish their execution")
-        }
-        _ = async { prometheus_task_handle.await } => {
-            panic!("Prometheus exporter actors aren't supposed to finish their execution")
-        },
-        _ = async { stop_signal_receiver.next().await } => {
-            vlog::warn!("Stop signal received, shutting down");
-        }
-    };
-
-    Ok(())
-}
diff --git a/core/bin/zksync_api/src/signature_checker.rs b/core/bin/zksync_api/src/signature_checker.rs
index 3ba0d2ecbe..8c426e1c69 100644
--- a/core/bin/zksync_api/src/signature_checker.rs
+++ b/core/bin/zksync_api/src/signature_checker.rs
@@ -13,16 +13,16 @@ use futures::{
     channel::{mpsc, oneshot},
     StreamExt,
 };
-use tokio::runtime::{Builder, Handle};
+use tokio::task::JoinHandle;
+
 // Workspace uses
+use zksync_eth_client::EthereumGateway;
 use zksync_types::{
     tx::{EthBatchSignData, EthSignData, TxEthSignature},
     Address, Order, SignedZkSyncTx, Token, ZkSyncTx,
 };
 // Local uses
 use crate::{eth_checker::EthereumChecker, tx_error::TxAddError};
-use zksync_eth_client::EthereumGateway;
-use zksync_utils::panic_notify::ThreadPanicNotify;
 
 /// `TxVariant` is used to form a verify request. It is possible to wrap
 /// either a single transaction, or the transaction batch.
@@ -380,42 +380,26 @@ impl RequestData {
 
 /// Main routine of the concurrent signature checker.
 /// See the module documentation for details.
-pub fn start_sign_checker_detached(
+pub fn start_sign_checker(
     client: EthereumGateway,
     input: mpsc::Receiver<VerifySignatureRequest>,
-    panic_notify: mpsc::Sender<bool>,
-) {
+) -> JoinHandle<()> {
     let eth_checker = EthereumChecker::new(client);
-
     /// Main signature check requests handler.
     /// Basically it receives the requests through the channel and verifies signatures,
     /// notifying the request sender about the check result.
     async fn checker_routine(
-        handle: Handle,
         mut input: mpsc::Receiver<VerifySignatureRequest>,
         eth_checker: EthereumChecker<EthereumGateway>,
     ) {
         while let Some(VerifySignatureRequest { data, response }) = input.next().await {
             let eth_checker = eth_checker.clone();
-            handle.spawn(async move {
+            tokio::spawn(async move {
                 let resp = VerifiedTx::verify(data, &eth_checker).await;
                 response.send(resp).unwrap_or_default();
             });
         }
     }
-
-    std::thread::Builder::new()
-        .name("Signature checker thread".to_string())
-        .spawn(move || {
-            let _panic_sentinel = ThreadPanicNotify(panic_notify.clone());
-
-            let runtime = Builder::new_multi_thread()
-                .enable_all()
-                .build()
-                .expect("failed to build runtime for signature processor");
-            let handle = runtime.handle().clone();
-            runtime.block_on(checker_routine(handle, input, eth_checker));
-        })
-        .expect("failed to start signature checker thread");
+    tokio::spawn(checker_routine(input, eth_checker))
 }
diff --git a/core/bin/zksync_core/src/bin/eth_watcher.rs b/core/bin/zksync_core/src/bin/eth_watcher.rs
index a978c49b4b..f4f8375dfc 100644
--- a/core/bin/zksync_core/src/bin/eth_watcher.rs
+++ b/core/bin/zksync_core/src/bin/eth_watcher.rs
@@ -3,7 +3,7 @@ use std::time::Duration;
 
 use futures::{channel::mpsc, SinkExt};
 use tokio::{runtime::Runtime, time};
-use zksync_config::ZkSyncConfig;
+use zksync_config::{ContractsConfig, ETHClientConfig, ETHSenderConfig};
 use zksync_core::eth_watch::{EthHttpClient, EthWatch, EthWatchRequest};
 use zksync_eth_client::EthereumGateway;
 
@@ -13,16 +13,18 @@ fn main() {
     let _sentry_guard = vlog::init();
     vlog::info!("ETH watcher started");
 
-    let config = ZkSyncConfig::from_env();
-    let client = EthereumGateway::from_config(&config);
+    let contracts = ContractsConfig::from_env();
+    let eth_client_config = ETHClientConfig::from_env();
+    let eth_sender_config = ETHSenderConfig::from_env();
+    let client = EthereumGateway::from_config(
+        &eth_client_config,
+        &eth_sender_config,
+        contracts.contract_addr,
+    );
 
     let (eth_req_sender, eth_req_receiver) = mpsc::channel(256);
 
-    let eth_client = EthHttpClient::new(
-        client,
-        config.contracts.contract_addr,
-        config.contracts.governance_addr,
-    );
+    let eth_client = EthHttpClient::new(client, contracts.contract_addr, contracts.governance_addr);
     let watcher = EthWatch::new(eth_client, 0);
 
     main_runtime.spawn(watcher.run(eth_req_receiver));
diff --git a/core/bin/zksync_core/src/bin/multiplexed_gateway_watcher.rs b/core/bin/zksync_core/src/bin/multiplexed_gateway_watcher.rs
index 7efa3e2706..2230982dc1 100644
--- a/core/bin/zksync_core/src/bin/multiplexed_gateway_watcher.rs
+++ b/core/bin/zksync_core/src/bin/multiplexed_gateway_watcher.rs
@@ -1,19 +1,26 @@
-use zksync_config::ZkSyncConfig;
+use zksync_config::{ContractsConfig, ETHClientConfig, ETHSenderConfig, GatewayWatcherConfig};
 use zksync_eth_client::EthereumGateway;
 use zksync_gateway_watcher::MultiplexedGatewayWatcher;
 
 #[tokio::main]
 async fn main() {
     vlog::init();
-    let config = ZkSyncConfig::from_env();
+    let contracts = ContractsConfig::from_env();
+    let eth_client_config = ETHClientConfig::from_env();
+    let eth_sender_config = ETHSenderConfig::from_env();
+    let eth_watcher_config = GatewayWatcherConfig::from_env();
 
     MultiplexedGatewayWatcher::new(
-        EthereumGateway::from_config(&config),
-        config.gateway_watcher.check_interval(),
-        config.gateway_watcher.retry_delay(),
-        config.gateway_watcher.request_timeout(),
-        Some(config.gateway_watcher.request_per_task_limit()),
-        Some(config.gateway_watcher.task_limit()),
+        EthereumGateway::from_config(
+            &eth_client_config,
+            &eth_sender_config,
+            contracts.contract_addr,
+        ),
+        eth_watcher_config.check_interval(),
+        eth_watcher_config.retry_delay(),
+        eth_watcher_config.request_timeout(),
+        Some(eth_watcher_config.request_per_task_limit()),
+        Some(eth_watcher_config.task_limit()),
     )
     .run()
     .await;
diff --git a/core/bin/zksync_core/src/committer/aggregated_committer.rs b/core/bin/zksync_core/src/committer/aggregated_committer.rs
index 20d30b37c1..06c5256e92 100644
--- a/core/bin/zksync_core/src/committer/aggregated_committer.rs
+++ b/core/bin/zksync_core/src/committer/aggregated_committer.rs
@@ -1,6 +1,6 @@
 use chrono::{DateTime, Utc};
 use std::{cmp::max, time::Duration};
-use zksync_config::ZkSyncConfig;
+use zksync_config::ChainConfig;
 use zksync_crypto::proof::AggregatedProof;
 use zksync_storage::{
     chain::{block::BlockSchema, operations::OperationsSchema},
@@ -223,7 +223,7 @@ async fn is_fast_processing_requested(
 
 async fn create_aggregated_commits_storage(
     storage: &mut StorageProcessor<'_>,
-    config: &ZkSyncConfig,
+    config: &ChainConfig,
 ) -> anyhow::Result<bool> {
     let mut transaction = storage.start_transaction().await?;
     let last_aggregate_committed_block = OperationsSchema(&mut transaction)
@@ -252,9 +252,9 @@ async fn create_aggregated_commits_storage(
         &old_committed_block,
         &new_blocks,
         Utc::now(),
-        config.chain.state_keeper.max_aggregated_blocks_to_commit,
-        config.chain.state_keeper.block_commit_deadline(),
-        config.chain.state_keeper.max_aggregated_tx_gas.into(),
+        config.state_keeper.max_aggregated_blocks_to_commit,
+        config.state_keeper.block_commit_deadline(),
+        config.state_keeper.max_aggregated_tx_gas.into(),
         fast_processing_requested,
     );
 
@@ -275,7 +275,7 @@ async fn create_aggregated_commits_storage(
 
 async fn create_aggregated_prover_task_storage(
     storage: &mut StorageProcessor<'_>,
-    config: &ZkSyncConfig,
+    config: &ChainConfig,
 ) -> anyhow::Result<bool> {
     let mut transaction = storage.start_transaction().await?;
     let last_aggregate_committed_block = OperationsSchema(&mut transaction)
@@ -311,10 +311,10 @@ async fn create_aggregated_prover_task_storage(
     let create_proof_operation = create_new_create_proof_operation(
         &blocks_with_proofs,
-        &config.chain.state_keeper.aggregated_proof_sizes,
+        &config.state_keeper.aggregated_proof_sizes,
         Utc::now(),
-        config.chain.state_keeper.block_prove_deadline(),
-        config.chain.state_keeper.max_aggregated_tx_gas.into(),
+        config.state_keeper.block_prove_deadline(),
+        config.state_keeper.max_aggregated_tx_gas.into(),
         fast_processing_requested,
     );
     let result = if let Some(operation) = create_proof_operation {
@@ -403,7 +403,7 @@ async fn create_aggregated_publish_proof_operation_storage(
 
 async fn create_aggregated_execute_operation_storage(
     storage: &mut StorageProcessor<'_>,
-    config: &ZkSyncConfig,
+    config: &ChainConfig,
 ) -> anyhow::Result<bool> {
     let mut transaction = storage.start_transaction().await?;
     let last_aggregate_executed_block = OperationsSchema(&mut transaction)
@@ -433,9 +433,9 @@ async fn create_aggregated_execute_operation_storage(
     let execute_operation = create_execute_blocks_operation(
         &blocks,
         Utc::now(),
-        config.chain.state_keeper.max_aggregated_blocks_to_execute,
-        config.chain.state_keeper.block_execute_deadline(),
-        config.chain.state_keeper.max_aggregated_tx_gas.into(),
+        config.state_keeper.max_aggregated_blocks_to_execute,
+        config.state_keeper.block_execute_deadline(),
+        config.state_keeper.max_aggregated_tx_gas.into(),
         fast_processing_requested,
     );
 
@@ -456,7 +456,7 @@ async fn create_aggregated_execute_operation_storage(
 
 pub async fn create_aggregated_operations_storage(
     storage: &mut StorageProcessor<'_>,
-    config: &ZkSyncConfig,
+    config: &ChainConfig,
 ) -> anyhow::Result<()> {
     while create_aggregated_commits_storage(storage, config).await? {}
     while create_aggregated_prover_task_storage(storage, config).await? {}
diff --git a/core/bin/zksync_core/src/committer/mod.rs b/core/bin/zksync_core/src/committer/mod.rs
index 676cd34187..f1e262cae7 100644
--- a/core/bin/zksync_core/src/committer/mod.rs
+++ b/core/bin/zksync_core/src/committer/mod.rs
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
 use tokio::{task::JoinHandle, time};
 // Workspace uses
 use crate::mempool::MempoolBlocksRequest;
-use zksync_config::ZkSyncConfig;
+use zksync_config::ChainConfig;
 use zksync_storage::ConnectionPool;
 use zksync_types::{
     block::{Block, BlockMetadata, ExecutedOperations, PendingBlock},
@@ -197,7 +197,7 @@ async fn commit_block(
     metrics::histogram!("committer.commit_block", start.elapsed());
 }
 
-async fn poll_for_new_proofs_task(pool: ConnectionPool, config: ZkSyncConfig) {
+async fn poll_for_new_proofs_task(pool: ConnectionPool, config: ChainConfig) {
     let mut timer = time::interval(PROOF_POLL_INTERVAL);
     loop {
         timer.tick().await;
@@ -219,12 +219,12 @@ pub fn run_committer(
     rx_for_ops: Receiver<CommitRequest>,
     mempool_req_sender: Sender<MempoolBlocksRequest>,
     pool: ConnectionPool,
-    config: &ZkSyncConfig,
+    config: ChainConfig,
 ) -> JoinHandle<()> {
     tokio::spawn(handle_new_commit_task(
         rx_for_ops,
         mempool_req_sender,
         pool.clone(),
     ));
-    tokio::spawn(poll_for_new_proofs_task(pool, config.clone()))
+    tokio::spawn(poll_for_new_proofs_task(pool, config))
 }
diff --git a/core/bin/zksync_core/src/eth_watch/mod.rs b/core/bin/zksync_core/src/eth_watch/mod.rs
index 3246d11cae..38eb226641 100644
--- a/core/bin/zksync_core/src/eth_watch/mod.rs
+++ b/core/bin/zksync_core/src/eth_watch/mod.rs
@@ -29,7 +29,7 @@ use zksync_api_types::{
     },
     Either,
 };
-use zksync_config::ZkSyncConfig;
+use zksync_config::{ContractsConfig, ETHWatchConfig};
 use zksync_crypto::params::PRIORITY_EXPIRATION;
 use zksync_eth_client::ethereum_gateway::EthereumGateway;
 use zksync_types::{
@@ -680,22 +680,20 @@ pub fn start_eth_watch(
     eth_req_sender: mpsc::Sender<EthWatchRequest>,
     eth_req_receiver: mpsc::Receiver<EthWatchRequest>,
     eth_gateway: EthereumGateway,
-    config_options: &ZkSyncConfig,
+    contract_config: &ContractsConfig,
+    eth_watcher_config: &ETHWatchConfig,
 ) -> JoinHandle<()> {
     let eth_client = EthHttpClient::new(
         eth_gateway,
-        config_options.contracts.contract_addr,
-        config_options.contracts.governance_addr,
+        contract_config.contract_addr,
+        contract_config.governance_addr,
     );
 
-    let eth_watch = EthWatch::new(
-        eth_client,
-        config_options.eth_watch.confirmations_for_eth_event,
-    );
+    let eth_watch = EthWatch::new(eth_client, eth_watcher_config.confirmations_for_eth_event);
 
     tokio::spawn(eth_watch.run(eth_req_receiver));
 
-    let poll_interval = config_options.eth_watch.poll_interval();
+    let poll_interval = eth_watcher_config.poll_interval();
 
     tokio::spawn(async move {
         let mut timer = time::interval(poll_interval);
diff --git a/core/bin/zksync_core/src/lib.rs b/core/bin/zksync_core/src/lib.rs
index 60ea2d617d..85a06147b2 100644
--- a/core/bin/zksync_core/src/lib.rs
+++ b/core/bin/zksync_core/src/lib.rs
@@ -6,15 +6,13 @@ use crate::{
     eth_watch::start_eth_watch,
     mempool::run_mempool_tasks,
     private_api::start_private_core_api,
-    rejected_tx_cleaner::run_rejected_tx_cleaner,
     state_keeper::{start_state_keeper, ZkSyncStateKeeper},
     token_handler::run_token_handler,
 };
 use futures::{channel::mpsc, future};
 use tokio::task::JoinHandle;
-use 
zksync_config::ZkSyncConfig; +use zksync_config::{ChainConfig, ZkSyncConfig}; use zksync_eth_client::EthereumGateway; -use zksync_gateway_watcher::run_gateway_watcher_if_multiplexed; use zksync_storage::ConnectionPool; use zksync_types::{tokens::get_genesis_token_list, Token, TokenId, TokenKind}; @@ -51,17 +49,14 @@ pub async fn wait_for_tasks(task_futures: Vec>) { } /// Inserts the initial information about zkSync tokens into the database. -pub async fn genesis_init(config: &ZkSyncConfig) { +pub async fn genesis_init(config: &ChainConfig) { let pool = ConnectionPool::new(Some(1)); vlog::info!("Generating genesis block."); - ZkSyncStateKeeper::create_genesis_block( - pool.clone(), - &config.chain.state_keeper.fee_account_addr, - ) - .await; + ZkSyncStateKeeper::create_genesis_block(pool.clone(), &config.state_keeper.fee_account_addr) + .await; vlog::info!("Adding initial tokens to db"); - let genesis_tokens = get_genesis_token_list(&config.chain.eth.network.to_string()) + let genesis_tokens = get_genesis_token_list(&config.eth.network.to_string()) .expect("Initial token list not found"); for (id, token) in (1..).zip(genesis_tokens) { vlog::info!( @@ -97,9 +92,8 @@ pub async fn genesis_init(config: &ZkSyncConfig) { /// - private Core API server. pub async fn run_core( connection_pool: ConnectionPool, - panic_notify: mpsc::Sender, - eth_gateway: EthereumGateway, config: &ZkSyncConfig, + eth_gateway: EthereumGateway, ) -> anyhow::Result>> { let (proposed_blocks_sender, proposed_blocks_receiver) = mpsc::channel(DEFAULT_CHANNEL_CAPACITY); @@ -119,7 +113,8 @@ pub async fn run_core( eth_watch_req_sender.clone(), eth_watch_req_receiver, eth_gateway.clone(), - config, + &config.contracts, + &config.eth_watch, ); // Insert pending withdrawals into database (if required) @@ -148,7 +143,7 @@ pub async fn run_core( proposed_blocks_receiver, mempool_block_request_sender.clone(), connection_pool.clone(), - config, + config.chain.clone(), ); // Start mempool. @@ -157,28 +152,24 @@ pub async fn run_core( mempool_tx_request_receiver, mempool_block_request_receiver, eth_watch_req_sender.clone(), - config, 4, DEFAULT_CHANNEL_CAPACITY, + config.chain.state_keeper.block_chunk_sizes.clone(), ); - let gateway_watcher_task_opt = run_gateway_watcher_if_multiplexed(eth_gateway.clone(), config); - // Start token handler. let token_handler_task = run_token_handler( connection_pool.clone(), eth_watch_req_sender.clone(), - config, + &config.token_handler, ); // Start token handler. let register_factory_task = run_register_factory_handler( connection_pool.clone(), eth_watch_req_sender.clone(), - config, + config.token_handler.clone(), ); - // Start rejected transactions cleaner task. - let rejected_tx_cleaner_task = run_rejected_tx_cleaner(config, connection_pool.clone()); let tx_event_emitter_task = tx_event_emitter::run_tx_event_emitter_task( connection_pool.clone(), @@ -193,28 +184,23 @@ pub async fn run_core( ); // Start private API. 
- start_private_core_api( - panic_notify.clone(), + let private_api_task = start_private_core_api( mempool_tx_request_sender, eth_watch_req_sender, config.api.private.clone(), ); - let mut task_futures = vec![ + let task_futures = vec![ eth_watch_task, state_keeper_task, committer_task, mempool_task, proposer_task, - rejected_tx_cleaner_task, token_handler_task, register_factory_task, tx_event_emitter_task, + private_api_task, ]; - if let Some(task) = gateway_watcher_task_opt { - task_futures.push(task); - } - Ok(task_futures) } diff --git a/core/bin/zksync_core/src/main.rs b/core/bin/zksync_core/src/main.rs deleted file mode 100644 index f35ca444ce..0000000000 --- a/core/bin/zksync_core/src/main.rs +++ /dev/null @@ -1,51 +0,0 @@ -use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt}; -use std::cell::RefCell; -use zksync_config::ZkSyncConfig; -use zksync_core::{run_core, wait_for_tasks}; -use zksync_eth_client::EthereumGateway; -use zksync_prometheus_exporter::run_prometheus_exporter; -use zksync_storage::ConnectionPool; - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - let _sentry_guard = vlog::init(); - - // Handle ctrl+c. - let config = ZkSyncConfig::from_env(); - let eth_gateway = EthereumGateway::from_config(&config); - let (stop_signal_sender, mut stop_signal_receiver) = mpsc::channel(256); - { - let stop_signal_sender = RefCell::new(stop_signal_sender.clone()); - ctrlc::set_handler(move || { - let mut sender = stop_signal_sender.borrow_mut(); - block_on(sender.send(true)).expect("Ctrl+C signal send"); - }) - .expect("Error setting Ctrl+C handler"); - } - let connection_pool = ConnectionPool::new(None); - - // Run prometheus data exporter. - let (prometheus_task_handle, counter_task_handle) = - run_prometheus_exporter(connection_pool.clone(), config.api.prometheus.port, true); - - let task_handles = run_core(connection_pool, stop_signal_sender, eth_gateway, &config) - .await - .expect("Unable to start Core actors"); - - tokio::select! { - _ = async { wait_for_tasks(task_handles).await } => { - // We don't need to do anything here, since actors will panic upon future resolving. 
- }, - _ = async { prometheus_task_handle.await } => { - panic!("Prometheus exporter actors aren't supposed to finish their execution") - }, - _ = async { counter_task_handle.unwrap().await } => { - panic!("Operation counting actor is not supposed to finish its execution") - }, - _ = async { stop_signal_receiver.next().await } => { - vlog::warn!("Stop signal received, shutting down"); - } - }; - - Ok(()) -} diff --git a/core/bin/zksync_core/src/mempool/mod.rs b/core/bin/zksync_core/src/mempool/mod.rs index 165e4d7f06..41155cef31 100644 --- a/core/bin/zksync_core/src/mempool/mod.rs +++ b/core/bin/zksync_core/src/mempool/mod.rs @@ -32,7 +32,7 @@ use tokio::task::JoinHandle; // Workspace uses use zksync_balancer::{Balancer, BuildBalancedItem}; -use zksync_config::ZkSyncConfig; + use zksync_storage::ConnectionPool; use zksync_types::{ mempool::{SignedTxVariant, SignedTxsBatch}, @@ -636,17 +636,13 @@ pub fn run_mempool_tasks( tx_requests: mpsc::Receiver, block_requests: mpsc::Receiver, eth_watch_req: mpsc::Sender, - config: &ZkSyncConfig, number_of_mempool_transaction_handlers: u8, channel_capacity: usize, + block_chunk_sizes: Vec, ) -> JoinHandle<()> { - let config = config.clone(); tokio::spawn(async move { let mempool_state = Arc::new(RwLock::new(MempoolState::restore_from_db(&db_pool).await)); - let max_block_size_chunks = *config - .chain - .state_keeper - .block_chunk_sizes + let max_block_size_chunks = *block_chunk_sizes .iter() .max() .expect("failed to find max block chunks size"); diff --git a/core/bin/zksync_core/src/private_api.rs b/core/bin/zksync_core/src/private_api.rs index 12a630baa6..9f324c5df0 100644 --- a/core/bin/zksync_core/src/private_api.rs +++ b/core/bin/zksync_core/src/private_api.rs @@ -13,14 +13,16 @@ use actix_web::{web, App, HttpResponse, HttpServer}; use futures::{ channel::{mpsc, oneshot}, sink::SinkExt, + StreamExt, }; use serde::Deserialize; use std::{str::FromStr, thread}; +use tokio::task::JoinHandle; use zksync_api_types::{ v02::pagination::{ApiEither, PaginationDirection, PaginationQuery, PendingOpsRequest}, PriorityOpLookupQuery, }; -use zksync_config::configs::api::PrivateApi; +use zksync_config::configs::api::PrivateApiConfig; use zksync_types::{tx::TxEthSignature, AccountId, Address, SignedZkSyncTx}; use zksync_utils::panic_notify::ThreadPanicNotify; @@ -180,15 +182,16 @@ async fn unconfirmed_op( #[allow(clippy::too_many_arguments)] pub fn start_private_core_api( - panic_notify: mpsc::Sender, mempool_tx_sender: mpsc::Sender, eth_watch_req_sender: mpsc::Sender, - config: PrivateApi, -) { + config: PrivateApiConfig, +) -> JoinHandle<()> { + let (panic_sender, mut panic_receiver) = mpsc::channel(1); + thread::Builder::new() .name("core-private-api".to_string()) .spawn(move || { - let _panic_sentinel = ThreadPanicNotify(panic_notify.clone()); + let _panic_sentinel = ThreadPanicNotify(panic_sender.clone()); let actix_runtime = actix_rt::System::new(); actix_runtime.block_on(async move { @@ -218,4 +221,7 @@ pub fn start_private_core_api( }) }) .expect("failed to start prover server"); + tokio::spawn(async move { + panic_receiver.next().await.unwrap(); + }) } diff --git a/core/bin/zksync_core/src/register_factory_handler.rs b/core/bin/zksync_core/src/register_factory_handler.rs index 6d4b67a376..b2e9a27503 100644 --- a/core/bin/zksync_core/src/register_factory_handler.rs +++ b/core/bin/zksync_core/src/register_factory_handler.rs @@ -1,136 +1,134 @@ -// Built-in deps -use std::time::Duration; - -// External uses -use futures::{ - channel::{mpsc, oneshot}, - 
SinkExt, -}; -use tokio::task::JoinHandle; -// Workspace uses -use zksync_config::{TokenHandlerConfig, ZkSyncConfig}; -use zksync_storage::{ConnectionPool, StorageProcessor}; -use zksync_types::RegisterNFTFactoryEvent; -// Local uses -use crate::eth_watch::EthWatchRequest; - -/// Handle events about registering factories for minting tokens -#[derive(Debug)] -struct NFTFactoryHandler { - connection_pool: ConnectionPool, - poll_interval: Duration, - eth_watch_req: mpsc::Sender, - last_eth_block: Option, -} - -impl NFTFactoryHandler { - async fn new( - connection_pool: ConnectionPool, - eth_watch_req: mpsc::Sender, - config: TokenHandlerConfig, - ) -> Self { - let poll_interval = config.poll_interval(); - - Self { - connection_pool, - eth_watch_req, - poll_interval, - last_eth_block: None, - } - } - - async fn load_register_nft_factory_events(&self) -> Vec { - let (sender, receiver) = oneshot::channel(); - self.eth_watch_req - .clone() - .send(EthWatchRequest::GetRegisterNFTFactoryEvents { - last_eth_block: self.last_eth_block, - resp: sender, - }) - .await - .expect("ETH watch req receiver dropped"); - - receiver.await.expect("Err response from eth watch") - } - - async fn save_register_factory( - &self, - storage: &mut StorageProcessor<'_>, - register_nft_factory_events: Vec, - ) -> anyhow::Result<()> { - let mut transaction = storage.start_transaction().await?; - - let factories = { - let mut factories = vec![]; - let mut account_schema = transaction.chain().account_schema(); - for factory in register_nft_factory_events { - // If account does not exists skip factory - if let Some(account_id) = account_schema - .account_id_by_address(factory.creator_address) - .await? - { - factories.push((account_id, factory)) - } else { - vlog::warn!( - "Cant register factory, creator {:?} does not exist", - &factory.creator_address - ) - } - } - factories - }; - - let mut token_schema = transaction.tokens_schema(); - for (account_id, nft_factory) in factories { - token_schema - .store_nft_factory( - account_id, - nft_factory.creator_address, - nft_factory.factory_address, - ) - .await? 
- } - transaction.commit().await?; - Ok(()) - } - - async fn run(&mut self) { - let mut timer = tokio::time::interval(self.poll_interval); - loop { - timer.tick().await; - - let register_nft_factory_events = self.load_register_nft_factory_events().await; - - self.last_eth_block = register_nft_factory_events - .iter() - .map(|event| event.eth_block) - .max() - .or(self.last_eth_block); - - let mut storage = self - .connection_pool - .access_storage() - .await - .expect("db connection failed for token handler"); - - self.save_register_factory(&mut storage, register_nft_factory_events) - .await - .expect("failed to add register tokens to the database"); - } - } -} - -#[must_use] -pub fn run_register_factory_handler( - db_pool: ConnectionPool, - eth_watch_req: mpsc::Sender, - config: &ZkSyncConfig, -) -> JoinHandle<()> { - let config = config.clone(); - tokio::spawn(async move { - let mut handler = - NFTFactoryHandler::new(db_pool, eth_watch_req, config.token_handler.clone()).await; - - handler.run().await - }) -} +// Built-in deps +use std::time::Duration; + +// External uses +use futures::{ + channel::{mpsc, oneshot}, + SinkExt, +}; +use tokio::task::JoinHandle; +// Workspace uses +use zksync_config::TokenHandlerConfig; +use zksync_storage::{ConnectionPool, StorageProcessor}; +use zksync_types::RegisterNFTFactoryEvent; +// Local uses +use crate::eth_watch::EthWatchRequest; + +/// Handle events about registering factories for minting tokens +#[derive(Debug)] +struct NFTFactoryHandler { + connection_pool: ConnectionPool, + poll_interval: Duration, + eth_watch_req: mpsc::Sender, + last_eth_block: Option, +} + +impl NFTFactoryHandler { + async fn new( + connection_pool: ConnectionPool, + eth_watch_req: mpsc::Sender, + config: &TokenHandlerConfig, + ) -> Self { + let poll_interval = config.poll_interval(); + + Self { + connection_pool, + eth_watch_req, + poll_interval, + last_eth_block: None, + } + } + + async fn load_register_nft_factory_events(&self) -> Vec { + let (sender, receiver) = oneshot::channel(); + self.eth_watch_req + .clone() + .send(EthWatchRequest::GetRegisterNFTFactoryEvents { + last_eth_block: self.last_eth_block, + resp: sender, + }) + .await + .expect("ETH watch req receiver dropped"); + + receiver.await.expect("Err response from eth watch") + } + + async fn save_register_factory( + &self, + storage: &mut StorageProcessor<'_>, + register_nft_factory_events: Vec, + ) -> anyhow::Result<()> { + let mut transaction = storage.start_transaction().await?; + + let factories = { + let mut factories = vec![]; + let mut account_schema = transaction.chain().account_schema(); + for factory in register_nft_factory_events { + // If account does not exists skip factory + if let Some(account_id) = account_schema + .account_id_by_address(factory.creator_address) + .await? + { + factories.push((account_id, factory)) + } else { + vlog::warn!( + "Cant register factory, creator {:?} does not exist", + &factory.creator_address + ) + } + } + factories + }; + + let mut token_schema = transaction.tokens_schema(); + for (account_id, nft_factory) in factories { + token_schema + .store_nft_factory( + account_id, + nft_factory.creator_address, + nft_factory.factory_address, + ) + .await? 
+ } + transaction.commit().await?; + Ok(()) + } + + async fn run(&mut self) { + let mut timer = tokio::time::interval(self.poll_interval); + loop { + timer.tick().await; + + let register_nft_factory_events = self.load_register_nft_factory_events().await; + + self.last_eth_block = register_nft_factory_events + .iter() + .map(|event| event.eth_block) + .max() + .or(self.last_eth_block); + + let mut storage = self + .connection_pool + .access_storage() + .await + .expect("db connection failed for token handler"); + + self.save_register_factory(&mut storage, register_nft_factory_events) + .await + .expect("failed to add register tokens to the database"); + } + } +} + +#[must_use] +pub fn run_register_factory_handler( + db_pool: ConnectionPool, + eth_watch_req: mpsc::Sender, + config: TokenHandlerConfig, +) -> JoinHandle<()> { + tokio::spawn(async move { + let mut handler = NFTFactoryHandler::new(db_pool, eth_watch_req, &config).await; + + handler.run().await + }) +} diff --git a/core/bin/zksync_core/src/rejected_tx_cleaner.rs b/core/bin/zksync_core/src/rejected_tx_cleaner.rs index ff8c14a3c2..853cf0e028 100644 --- a/core/bin/zksync_core/src/rejected_tx_cleaner.rs +++ b/core/bin/zksync_core/src/rejected_tx_cleaner.rs @@ -10,13 +10,13 @@ use tokio::{task::JoinHandle, time}; // Workspace deps -use zksync_config::ZkSyncConfig; +use zksync_config::DBConfig; use zksync_storage::ConnectionPool; #[must_use] -pub fn run_rejected_tx_cleaner(config: &ZkSyncConfig, db_pool: ConnectionPool) -> JoinHandle<()> { - let max_age = chrono::Duration::from_std(config.db.rejected_transactions_max_age()).unwrap(); - let interval = config.db.rejected_transactions_cleaner_interval(); +pub fn run_rejected_tx_cleaner(config: &DBConfig, db_pool: ConnectionPool) -> JoinHandle<()> { + let max_age = chrono::Duration::from_std(config.rejected_transactions_max_age()).unwrap(); + let interval = config.rejected_transactions_cleaner_interval(); let mut timer = time::interval(interval); tokio::spawn(async move { diff --git a/core/bin/zksync_core/src/token_handler.rs b/core/bin/zksync_core/src/token_handler.rs index 90c442c412..4b089992fd 100644 --- a/core/bin/zksync_core/src/token_handler.rs +++ b/core/bin/zksync_core/src/token_handler.rs @@ -14,7 +14,7 @@ use futures::{ }; use tokio::task::JoinHandle; // Workspace uses -use zksync_config::{TokenHandlerConfig, ZkSyncConfig}; +use zksync_config::TokenHandlerConfig; use zksync_notifier::Notifier; use zksync_storage::{tokens::StoreTokenError, ConnectionPool, StorageProcessor}; use zksync_types::{ @@ -234,12 +234,11 @@ impl TokenHandler { pub fn run_token_handler( db_pool: ConnectionPool, eth_watch_req: mpsc::Sender, - config: &ZkSyncConfig, + config: &TokenHandlerConfig, ) -> JoinHandle<()> { let config = config.clone(); tokio::spawn(async move { - let mut token_handler = - TokenHandler::new(db_pool, eth_watch_req, config.token_handler.clone()).await; + let mut token_handler = TokenHandler::new(db_pool, eth_watch_req, config.clone()).await; token_handler.run().await }) diff --git a/core/bin/zksync_eth_sender/src/lib.rs b/core/bin/zksync_eth_sender/src/lib.rs index 411923432f..de0a4c43c9 100644 --- a/core/bin/zksync_eth_sender/src/lib.rs +++ b/core/bin/zksync_eth_sender/src/lib.rs @@ -14,7 +14,7 @@ use web3::{ types::{TransactionReceipt, H256, U256}, }; // Workspace uses -use zksync_config::{ETHSenderConfig, ZkSyncConfig}; +use zksync_config::ETHSenderConfig; use zksync_eth_client::{EthereumGateway, SignedCallResult}; use zksync_storage::ConnectionPool; use 
zksync_types::ethereum::ETHOperation; @@ -798,12 +798,12 @@ impl ETHSender { pub fn run_eth_sender( pool: ConnectionPool, eth_gateway: EthereumGateway, - options: ZkSyncConfig, + options: ETHSenderConfig, ) -> JoinHandle<()> { let db = Database::new(pool); tokio::spawn(async move { - let eth_sender = ETHSender::new(options.eth_sender, db, eth_gateway).await; + let eth_sender = ETHSender::new(options, db, eth_gateway).await; eth_sender.run().await }) diff --git a/core/bin/zksync_eth_sender/src/main.rs b/core/bin/zksync_eth_sender/src/main.rs deleted file mode 100644 index a1b2cd3380..0000000000 --- a/core/bin/zksync_eth_sender/src/main.rs +++ /dev/null @@ -1,55 +0,0 @@ -use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt}; -use std::cell::RefCell; -use zksync_config::ZkSyncConfig; -use zksync_eth_client::EthereumGateway; -use zksync_eth_sender::run_eth_sender; -use zksync_gateway_watcher::run_gateway_watcher_if_multiplexed; -use zksync_prometheus_exporter::run_prometheus_exporter; -use zksync_storage::ConnectionPool; - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - // `eth_sender` doesn't require many connections to the database. - const ETH_SENDER_CONNECTION_POOL_SIZE: u32 = 2; - - let _sentry_guard = vlog::init(); - - // handle ctrl+c - let (stop_signal_sender, mut stop_signal_receiver) = mpsc::channel(256); - { - let stop_signal_sender = RefCell::new(stop_signal_sender.clone()); - ctrlc::set_handler(move || { - let mut sender = stop_signal_sender.borrow_mut(); - block_on(sender.send(true)).expect("crtlc signal send"); - }) - .expect("Error setting Ctrl-C handler"); - } - - let pool = ConnectionPool::new(Some(ETH_SENDER_CONNECTION_POOL_SIZE)); - let config = ZkSyncConfig::from_env(); - let eth_gateway = EthereumGateway::from_config(&config); - let gateway_watcher_task_opt = run_gateway_watcher_if_multiplexed(eth_gateway.clone(), &config); - - // Run prometheus data exporter. - let (prometheus_task_handle, _) = - run_prometheus_exporter(pool.clone(), config.api.prometheus.port, false); - - let task_handle = run_eth_sender(pool, eth_gateway, config); - - tokio::select! 
{ - _ = async { task_handle.await } => { - panic!("Ethereum sender actors aren't supposed to finish their execution") - }, - _ = async { gateway_watcher_task_opt.unwrap().await }, if gateway_watcher_task_opt.is_some() => { - panic!("Gateway Watcher actors aren't supposed to finish their execution") - }, - _ = async { prometheus_task_handle.await } => { - panic!("Prometheus exporter actors aren't supposed to finish their execution") - }, - _ = async { stop_signal_receiver.next().await } => { - vlog::warn!("Stop signal received, shutting down"); - } - }; - - Ok(()) -} diff --git a/core/bin/zksync_forced_exit_requests/src/core_interaction_wrapper.rs b/core/bin/zksync_forced_exit_requests/src/core_interaction_wrapper.rs index 0debcf4c69..d17210cb70 100644 --- a/core/bin/zksync_forced_exit_requests/src/core_interaction_wrapper.rs +++ b/core/bin/zksync_forced_exit_requests/src/core_interaction_wrapper.rs @@ -1,6 +1,6 @@ use chrono::Utc; use num::Zero; -use zksync_config::ZkSyncConfig; + use zksync_storage::{chain::operations_ext::records::TxReceiptResponse, ConnectionPool}; use zksync_types::{ forced_exit_requests::{ForcedExitRequest, ForcedExitRequestId}, @@ -52,11 +52,11 @@ pub struct MempoolCoreInteractionWrapper { impl MempoolCoreInteractionWrapper { pub fn new( - config: ZkSyncConfig, + forced_exit_minimum_account_age_secs: u64, core_api_client: CoreApiClient, connection_pool: ConnectionPool, ) -> Self { - let forced_exit_checker = ForcedExitChecker::new(&config); + let forced_exit_checker = ForcedExitChecker::new(forced_exit_minimum_account_age_secs); Self { core_api_client, connection_pool, diff --git a/core/bin/zksync_forced_exit_requests/src/eth_watch.rs b/core/bin/zksync_forced_exit_requests/src/eth_watch.rs index 723ffaaf10..f60523a533 100644 --- a/core/bin/zksync_forced_exit_requests/src/eth_watch.rs +++ b/core/bin/zksync_forced_exit_requests/src/eth_watch.rs @@ -14,7 +14,7 @@ use web3::{ types::{BlockNumber, FilterBuilder, Log}, Web3, }; -use zksync_config::ZkSyncConfig; +use zksync_config::ForcedExitRequestsConfig; use zksync_storage::ConnectionPool; use zksync_contracts::forced_exit_contract; @@ -129,7 +129,7 @@ where Interactor: CoreInteractionWrapper, { core_interaction_wrapper: Interactor, - config: ZkSyncConfig, + config: ForcedExitRequestsConfig, eth_client: Client, last_viewed_block: u64, forced_exit_sender: Sender, @@ -183,7 +183,7 @@ where { pub fn new( core_interaction_wrapper: Interactor, - config: ZkSyncConfig, + config: ForcedExitRequestsConfig, eth_client: Client, forced_exit_sender: Sender, db_cleanup_interval: chrono::Duration, @@ -207,7 +207,7 @@ where .core_interaction_wrapper .get_oldest_unfulfilled_request() .await?; - let wait_confirmations = self.config.forced_exit_requests.wait_confirmations; + let wait_confirmations = self.config.wait_confirmations; // No oldest request means that there are no requests that were possibly ignored let oldest_request = match oldest_request { @@ -276,7 +276,6 @@ where pub async fn delete_expired(&mut self) -> anyhow::Result<()> { let expiration_time = chrono::Duration::milliseconds( self.config - .forced_exit_requests .expiration_period .try_into() .expect("Failed to convert expiration period to i64"), @@ -301,7 +300,7 @@ where } }; - let wait_confirmations = self.config.forced_exit_requests.wait_confirmations; + let wait_confirmations = self.config.wait_confirmations; let last_confirmed_block = last_block.saturating_sub(wait_confirmations); if last_confirmed_block <= self.last_viewed_block { return; @@ -309,7 +308,7 @@ where 
let block_to_watch_from = self .last_viewed_block - .saturating_sub(self.config.forced_exit_requests.blocks_check_amount); + .saturating_sub(self.config.blocks_check_amount); let events = self .eth_client @@ -374,7 +373,7 @@ where .await .expect("Failed to restore state for ForcedExit eth_watcher"); - let mut timer = time::interval(self.config.forced_exit_requests.poll_interval()); + let mut timer = time::interval(self.config.poll_interval()); loop { timer.tick().await; @@ -386,15 +385,18 @@ where pub fn run_forced_exit_contract_watcher( core_api_client: CoreApiClient, connection_pool: ConnectionPool, - config: ZkSyncConfig, + config: ForcedExitRequestsConfig, + forced_exit_minimum_account_age_secs: u64, + contract: Address, + web3_url: String, ) -> JoinHandle<()> { - let transport = web3::transports::Http::new(&config.eth_client.web3_url[0]).unwrap(); + let transport = web3::transports::Http::new(&web3_url).unwrap(); let web3 = web3::Web3::new(transport); - let eth_client = EthHttpClient::new(web3, config.contracts.forced_exit_addr); + let eth_client = EthHttpClient::new(web3, contract); tokio::spawn(async move { // We should not proceed if the feature is disabled - if !config.forced_exit_requests.enabled { + if !config.enabled { infinite_async_loop().await } @@ -409,7 +411,7 @@ pub fn run_forced_exit_contract_watcher( .unwrap(); let core_interaction_wrapper = MempoolCoreInteractionWrapper::new( - config.clone(), + forced_exit_minimum_account_age_secs, core_api_client, connection_pool.clone(), ); @@ -480,7 +482,7 @@ pub async fn infinite_async_loop() { mod test { use num::{BigUint, FromPrimitive}; use std::{str::FromStr, sync::Mutex}; - use zksync_config::ZkSyncConfig; + use zksync_types::{forced_exit_requests::ForcedExitRequest, Address, TokenId}; use super::*; @@ -540,7 +542,7 @@ mod test { fn get_test_forced_exit_contract_watcher() -> TestForcedExitContractWatcher { let core_interaction_wrapper = MockCoreInteractionWrapper::default(); - let config = ZkSyncConfig::from_env(); + let config = ForcedExitRequestsConfig::from_env(); let eth_client = MockEthClient { events: vec![], current_block_number: TEST_FIRST_CURRENT_BLOCK, @@ -611,9 +613,7 @@ mod test { // that both wait_confirmations and the time of creation of the oldest unfulfilled request // is taken into account - let confirmations_time = ZkSyncConfig::from_env() - .forced_exit_requests - .wait_confirmations; + let confirmations_time = ForcedExitRequestsConfig::from_env().wait_confirmations; // Case 1. 
No requests => choose the youngest stable block let mut watcher = get_test_forced_exit_contract_watcher(); @@ -683,7 +683,7 @@ mod test { let mut watcher = get_test_forced_exit_contract_watcher(); let wait_confirmations = 5; - watcher.config.forced_exit_requests.wait_confirmations = wait_confirmations; + watcher.config.wait_confirmations = wait_confirmations; watcher.eth_client.events = vec![ FundsReceivedEvent { diff --git a/core/bin/zksync_forced_exit_requests/src/forced_exit_sender.rs b/core/bin/zksync_forced_exit_requests/src/forced_exit_sender.rs index d269c415ee..079a6d382c 100644 --- a/core/bin/zksync_forced_exit_requests/src/forced_exit_sender.rs +++ b/core/bin/zksync_forced_exit_requests/src/forced_exit_sender.rs @@ -4,7 +4,7 @@ use chrono::{DateTime, Utc}; use num::BigUint; use tokio::time; -use zksync_config::ZkSyncConfig; +use zksync_config::ForcedExitRequestsConfig; use zksync_types::{ forced_exit_requests::ForcedExitRequest, tx::TimeRange, tx::TxHash, AccountId, Address, Nonce, @@ -29,7 +29,7 @@ pub trait ForcedExitSender { pub struct MempoolForcedExitSender { core_interaction_wrapper: T, - config: ZkSyncConfig, + config: ForcedExitRequestsConfig, forced_exit_sender_account_id: AccountId, sender_private_key: PrivateKey, } @@ -63,11 +63,11 @@ impl ForcedExitSender for MempoolForced impl MempoolForcedExitSender { pub fn new( core_interaction_wrapper: T, - config: ZkSyncConfig, + config: ForcedExitRequestsConfig, forced_exit_sender_account_id: AccountId, ) -> Self { - let sender_private_key = hex::decode(&config.forced_exit_requests.sender_private_key[2..]) - .expect("Decoding private key failed"); + let sender_private_key = + hex::decode(&config.sender_private_key[2..]).expect("Decoding private key failed"); let sender_private_key = read_signing_key(&sender_private_key).expect("Reading private key failed"); @@ -225,10 +225,7 @@ impl MempoolForcedExitSender { amount: BigUint, submission_time: DateTime, ) -> anyhow::Result<()> { - let (id, amount) = utils::extract_id_from_amount( - amount, - self.config.forced_exit_requests.digits_in_id as u32, - ); + let (id, amount) = utils::extract_id_from_amount(amount, self.config.digits_in_id as u32); let fe_request = self.core_interaction_wrapper.get_request_by_id(id).await?; @@ -280,11 +277,11 @@ mod test { const TEST_ACCOUNT_FORCED_EXIT_SENDER_ID: u32 = 12; fn get_test_forced_exit_sender( - config: Option, + config: Option, ) -> MempoolForcedExitSender { let core_interaction_wrapper = MockCoreInteractionWrapper::default(); - let config = config.unwrap_or_else(ZkSyncConfig::from_env); + let config = config.unwrap_or_else(ForcedExitRequestsConfig::from_env); MempoolForcedExitSender::new( core_interaction_wrapper, @@ -297,18 +294,13 @@ mod test { async fn test_forced_exit_sender() { let day = chrono::Duration::days(1); - let config = ZkSyncConfig::from_env(); let forced_exit_requests = ForcedExitRequestsConfig { // There must be 10 digits in id digits_in_id: 10, - ..config.forced_exit_requests - }; - let config = ZkSyncConfig { - forced_exit_requests, - ..config + ..ForcedExitRequestsConfig::from_env() }; - let forced_exit_sender = get_test_forced_exit_sender(Some(config)); + let forced_exit_sender = get_test_forced_exit_sender(Some(forced_exit_requests)); add_request( &forced_exit_sender.core_interaction_wrapper.requests, diff --git a/core/bin/zksync_forced_exit_requests/src/lib.rs b/core/bin/zksync_forced_exit_requests/src/lib.rs index 1588238edc..d81f2379b5 100644 --- a/core/bin/zksync_forced_exit_requests/src/lib.rs +++ 
b/core/bin/zksync_forced_exit_requests/src/lib.rs @@ -1,10 +1,11 @@ use tokio::task::JoinHandle; -use zksync_config::ZkSyncConfig; +use zksync_config::{ContractsConfig, ForcedExitRequestsConfig}; use zksync_storage::ConnectionPool; use zksync_api::core_api_client::CoreApiClient; use forced_exit_sender::ForcedExitSender; +use zksync_config::configs::api::CommonApiConfig; mod core_interaction_wrapper; pub mod eth_watch; @@ -18,8 +19,19 @@ pub mod test; #[must_use] pub fn run_forced_exit_requests_actors( pool: ConnectionPool, - config: ZkSyncConfig, + private_url: String, + config: ForcedExitRequestsConfig, + common: CommonApiConfig, + contracts: ContractsConfig, + web3_url: String, ) -> JoinHandle<()> { - let core_api_client = CoreApiClient::new(config.api.private.url.clone()); - eth_watch::run_forced_exit_contract_watcher(core_api_client, pool, config) + let core_api_client = CoreApiClient::new(private_url); + eth_watch::run_forced_exit_contract_watcher( + core_api_client, + pool, + config, + common.forced_exit_minimum_account_age_secs, + contracts.forced_exit_addr, + web3_url, + ) } diff --git a/core/bin/zksync_forced_exit_requests/src/prepare_forced_exit_sender.rs b/core/bin/zksync_forced_exit_requests/src/prepare_forced_exit_sender.rs index 8afa2163ce..2e735996dc 100644 --- a/core/bin/zksync_forced_exit_requests/src/prepare_forced_exit_sender.rs +++ b/core/bin/zksync_forced_exit_requests/src/prepare_forced_exit_sender.rs @@ -1,6 +1,6 @@ use num::BigUint; use std::time::Duration; -use zksync_config::ZkSyncConfig; +use zksync_config::ForcedExitRequestsConfig; use zksync_storage::{ chain::operations_ext::records::TxReceiptResponse, ConnectionPool, StorageProcessor, }; @@ -24,18 +24,18 @@ use super::utils::{read_signing_key, Engine}; pub async fn prepare_forced_exit_sender_account( connection_pool: ConnectionPool, api_client: CoreApiClient, - config: &ZkSyncConfig, + config: &ForcedExitRequestsConfig, ) -> anyhow::Result { let mut storage = connection_pool .access_storage() .await .expect("forced_exit_requests: Failed to get the connection to storage"); - let sender_sk = hex::decode(&config.forced_exit_requests.sender_private_key[2..]) + let sender_sk = hex::decode(&config.sender_private_key[2..]) .expect("Failed to decode forced_exit_sender sk"); let sender_sk = read_signing_key(&sender_sk).expect("Failed to read forced exit sender sk"); - let sender_address = config.forced_exit_requests.sender_account_address; - let sender_eth_private_key = config.forced_exit_requests.sender_eth_private_key; + let sender_address = config.sender_account_address; + let sender_eth_private_key = config.sender_eth_private_key; let is_sender_prepared = check_forced_exit_sender_prepared(&mut storage, &sender_sk, sender_address) diff --git a/core/bin/zksync_witness_generator/src/lib.rs b/core/bin/zksync_witness_generator/src/lib.rs index 289c1140f8..c815f2749f 100644 --- a/core/bin/zksync_witness_generator/src/lib.rs +++ b/core/bin/zksync_witness_generator/src/lib.rs @@ -10,17 +10,19 @@ use actix_web_httpauth::extractors::{ AuthenticationError, }; use actix_web_httpauth::middleware::HttpAuthentication; -use futures::channel::mpsc; + use jsonwebtoken::errors::Error as JwtError; use jsonwebtoken::{decode, DecodingKey, Validation}; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; // Workspace deps -use zksync_config::ZkSyncConfig; +use zksync_config::ProverConfig; // Local deps use self::database_interface::DatabaseInterface; use self::scaler::ScalerOracle; +use tokio::task::JoinHandle; use 
zksync_circuit::serialization::ProverData; +use zksync_config::configs::api::ProverApiConfig; use zksync_prover_utils::api::{ JobRequestData, JobResultData, ProverInputRequest, ProverInputResponse, ProverOutputRequest, WorkingOn, @@ -32,7 +34,7 @@ use zksync_types::prover::{ ProverJobType, AGGREGATED_PROOF_JOB_PRIORITY, SINGLE_PROOF_JOB_PRIORITY, }; use zksync_types::BlockNumber; -use zksync_utils::panic_notify::ThreadPanicNotify; +use zksync_utils::panic_notify::{spawn_panic_handler, ThreadPanicNotify}; #[cfg(test)] mod tests; @@ -386,17 +388,17 @@ async fn update_prover_job_queue(database: DB) -> anyhow: pub fn run_prover_server( database: DB, - panic_notify: mpsc::Sender, - config: ZkSyncConfig, -) { - let witness_generator_opts = config.prover.witness_generator; - let core_opts = config.prover.core; - let prover_api_opts = config.api.prover; + prover_api_opts: ProverApiConfig, + prover_opts: ProverConfig, +) -> JoinHandle<()> { + let witness_generator_opts = prover_opts.witness_generator; + let core_opts = prover_opts.core; + let (handler, panic_sender) = spawn_panic_handler(); thread::Builder::new() .name("prover_server".to_string()) .spawn(move || { - let _panic_sentinel = ThreadPanicNotify(panic_notify.clone()); + let _panic_sentinel = ThreadPanicNotify(panic_sender.clone()); let actix_runtime = actix_rt::System::new(); actix_runtime.block_on(async move { @@ -433,7 +435,7 @@ pub fn run_prover_server( BlockNumber(start_block), BlockNumber(block_step), ); - pool_maintainer.start(panic_notify.clone()); + pool_maintainer.start(panic_sender.clone()); } // Start HTTP server. let secret_auth = prover_api_opts.secret_auth.clone(); @@ -475,4 +477,6 @@ pub fn run_prover_server( }) }) .expect("failed to start prover server"); + + handler } diff --git a/core/bin/zksync_witness_generator/src/main.rs b/core/bin/zksync_witness_generator/src/main.rs deleted file mode 100644 index 074dde99be..0000000000 --- a/core/bin/zksync_witness_generator/src/main.rs +++ /dev/null @@ -1,47 +0,0 @@ -use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt}; -use std::cell::RefCell; -use zksync_config::ZkSyncConfig; -use zksync_prometheus_exporter::run_prometheus_exporter; -use zksync_storage::ConnectionPool; -use zksync_witness_generator::database::Database; -use zksync_witness_generator::run_prover_server; - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - // `witness_generator` doesn't require many connections to the database. - const WITNESS_GENERATOR_CONNECTION_POOL_SIZE: u32 = 2; - - let _sentry_guard = vlog::init(); - - // handle ctrl+c - let (stop_signal_sender, mut stop_signal_receiver) = mpsc::channel(256); - { - let stop_signal_sender = RefCell::new(stop_signal_sender.clone()); - ctrlc::set_handler(move || { - let mut sender = stop_signal_sender.borrow_mut(); - block_on(sender.send(true)).expect("crtlc signal send"); - }) - .expect("Error setting Ctrl-C handler"); - } - - let connection_pool = ConnectionPool::new(Some(WITNESS_GENERATOR_CONNECTION_POOL_SIZE)); - let database = Database::new(connection_pool.clone()); - let zksync_config = ZkSyncConfig::from_env(); - - // Run prometheus data exporter. - let (prometheus_task_handle, _) = - run_prometheus_exporter(connection_pool, zksync_config.api.prometheus.port, false); - - run_prover_server(database, stop_signal_sender, zksync_config); - - tokio::select! 
{ - _ = async { prometheus_task_handle.await } => { - panic!("Prometheus exporter actors aren't supposed to finish their execution") - }, - _ = async { stop_signal_receiver.next().await } => { - vlog::warn!("Stop signal received, shutting down"); - } - }; - - Ok(()) -} diff --git a/core/bin/zksync_witness_generator/src/tests/prover_server.rs b/core/bin/zksync_witness_generator/src/tests/prover_server.rs index d3fb8af4af..1035673fd5 100644 --- a/core/bin/zksync_witness_generator/src/tests/prover_server.rs +++ b/core/bin/zksync_witness_generator/src/tests/prover_server.rs @@ -1,7 +1,7 @@ // Built-in deps -use std::{thread, time::Duration}; +use std::time::Duration; // External deps -use futures::channel::mpsc; + use num::BigUint; // Workspace deps use zksync_config::ZkSyncConfig; @@ -39,10 +39,13 @@ impl Default for MockProverOptions { async fn spawn_server(database: MockDatabase) { let prover_options = MockProverOptions::default(); - let (tx, _rx) = mpsc::channel(1); - thread::spawn(move || { - run_prover_server(database, tx, prover_options.0); + tokio::spawn({ + run_prover_server( + database, + prover_options.0.api.prover, + prover_options.0.prover, + ) }); } diff --git a/core/lib/config/src/configs/api.rs b/core/lib/config/src/configs/api.rs index 77d1167506..203961c1ea 100644 --- a/core/lib/config/src/configs/api.rs +++ b/core/lib/config/src/configs/api.rs @@ -11,21 +11,21 @@ use crate::envy_load; #[derive(Debug, Deserialize, Clone, PartialEq)] pub struct ApiConfig { /// Common configuration options for the API. - pub common: Common, + pub common: CommonApiConfig, /// Configuration options for the Admin API server. - pub admin: AdminApi, + pub admin: AdminApiConfig, /// Configuration options for the REST API server. - pub rest: RestApi, + pub rest: RestApiConfig, /// Configuration options for the JSON RPC servers. - pub json_rpc: JsonRpc, + pub json_rpc: JsonRpcConfig, /// Configuration options for the web3 JSON RPC server. - pub web3: Web3, + pub web3: Web3Config, /// Configuration options for the private core API. - pub private: PrivateApi, + pub private: PrivateApiConfig, /// Configuration options for the prover server. - pub prover: ProverApi, + pub prover: ProverApiConfig, /// Configuration options for the Prometheus exporter. - pub prometheus: Prometheus, + pub prometheus: PrometheusConfig, } impl ApiConfig { @@ -43,9 +43,56 @@ impl ApiConfig { } } +impl CommonApiConfig { + pub fn from_env() -> Self { + envy_load!("common", "API_COMMON_") + } +} + +impl AdminApiConfig { + pub fn from_env() -> Self { + envy_load!("admin", "API_ADMIN_") + } +} + +impl RestApiConfig { + pub fn from_env() -> Self { + envy_load!("rest", "API_REST_") + } +} + +impl JsonRpcConfig { + pub fn from_env() -> Self { + envy_load!("json_rpc", "API_JSON_RPC_") + } +} + +impl Web3Config { + pub fn from_env() -> Self { + envy_load!("web3", "API_WEB3_") + } +} + +impl PrivateApiConfig { + pub fn from_env() -> Self { + envy_load!("private", "API_PRIVATE_") + } +} +impl ProverApiConfig { + pub fn from_env() -> Self { + envy_load!("prover", "API_PROVER_") + } +} + +impl PrometheusConfig { + pub fn from_env() -> Self { + envy_load!("prometheus", "API_PROMETHEUS_") + } +} + // Common configuration options for the API #[derive(Debug, Deserialize, Clone, PartialEq)] -pub struct Common { +pub struct CommonApiConfig { // Size of LRU caches for requests pub caches_size: usize, // Determines the required minimum account age for `ForcedExit` operation to be allowed. 
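The hunk above gives every renamed API section its own `from_env()` constructor built on the `envy_load!` macro with a per-section prefix. A minimal standalone sketch of the same loading pattern, written against the `envy` crate directly since the macro's exact expansion is not shown in this patch:

```rust
// Hedged sketch: what a per-section `from_env()` loader boils down to, assuming
// `envy_load!("prometheus", "API_PROMETHEUS_")` wraps an envy prefixed lookup.
use serde::Deserialize;

#[derive(Debug, Deserialize, Clone, PartialEq)]
pub struct PrometheusConfig {
    /// Port to which the Prometheus exporter server is listening.
    pub port: u16,
}

impl PrometheusConfig {
    pub fn from_env() -> Self {
        // Reads API_PROMETHEUS_PORT from the environment.
        envy::prefixed("API_PROMETHEUS_")
            .from_env()
            .expect("failed to load the Prometheus API section from the environment")
    }
}

fn main() {
    std::env::set_var("API_PROMETHEUS_PORT", "3312");
    let config = PrometheusConfig::from_env();
    assert_eq!(config, PrometheusConfig { port: 3312 });
}
```

Per-section loaders are what allow a binary started with a narrow `--components` list to read only the environment variables it actually uses.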
@@ -60,7 +107,7 @@ pub struct Common { } #[derive(Debug, Deserialize, Clone, PartialEq)] -pub struct AdminApi { +pub struct AdminApiConfig { /// Port to which the API server is listening. pub port: u16, /// URL to access API server. @@ -69,14 +116,14 @@ pub struct AdminApi { pub secret_auth: String, } -impl AdminApi { +impl AdminApiConfig { pub fn bind_addr(&self) -> SocketAddr { SocketAddr::new("0.0.0.0".parse().unwrap(), self.port) } } #[derive(Debug, Deserialize, Clone, PartialEq)] -pub struct ProverApi { +pub struct ProverApiConfig { /// Port to which the API server is listening. pub port: u16, /// URL to access API server. @@ -85,42 +132,42 @@ pub struct ProverApi { pub secret_auth: String, } -impl ProverApi { +impl ProverApiConfig { pub fn bind_addr(&self) -> SocketAddr { SocketAddr::new("0.0.0.0".parse().unwrap(), self.port) } } #[derive(Debug, Deserialize, Clone, PartialEq)] -pub struct PrivateApi { +pub struct PrivateApiConfig { /// Port to which the API server is listening. pub port: u16, /// URL to access API server. pub url: String, } -impl PrivateApi { +impl PrivateApiConfig { pub fn bind_addr(&self) -> SocketAddr { SocketAddr::new("0.0.0.0".parse().unwrap(), self.port) } } #[derive(Debug, Deserialize, Clone, PartialEq)] -pub struct RestApi { +pub struct RestApiConfig { /// Port to which the API server is listening. pub port: u16, /// URL to access API server. pub url: String, } -impl RestApi { +impl RestApiConfig { pub fn bind_addr(&self) -> SocketAddr { SocketAddr::new("0.0.0.0".parse().unwrap(), self.port) } } #[derive(Debug, Deserialize, Clone, PartialEq)] -pub struct JsonRpc { +pub struct JsonRpcConfig { /// Port to which the HTTP RPC server is listening. pub http_port: u16, /// URL to access HTTP RPC server. @@ -131,7 +178,7 @@ pub struct JsonRpc { pub ws_url: String, } -impl JsonRpc { +impl JsonRpcConfig { pub fn http_bind_addr(&self) -> SocketAddr { SocketAddr::new("0.0.0.0".parse().unwrap(), self.http_port) } @@ -142,23 +189,24 @@ impl JsonRpc { } #[derive(Debug, Deserialize, Clone, PartialEq)] -pub struct Web3 { +pub struct Web3Config { /// Port to which the web3 JSON RPC server is listening. pub port: u16, /// URL to access web3 JSON RPC server. pub url: String, /// Max difference between blocks in `eth_getLogs` method. pub max_block_range: u32, + pub chain_id: u32, } -impl Web3 { +impl Web3Config { pub fn bind_addr(&self) -> SocketAddr { SocketAddr::new("0.0.0.0".parse().unwrap(), self.port) } } #[derive(Debug, Deserialize, Clone, PartialEq)] -pub struct Prometheus { +pub struct PrometheusConfig { /// Port to which the Prometheus exporter server is listening. 
pub port: u16, } @@ -171,7 +219,7 @@ mod tests { fn expected_config() -> ApiConfig { ApiConfig { - common: Common { + common: CommonApiConfig { caches_size: 10_000, forced_exit_minimum_account_age_secs: 0, enforce_pubkey_change_fee: true, @@ -179,36 +227,37 @@ mod tests { max_number_of_authors_per_batch: 10, fee_free_accounts: vec![AccountId(4078), AccountId(387)], }, - admin: AdminApi { + admin: AdminApiConfig { port: 8080, url: "http://127.0.0.1:8080".into(), secret_auth: "sample".into(), }, - rest: RestApi { + rest: RestApiConfig { port: 3001, url: "http://127.0.0.1:3001".into(), }, - json_rpc: JsonRpc { + json_rpc: JsonRpcConfig { http_port: 3030, http_url: "http://127.0.0.1:3030".into(), ws_port: 3031, ws_url: "ws://127.0.0.1:3031".into(), }, - web3: Web3 { + web3: Web3Config { port: 3002, url: "http://127.0.0.1:3002".into(), max_block_range: 10, + chain_id: 240, }, - private: PrivateApi { + private: PrivateApiConfig { port: 8090, url: "http://127.0.0.1:8090".into(), }, - prover: ProverApi { + prover: ProverApiConfig { port: 8088, url: "http://127.0.0.1:8088".into(), secret_auth: "sample".into(), }, - prometheus: Prometheus { port: 3312 }, + prometheus: PrometheusConfig { port: 3312 }, } } @@ -232,6 +281,7 @@ API_JSON_RPC_WS_PORT="3031" API_JSON_RPC_WS_URL="ws://127.0.0.1:3031" API_WEB3_PORT="3002" API_WEB3_URL="http://127.0.0.1:3002" +API_WEB3_CHAIN_ID="240" API_WEB3_MAX_BLOCK_RANGE="10" API_PRIVATE_PORT="8090" API_PRIVATE_URL="http://127.0.0.1:8090" diff --git a/core/lib/config/src/configs/chain.rs b/core/lib/config/src/configs/chain.rs index c05e030c18..c7e638109b 100644 --- a/core/lib/config/src/configs/chain.rs +++ b/core/lib/config/src/configs/chain.rs @@ -26,6 +26,12 @@ impl ChainConfig { state_keeper: envy_load!("state_keeper", "CHAIN_STATE_KEEPER_"), } } + pub fn max_blocks_to_aggregate(&self) -> u32 { + std::cmp::max( + self.state_keeper.max_aggregated_blocks_to_commit, + self.state_keeper.max_aggregated_blocks_to_execute, + ) as u32 + } } #[derive(Debug, Deserialize, Clone, PartialEq)] diff --git a/core/lib/eth_client/src/ethereum_gateway.rs b/core/lib/eth_client/src/ethereum_gateway.rs index 6757e7e6d1..6bf2d9bf59 100644 --- a/core/lib/eth_client/src/ethereum_gateway.rs +++ b/core/lib/eth_client/src/ethereum_gateway.rs @@ -4,7 +4,7 @@ use web3::transports::Http; use web3::types::{Address, BlockId, Filter, Log, Transaction, U64}; use std::fmt::Debug; -use zksync_config::ZkSyncConfig; +use zksync_config::{ETHClientConfig, ETHSenderConfig}; use zksync_contracts::zksync_contract; use zksync_eth_signer::PrivateKeySigner; use zksync_types::{TransactionReceipt, H160, H256, U256}; @@ -49,35 +49,39 @@ pub enum EthereumGateway { } impl EthereumGateway { - pub fn from_config(config: &ZkSyncConfig) -> Self { - if config.eth_client.web3_url.len() == 1 { - let transport = web3::transports::Http::new(&config.eth_client.web3_url()).unwrap(); + pub fn from_config( + eth_client_config: ÐClientConfig, + eth_sender_config: ÐSenderConfig, + main_contract: Address, + ) -> Self { + if eth_client_config.web3_url.len() == 1 { + let transport = web3::transports::Http::new(ð_client_config.web3_url()).unwrap(); EthereumGateway::Direct(ETHDirectClient::new( transport, zksync_contract(), - config.eth_sender.sender.operator_commit_eth_addr, - PrivateKeySigner::new(config.eth_sender.sender.operator_private_key), - config.contracts.contract_addr, - config.eth_client.chain_id, - config.eth_client.gas_price_factor, + eth_sender_config.sender.operator_commit_eth_addr, + 
PrivateKeySigner::new(eth_sender_config.sender.operator_private_key), + main_contract, + eth_client_config.chain_id, + eth_client_config.gas_price_factor, )) } else { let mut client = MultiplexerEthereumClient::new(); let contract = zksync_contract(); - for web3_url in config.eth_client.web3_url.iter() { - let transport = web3::transports::Http::new(web3_url).unwrap(); + for web3_url in eth_client_config.web3_url.iter().cloned() { + let transport = web3::transports::Http::new(&web3_url).unwrap(); client.add_client( - web3_url.clone(), + web3_url, ETHDirectClient::new( transport, contract.clone(), - config.eth_sender.sender.operator_commit_eth_addr, - PrivateKeySigner::new(config.eth_sender.sender.operator_private_key), - config.contracts.contract_addr, - config.eth_client.chain_id, - config.eth_client.gas_price_factor, + eth_sender_config.sender.operator_commit_eth_addr, + PrivateKeySigner::new(eth_sender_config.sender.operator_private_key), + main_contract, + eth_client_config.chain_id, + eth_client_config.gas_price_factor, ), ); } diff --git a/core/lib/gateway_watcher/src/multiplexed_gateway_watcher.rs b/core/lib/gateway_watcher/src/multiplexed_gateway_watcher.rs index 28268a32e8..89d877cd7a 100644 --- a/core/lib/gateway_watcher/src/multiplexed_gateway_watcher.rs +++ b/core/lib/gateway_watcher/src/multiplexed_gateway_watcher.rs @@ -7,7 +7,7 @@ use tokio::{task::JoinHandle, time}; use tokio_stream::wrappers::IntervalStream; use web3::types::{Block, BlockId, BlockNumber, H256, U64}; -use zksync_config::ZkSyncConfig; +use zksync_config::GatewayWatcherConfig; use zksync_eth_client::{EthereumGateway, MultiplexerEthereumClient}; use zksync_utils::retry_opt; @@ -209,15 +209,15 @@ impl MultiplexedGatewayWatcher { #[must_use] pub fn run_multiplexed_gateway_watcher( eth_gateway: EthereumGateway, - config: &ZkSyncConfig, + config: &GatewayWatcherConfig, ) -> JoinHandle<()> { let gateway_watcher = MultiplexedGatewayWatcher::new( eth_gateway, - config.gateway_watcher.check_interval(), - config.gateway_watcher.retry_delay(), - config.gateway_watcher.request_timeout(), - Some(config.gateway_watcher.request_per_task_limit()), - Some(config.gateway_watcher.task_limit()), + config.check_interval(), + config.retry_delay(), + config.request_timeout(), + Some(config.request_per_task_limit()), + Some(config.task_limit()), ); tokio::spawn(gateway_watcher.run()) @@ -227,7 +227,7 @@ pub fn run_multiplexed_gateway_watcher( #[must_use] pub fn run_gateway_watcher_if_multiplexed( eth_gateway: EthereumGateway, - config: &ZkSyncConfig, + config: &GatewayWatcherConfig, ) -> Option> { if eth_gateway.is_multiplexed() { Some(run_multiplexed_gateway_watcher(eth_gateway, config)) diff --git a/core/lib/utils/Cargo.toml b/core/lib/utils/Cargo.toml index 2d3ff714c1..1df04a2685 100644 --- a/core/lib/utils/Cargo.toml +++ b/core/lib/utils/Cargo.toml @@ -16,7 +16,7 @@ serde = { version = "1.0", features = ["derive"] } anyhow = "1.0" futures = "0.3" hex = "0.4" +tokio = { version = "1", features = ["full"] } [dev-dependencies] serde_json = "1.0.0" -tokio = { version = "1", features = ["full"] } diff --git a/core/lib/utils/src/panic_notify.rs b/core/lib/utils/src/panic_notify.rs index 0e860ef2e8..3d6bf87de1 100644 --- a/core/lib/utils/src/panic_notify.rs +++ b/core/lib/utils/src/panic_notify.rs @@ -1,6 +1,7 @@ // Built-in deps // External uses -use futures::{channel::mpsc, executor::block_on, SinkExt}; +use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt}; +use tokio::task::JoinHandle; // Local uses /// If its placed 
inside thread::spawn closure it will notify channel when this thread panics.
@@ -13,3 +14,12 @@ impl Drop for ThreadPanicNotify {
         }
     }
 }
+
+pub fn spawn_panic_handler() -> (JoinHandle<()>, mpsc::Sender<bool>) {
+    let (panic_sender, mut panic_receiver) = mpsc::channel(1);
+
+    let handler = tokio::spawn(async move {
+        panic_receiver.next().await.unwrap();
+    });
+    (handler, panic_sender)
+}
diff --git a/etc/env/base/api.toml b/etc/env/base/api.toml
index 10d587a189..29209e326f 100644
--- a/etc/env/base/api.toml
+++ b/etc/env/base/api.toml
@@ -43,6 +43,7 @@ ws_url="ws://127.0.0.1:3031"
 port=3002
 url="http://127.0.0.1:3002"
 max_block_range=10
+chain_id=240
 
 # Configuration for the core private server.
 [api.private]
diff --git a/infrastructure/zk/src/server.ts b/infrastructure/zk/src/server.ts
index 4e552bf23b..fe8ecb3df0 100644
--- a/infrastructure/zk/src/server.ts
+++ b/infrastructure/zk/src/server.ts
@@ -6,6 +6,24 @@ import * as db from './db/db';
 
 import { ethers } from 'ethers';
 
+export async function core() {
+    prepareForcedExitRequestAccount();
+
+    await utils.spawn(
+        'cargo run --bin zksync_server --release -- --components=eth-sender,witness-generator,forced-exit,prometheus,core,rejected-task-cleaner'
+    );
+}
+
+export async function web3Node() {
+    await utils.spawn('cargo run --bin zksync_server --release -- --components=web3-api');
+}
+
+export async function apiNode() {
+    await utils.spawn(
+        'cargo run --bin zksync_server --release -- --components=web3-api,rest-api,rpc-api,rpc-websocket-api'
+    );
+}
+
 export async function server() {
     // By the time this function is run the server is most likely not be running yet
     // However, it does not matter, since the only thing the function does is depositing
@@ -90,3 +108,7 @@ export const command = new Command('server')
             await server();
         }
     });
+
+command.command('api').description('start api node').action(apiNode);
+command.command('web3').description('start web3 node').action(web3Node);
+command.command('core').description('start core').action(core);
diff --git a/sdk/zksync-crypto/src/lib.rs b/sdk/zksync-crypto/src/lib.rs
index 7672c94245..4291c43c90 100644
--- a/sdk/zksync-crypto/src/lib.rs
+++ b/sdk/zksync-crypto/src/lib.rs
@@ -200,8 +200,8 @@ pub fn verify_musig(msg: &[u8], signature: &[u8]) -> Result {
             &msg,
             &signature,
             FixedGenerators::SpendingKeyGenerator,
-            &rescue_params,
-            &jubjub_params,
+            rescue_params,
+            jubjub_params,
         )
     })
 });
diff --git a/sdk/zksync-crypto/src/utils.rs b/sdk/zksync-crypto/src/utils.rs
index 07edcc781e..4268c09a1b 100644
--- a/sdk/zksync-crypto/src/utils.rs
+++ b/sdk/zksync-crypto/src/utils.rs
@@ -91,7 +91,7 @@ fn rescue_hash_fr(input: Vec) -> Fr {
 
 fn rescue_hash_elements(input: &[Fr]) -> Fr {
     RESCUE_PARAMS.with(|params| {
-        let sponge_output = rescue_hash::(params, &input);
+        let sponge_output = rescue_hash::(params, input);
         assert_eq!(sponge_output.len(), 1, "rescue hash problem");
         sponge_output[0]
     })
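With `ZkSyncConfig` no longer threaded through `EthereumGateway::from_config`, every binary in this patch builds the gateway from the ETH client section, the ETH sender section and the main contract address. A minimal call-site sketch, mirroring the updated `block_revert` and `eth_watcher` binaries:

```rust
use zksync_config::{ContractsConfig, ETHClientConfig, ETHSenderConfig};
use zksync_eth_client::EthereumGateway;

fn build_gateway() -> EthereumGateway {
    // Each binary now loads only the sections it needs instead of one ZkSyncConfig.
    let contracts = ContractsConfig::from_env();
    let eth_client_config = ETHClientConfig::from_env();
    let eth_sender_config = ETHSenderConfig::from_env();

    // One configured web3 URL yields the direct client; several yield the multiplexed one.
    EthereumGateway::from_config(
        &eth_client_config,
        &eth_sender_config,
        contracts.contract_addr,
    )
}
```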
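The signature checker now spawns its routine on the ambient tokio runtime and returns that routine's `JoinHandle`, instead of parking a dedicated thread with its own runtime. A hedged call-site sketch; the module path, the request type behind the receiver, and the channel capacity are assumptions, since the caller in `zksync_api` is not part of this excerpt:

```rust
use futures::channel::mpsc;
use tokio::task::JoinHandle;
use zksync_api::signature_checker::{start_sign_checker, VerifySignatureRequest};
use zksync_eth_client::EthereumGateway;

fn start_checker(
    client: EthereumGateway,
) -> (mpsc::Sender<VerifySignatureRequest>, JoinHandle<()>) {
    // The capacity is illustrative; the checker drains requests from the receiver
    // and spawns one verification task per request on the ambient runtime.
    let (request_sender, request_receiver) = mpsc::channel(256);
    let handle = start_sign_checker(client, request_receiver);
    (request_sender, handle)
}
```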
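`spawn_panic_handler` is the piece that lets the remaining thread-based actors (the private core API and the prover server) be awaited like ordinary tokio tasks. A sketch of the pattern with a hypothetical `start_blocking_actor`; only `spawn_panic_handler` and `ThreadPanicNotify` are taken from the patch:

```rust
use tokio::task::JoinHandle;
use zksync_utils::panic_notify::{spawn_panic_handler, ThreadPanicNotify};

pub fn start_blocking_actor() -> JoinHandle<()> {
    let (handler, panic_sender) = spawn_panic_handler();

    std::thread::Builder::new()
        .name("blocking-actor".to_string())
        .spawn(move || {
            // If this thread panics, dropping the sentinel pushes a message into the
            // channel, which completes the handler task returned to the caller.
            let _panic_sentinel = ThreadPanicNotify(panic_sender);
            // ... run an actix system or other blocking work here ...
        })
        .expect("failed to start the blocking actor thread");

    handler
}
```

The returned handle can then sit in the same `wait_for_tasks` / `tokio::select!` set as the pure-tokio actors, which is why the old per-binary stop-signal plumbing could be deleted.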
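Several actors that used to receive the whole `ZkSyncConfig` now take a single section. A sketch of the corresponding call sites, assuming the usual `from_env()` constructor on `DBConfig` (the new `zksync_server` wiring itself is outside this excerpt):

```rust
use tokio::task::JoinHandle;
use zksync_config::{DBConfig, ETHSenderConfig};
use zksync_core::rejected_tx_cleaner::run_rejected_tx_cleaner;
use zksync_eth_client::EthereumGateway;
use zksync_eth_sender::run_eth_sender;
use zksync_storage::ConnectionPool;

fn start_background_components(
    pool: ConnectionPool,
    gateway: EthereumGateway,
) -> Vec<JoinHandle<()>> {
    vec![
        // `run_eth_sender` now takes only the ETH sender section.
        run_eth_sender(pool.clone(), gateway, ETHSenderConfig::from_env()),
        // The rejected-tx cleaner moved out of `run_core` and needs only the DB section.
        run_rejected_tx_cleaner(&DBConfig::from_env(), pool),
    ]
}
```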
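`run_forced_exit_requests_actors` now takes the private API URL plus the forced-exit, common-API and contracts sections and a web3 URL instead of one `ZkSyncConfig`. A call-site sketch; pulling the URL out of `ETHClientConfig` here is an assumption about the caller:

```rust
use tokio::task::JoinHandle;
use zksync_config::{
    configs::api::{CommonApiConfig, PrivateApiConfig},
    ContractsConfig, ETHClientConfig, ForcedExitRequestsConfig,
};
use zksync_forced_exit_requests::run_forced_exit_requests_actors;
use zksync_storage::ConnectionPool;

fn start_forced_exit_requests(pool: ConnectionPool) -> JoinHandle<()> {
    let private_api = PrivateApiConfig::from_env();
    let eth_client = ETHClientConfig::from_env();

    run_forced_exit_requests_actors(
        pool,
        private_api.url,
        ForcedExitRequestsConfig::from_env(),
        CommonApiConfig::from_env(),
        ContractsConfig::from_env(),
        // The watcher only needs a single HTTP endpoint.
        eth_client.web3_url[0].clone(),
    )
}
```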
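`run_prover_server` now accepts the prover API and prover sections directly and returns a `JoinHandle` backed by `spawn_panic_handler`. A call-site sketch, assuming a `from_env()` constructor on `ProverConfig`:

```rust
use tokio::task::JoinHandle;
use zksync_config::{configs::api::ProverApiConfig, ProverConfig};
use zksync_storage::ConnectionPool;
use zksync_witness_generator::{database::Database, run_prover_server};

fn start_witness_generator(pool: ConnectionPool) -> JoinHandle<()> {
    let database = Database::new(pool);
    // The returned handle resolves if the actix thread inside the prover server panics,
    // so it can be awaited alongside the other component tasks.
    run_prover_server(
        database,
        ProverApiConfig::from_env(),
        ProverConfig::from_env(),
    )
}
```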