diff --git a/common/version/version.go b/common/version/version.go index ebbd083707..7a3b2af81e 100644 --- a/common/version/version.go +++ b/common/version/version.go @@ -5,7 +5,7 @@ import ( "runtime/debug" ) -var tag = "v4.4.83" +var tag = "v4.4.86" var commit = func() string { if info, ok := debug.ReadBuildInfo(); ok { diff --git a/prover/Cargo.lock b/prover/Cargo.lock index cff651cc37..1defcdc113 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -441,6 +441,55 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -501,6 +550,26 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.64.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4" +dependencies = [ + "bitflags 1.3.2", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "peeking_take_while", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 1.0.109", +] + [[package]] name = "bit-set" version = "0.5.3" @@ -703,6 +772,17 @@ dependencies = [ "serde", ] +[[package]] +name = "bzip2-sys" +version = "0.1.11+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "c-kzg" version = "1.0.2" @@ -728,6 +808,15 @@ dependencies = [ "once_cell", ] +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "0.1.10" @@ -764,6 +853,17 @@ dependencies = [ "inout", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "4.5.4" @@ -1136,6 +1236,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + [[package]] name = "dotenvy" version = "0.15.7" @@ -2551,18 +2657,61 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +[[package]] +name = "libloading" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" +dependencies = [ + "cfg-if 1.0.0", + "windows-targets 0.48.5", +] + [[package]] name = "libm" version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "librocksdb-sys" +version = "0.10.0+7.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fe4d5874f5ff2bc616e55e8c6086d478fcda13faf9495768a4aa1c22042d30b" +dependencies = [ + "bindgen", + "bzip2-sys", + "cc", + "glob", + "libc", + "libz-sys", + "lz4-sys", + "zstd-sys 2.0.13+zstd.1.5.6", +] + +[[package]] +name = "libz-sys" +version = "1.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2d16453e800a8cf6dd2fc3eb4bc99b786a9b90c663b8559a5b1a041bf89e472" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -2612,6 +2761,31 @@ dependencies = [ "winapi", ] +[[package]] +name = "lz4-sys" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "109de74d5d2353660401699a4174a4ff23fcc649caf553df71933c7fb45ad868" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "maybe-rayon" version = "0.1.1" @@ -2634,6 +2808,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.7.3" @@ -2746,6 +2926,26 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.4.3" @@ -2974,6 +3174,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "pairing" version = "0.23.0" @@ -3097,6 +3303,12 @@ dependencies = [ "hmac", ] +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "percent-encoding" version = "2.3.1" @@ -3267,7 +3479,7 @@ dependencies = [ "rand", "rand_chacha", "rand_xorshift", - "regex-syntax", + "regex-syntax 0.8.3", "rusty-fork", "tempfile", "unarray", @@ -3278,6 +3490,7 @@ name = "prover" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "base64 0.13.1", "clap", "ctor 0.2.8", @@ -3298,6 +3511,7 @@ dependencies = [ "reqwest-middleware", "reqwest-retry", "rlp", + "scroll-proving-sdk", "serde", "serde_json", "sled", @@ -3489,8 +3703,17 @@ checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.6", + "regex-syntax 0.8.3", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -3501,9 +3724,15 @@ checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.3", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.3" @@ -3787,6 +4016,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "rocksdb" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "015439787fce1e75d55f279078d33ff14b4af5d93d995e8838ee4631301c8a99" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "ruint" version = "1.12.1" @@ -3995,6 +4234,45 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "scroll-proving-sdk" +version = "0.1.0" +source = "git+https://github.com/scroll-tech/scroll-proving-sdk.git?rev=7d010b0#7d010b0c4670d6d3546fd72c449b771464b32a12" +dependencies = [ + "anyhow", + "async-trait", + "axum", + "base64 0.13.1", + "clap", + "ctor 0.2.8", + "dotenv", + "env_logger 0.11.3", + "eth-keystore", + "ethers-core 2.0.7 (git+https://github.com/scroll-tech/ethers-rs.git?branch=v2.0.7)", + "ethers-providers 2.0.7 (git+https://github.com/scroll-tech/ethers-rs.git?branch=v2.0.7)", + "futures", + "halo2_proofs", + "hex", + "http 1.1.0", + "log", + "once_cell", + "prover 0.13.0", + "rand", + "reqwest 0.12.4", + "reqwest-middleware", + "reqwest-retry", + "rlp", + "rocksdb", + "serde", + "serde_json", + "sled", + "snark-verifier-sdk", + "tiny-keccak", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "scrypt" version = "0.10.0" @@ -4167,6 +4445,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_stacker" version = "0.1.11" @@ -4265,6 +4553,30 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -4580,6 +4892,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if 1.0.0", + "once_cell", +] + [[package]] name = "threadpool" version = "1.8.1" @@ -4623,11 +4945,26 @@ dependencies = [ "bytes", "libc", "mio", + "num_cpus", + "parking_lot 0.12.3", "pin-project-lite", + "signal-hook-registry", "socket2", + "tokio-macros", "windows-sys 0.48.0", ] +[[package]] +name = "tokio-macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" @@ -4717,6 +5054,7 @@ dependencies = [ "tokio", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -4737,6 +5075,7 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -4760,6 +5099,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", ] [[package]] @@ -4772,6 +5112,35 @@ dependencies = [ "tracing", ] +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + [[package]] name = "try-lock" version = "0.2.5" @@ -5478,7 +5847,7 @@ name = "zstd-safe" version = "7.0.0" source = "git+https://github.com/scroll-tech/zstd-rs?branch=hack/mul-block#5c0892b6567dab31394d701477183ce9d6a32aca" dependencies = [ - "zstd-sys", + "zstd-sys 2.0.9+zstd.1.5.5", ] [[package]] @@ -5489,3 +5858,13 @@ dependencies = [ "cc", "pkg-config", ] + +[[package]] +name = "zstd-sys" +version = "2.0.13+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38ff0f21cfee8f97d94cef41359e0c89aa6113028ab0291aa8ca0038995a95aa" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/prover/Cargo.toml b/prover/Cargo.toml index 36db6a39b2..92592355b9 100644 --- a/prover/Cargo.toml +++ b/prover/Cargo.toml @@ -31,6 +31,7 @@ halo2_proofs = { git = "https://github.com/scroll-tech/halo2.git", branch = "v1. snark-verifier-sdk = { git = "https://github.com/scroll-tech/snark-verifier", branch = "develop", default-features = false, features = ["loader_halo2", "loader_evm", "halo2-pse"] } prover_darwin = { git = "https://github.com/scroll-tech/zkevm-circuits.git", tag = "v0.12.2", package = "prover", default-features = false, features = ["parallel_syn", "scroll"] } prover_darwin_v2 = { git = "https://github.com/scroll-tech/zkevm-circuits.git", tag = "v0.13.1", package = "prover", default-features = false, features = ["parallel_syn", "scroll"] } +scroll-proving-sdk = { git = "https://github.com/scroll-tech/scroll-proving-sdk.git", rev = "7d010b0"} base64 = "0.13.1" reqwest = { version = "0.12.4", features = ["gzip"] } reqwest-middleware = "0.3" @@ -42,6 +43,7 @@ rand = "0.8.5" eth-keystore = "0.5.0" rlp = "0.5.2" tokio = "1.37.0" +async-trait = "0.1" sled = "0.34.7" http = "1.1.0" clap = { version = "4.5", features = ["derive"] } diff --git a/prover/config.json b/prover/config.json index 0a816360d5..7247ce49b1 100644 --- a/prover/config.json +++ b/prover/config.json @@ -3,7 +3,7 @@ "keystore_path": "keystore.json", "keystore_password": "prover-pwd", "db_path": "unique-db-path-for-prover-1", - "prover_type": 2, + "prover_types": [2], "low_version_circuit": { "hard_fork_name": "bernoulli", "params_path": "params", diff --git a/prover/src/config.rs b/prover/src/config.rs index 4e3c1f2ccc..4effb958d0 100644 --- a/prover/src/config.rs +++ b/prover/src/config.rs @@ -1,55 +1,4 @@ use anyhow::{bail, Result}; -use serde::{Deserialize, Serialize}; -use std::fs::File; - -use crate::types::ProverType; - -#[derive(Debug, Serialize, Deserialize)] -pub struct CircuitConfig { - pub hard_fork_name: String, - pub params_path: String, - pub assets_path: String, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct CoordinatorConfig { - pub base_url: String, - pub retry_count: u32, - pub retry_wait_time_sec: u64, - pub connection_timeout_sec: u64, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct L2GethConfig { - pub endpoint: String, -} - -#[derive(Debug, Deserialize)] -pub struct Config { - pub prover_name: String, - pub keystore_path: String, - pub keystore_password: String, - pub db_path: String, - pub prover_type: ProverType, - pub low_version_circuit: CircuitConfig, - pub high_version_circuit: CircuitConfig, - pub coordinator: CoordinatorConfig, - pub l2geth: Option, -} - -impl Config { - pub fn from_reader(reader: R) -> Result - where - R: std::io::Read, - { - serde_json::from_reader(reader).map_err(|e| anyhow::anyhow!(e)) - } - - pub fn from_file(file_name: String) -> Result { - let file = File::open(file_name)?; - Config::from_reader(&file) - } -} static SCROLL_PROVER_ASSETS_DIR_ENV_NAME: &str = "SCROLL_PROVER_ASSETS_DIR"; static mut SCROLL_PROVER_ASSETS_DIRS: Vec = vec![]; diff --git a/prover/src/coordinator_client.rs b/prover/src/coordinator_client.rs deleted file mode 100644 index 46067d7ccf..0000000000 --- a/prover/src/coordinator_client.rs +++ /dev/null @@ -1,142 +0,0 @@ -mod api; -mod errors; -pub mod listener; -pub mod types; - -use anyhow::{bail, Context, Ok, Result}; -use std::rc::Rc; - -use api::Api; -use errors::*; -use listener::Listener; -use tokio::runtime::Runtime; -use types::*; - -use crate::{config::Config, key_signer::KeySigner}; - -pub use errors::ProofStatusNotOKError; - -pub struct CoordinatorClient<'a> { - api: Api, - token: Option, - config: &'a Config, - key_signer: Rc, - rt: Runtime, - listener: Box, - vks: Vec, -} - -impl<'a> CoordinatorClient<'a> { - pub fn new( - config: &'a Config, - key_signer: Rc, - listener: Box, - vks: Vec, - ) -> Result { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - let api = Api::new( - &config.coordinator.base_url, - core::time::Duration::from_secs(config.coordinator.connection_timeout_sec), - config.coordinator.retry_count, - config.coordinator.retry_wait_time_sec, - )?; - let mut client = Self { - api, - token: None, - config, - key_signer, - rt, - listener, - vks, - }; - client.login()?; - Ok(client) - } - - fn login(&mut self) -> Result<()> { - let api = &self.api; - let challenge_response = self.rt.block_on(api.challenge())?; - if challenge_response.errcode != ErrorCode::Success { - bail!("challenge failed: {}", challenge_response.errmsg) - } - let mut token: String; - if let Some(r) = challenge_response.data { - token = r.token; - } else { - bail!("challenge failed: got empty token") - } - - let login_message = LoginMessage { - challenge: token.clone(), - prover_name: self.config.prover_name.clone(), - prover_version: crate::version::get_version(), - prover_types: vec![self.config.prover_type], - vks: self.vks.clone(), - }; - - let buffer = rlp::encode(&login_message); - let signature = self.key_signer.sign_buffer(&buffer)?; - let login_request = LoginRequest { - message: login_message, - public_key: self.key_signer.get_public_key(), - signature, - }; - let login_response = self.rt.block_on(api.login(&login_request, &token))?; - if login_response.errcode != ErrorCode::Success { - bail!("login failed: {}", login_response.errmsg) - } - if let Some(r) = login_response.data { - token = r.token; - } else { - bail!("login failed: got empty token") - } - self.token = Some(token); - Ok(()) - } - - fn action_with_re_login(&mut self, req: &R, mut f: F) -> Result> - where - F: FnMut(&mut Self, &R) -> Result>, - { - let response = f(self, req)?; - if response.errcode == ErrorCode::ErrJWTTokenExpired { - log::info!("JWT expired, attempting to re-login"); - self.login().context("JWT expired, re-login failed")?; - log::info!("re-login success"); - return self.action_with_re_login(req, f); - } else if response.errcode != ErrorCode::Success { - bail!("action failed: {}", response.errmsg) - } - Ok(response) - } - - fn do_get_task(&mut self, req: &GetTaskRequest) -> Result> { - self.rt - .block_on(self.api.get_task(req, self.token.as_ref().unwrap())) - } - - pub fn get_task(&mut self, req: &GetTaskRequest) -> Result> { - self.action_with_re_login(req, |s, req| s.do_get_task(req)) - } - - fn do_submit_proof( - &mut self, - req: &SubmitProofRequest, - ) -> Result> { - let response = self - .rt - .block_on(self.api.submit_proof(req, self.token.as_ref().unwrap()))?; - self.listener.on_proof_submitted(req); - Ok(response) - } - - pub fn submit_proof( - &mut self, - req: &SubmitProofRequest, - ) -> Result> { - self.action_with_re_login(req, |s, req| s.do_submit_proof(req)) - } -} diff --git a/prover/src/coordinator_client/api.rs b/prover/src/coordinator_client/api.rs deleted file mode 100644 index 905a1e61c5..0000000000 --- a/prover/src/coordinator_client/api.rs +++ /dev/null @@ -1,144 +0,0 @@ -use crate::{coordinator_client::ProofStatusNotOKError, types::ProofStatus}; - -use super::{errors::*, types::*}; -use anyhow::{bail, Result}; -use core::time::Duration; -use reqwest::{header::CONTENT_TYPE, Url}; -use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; -use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; -use serde::Serialize; - -pub struct Api { - url_base: Url, - send_timeout: Duration, - pub client: ClientWithMiddleware, -} - -impl Api { - pub fn new( - url_base: &str, - send_timeout: Duration, - retry_count: u32, - retry_wait_time_sec: u64, - ) -> Result { - let retry_wait_duration = core::time::Duration::from_secs(retry_wait_time_sec); - let retry_policy = ExponentialBackoff::builder() - .retry_bounds(retry_wait_duration / 2, retry_wait_duration) - .build_with_max_retries(retry_count); - - let client = ClientBuilder::new(reqwest::Client::new()) - .with(RetryTransientMiddleware::new_with_policy(retry_policy)) - .build(); - - Ok(Self { - url_base: Url::parse(url_base)?, - send_timeout, - client, - }) - } - - pub async fn challenge(&self) -> Result> { - let method = "/coordinator/v1/challenge"; - let url = self.build_url(method)?; - - let response = self - .client - .get(url) - .header(CONTENT_TYPE, "application/json") - .timeout(self.send_timeout) - .send() - .await?; - - let response_body = response.text().await?; - - serde_json::from_str(&response_body).map_err(|e| anyhow::anyhow!(e)) - } - - pub async fn login( - &self, - req: &LoginRequest, - token: &String, - ) -> Result> { - let method = "/coordinator/v1/login"; - self.post_with_token(method, req, token).await - } - - pub async fn get_task( - &self, - req: &GetTaskRequest, - token: &String, - ) -> Result> { - let method = "/coordinator/v1/get_task"; - self.post_with_token(method, req, token).await - } - - pub async fn submit_proof( - &self, - req: &SubmitProofRequest, - token: &String, - ) -> Result> { - let method = "/coordinator/v1/submit_proof"; - let response = self - .post_with_token::>( - method, req, token, - ) - .await?; - - // when req's status already not ok, we mark the error returned from coordinator and will - // ignore it later. - if response.errcode == ErrorCode::ErrCoordinatorHandleZkProofFailure - && req.status != ProofStatus::Ok - && response - .errmsg - .contains("validator failure proof msg status not ok") - { - return Err(anyhow::anyhow!(ProofStatusNotOKError)); - } - Ok(response) - } - - async fn post_with_token( - &self, - method: &str, - req: &Req, - token: &String, - ) -> Result - where - Req: ?Sized + Serialize, - Resp: serde::de::DeserializeOwned, - { - let url = self.build_url(method)?; - let request_body = serde_json::to_string(req)?; - - log::info!("[coordinator client], {method}, request: {request_body}"); - let response = self - .client - .post(url) - .header(CONTENT_TYPE, "application/json") - .bearer_auth(token) - .body(request_body) - .timeout(self.send_timeout) - .send() - .await?; - - if response.status() != http::status::StatusCode::OK { - log::error!( - "[coordinator client], {method}, status not ok: {}", - response.status() - ); - bail!( - "[coordinator client], {method}, status not ok: {}", - response.status() - ) - } - - let response_body = response.text().await?; - - log::info!("[coordinator client], {method}, response: {response_body}"); - serde_json::from_str(&response_body).map_err(|e| anyhow::anyhow!(e)) - } - - fn build_url(&self, method: &str) -> Result { - self.url_base.join(method).map_err(|e| anyhow::anyhow!(e)) - } -} diff --git a/prover/src/coordinator_client/errors.rs b/prover/src/coordinator_client/errors.rs deleted file mode 100644 index 9bad256fac..0000000000 --- a/prover/src/coordinator_client/errors.rs +++ /dev/null @@ -1,65 +0,0 @@ -use serde::{Deserialize, Deserializer}; -use std::fmt; - -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum ErrorCode { - Success, - InternalServerError, - - ErrProverStatsAPIParameterInvalidNo, - ErrProverStatsAPIProverTaskFailure, - ErrProverStatsAPIProverTotalRewardFailure, - - ErrCoordinatorParameterInvalidNo, - ErrCoordinatorGetTaskFailure, - ErrCoordinatorHandleZkProofFailure, - ErrCoordinatorEmptyProofData, - - ErrJWTCommonErr, - ErrJWTTokenExpired, - - Undefined(i32), -} - -impl ErrorCode { - fn from_i32(v: i32) -> Self { - match v { - 0 => ErrorCode::Success, - 500 => ErrorCode::InternalServerError, - 10001 => ErrorCode::ErrProverStatsAPIParameterInvalidNo, - 10002 => ErrorCode::ErrProverStatsAPIProverTaskFailure, - 10003 => ErrorCode::ErrProverStatsAPIProverTotalRewardFailure, - 20001 => ErrorCode::ErrCoordinatorParameterInvalidNo, - 20002 => ErrorCode::ErrCoordinatorGetTaskFailure, - 20003 => ErrorCode::ErrCoordinatorHandleZkProofFailure, - 20004 => ErrorCode::ErrCoordinatorEmptyProofData, - 50000 => ErrorCode::ErrJWTCommonErr, - 50001 => ErrorCode::ErrJWTTokenExpired, - _ => { - log::error!("get unexpected error code from coordinator: {v}"); - ErrorCode::Undefined(v) - } - } - } -} - -impl<'de> Deserialize<'de> for ErrorCode { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let v: i32 = i32::deserialize(deserializer)?; - Ok(ErrorCode::from_i32(v)) - } -} - -// ==================================================== - -#[derive(Debug, Clone)] -pub struct ProofStatusNotOKError; - -impl fmt::Display for ProofStatusNotOKError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "proof status not ok") - } -} diff --git a/prover/src/coordinator_client/listener.rs b/prover/src/coordinator_client/listener.rs deleted file mode 100644 index 9f9f70f606..0000000000 --- a/prover/src/coordinator_client/listener.rs +++ /dev/null @@ -1,5 +0,0 @@ -use super::SubmitProofRequest; - -pub trait Listener { - fn on_proof_submitted(&self, req: &SubmitProofRequest); -} diff --git a/prover/src/coordinator_client/types.rs b/prover/src/coordinator_client/types.rs deleted file mode 100644 index c646a9afd4..0000000000 --- a/prover/src/coordinator_client/types.rs +++ /dev/null @@ -1,86 +0,0 @@ -use super::errors::ErrorCode; -use crate::types::{ProofFailureType, ProofStatus, ProverType, TaskType}; -use rlp::{Encodable, RlpStream}; -use serde::{Deserialize, Serialize}; - -#[derive(Deserialize)] -pub struct Response { - pub errcode: ErrorCode, - pub errmsg: String, - pub data: Option, -} - -#[derive(Serialize, Deserialize)] -pub struct LoginMessage { - pub challenge: String, - pub prover_name: String, - pub prover_version: String, - pub prover_types: Vec, - pub vks: Vec, -} - -impl Encodable for LoginMessage { - fn rlp_append(&self, s: &mut RlpStream) { - let num_fields = 5; - s.begin_list(num_fields); - s.append(&self.challenge); - s.append(&self.prover_version); - s.append(&self.prover_name); - // The ProverType in go side is an type alias of uint8 - // A uint8 slice is treated as a string when doing the rlp encoding - let prover_types = self - .prover_types - .iter() - .map(|prover_type: &ProverType| prover_type.to_u8()) - .collect::>(); - s.append(&prover_types); - s.begin_list(self.vks.len()); - for vk in &self.vks { - s.append(vk); - } - } -} - -#[derive(Serialize, Deserialize)] -pub struct LoginRequest { - pub message: LoginMessage, - pub public_key: String, - pub signature: String, -} - -#[derive(Serialize, Deserialize)] -pub struct LoginResponseData { - pub time: String, - pub token: String, -} - -pub type ChallengeResponseData = LoginResponseData; - -#[derive(Default, Serialize, Deserialize)] -pub struct GetTaskRequest { - pub task_types: Vec, - pub prover_height: Option, -} - -#[derive(Serialize, Deserialize)] -pub struct GetTaskResponseData { - pub uuid: String, - pub task_id: String, - pub task_type: TaskType, - pub task_data: String, - pub hard_fork_name: String, -} - -#[derive(Serialize, Deserialize, Default)] -pub struct SubmitProofRequest { - pub uuid: String, - pub task_id: String, - pub task_type: TaskType, - pub status: ProofStatus, - pub proof: String, - pub failure_type: Option, - pub failure_msg: Option, -} - -#[derive(Serialize, Deserialize)] -pub struct SubmitProofResponseData {} diff --git a/prover/src/geth_client.rs b/prover/src/geth_client.rs deleted file mode 100644 index e617d8eba8..0000000000 --- a/prover/src/geth_client.rs +++ /dev/null @@ -1,57 +0,0 @@ -use crate::types::CommonHash; -use anyhow::Result; -use ethers_core::types::BlockNumber; -use tokio::runtime::Runtime; - -use serde::{de::DeserializeOwned, Serialize}; -use std::fmt::Debug; - -use ethers_providers::{Http, Provider}; - -pub struct GethClient { - id: String, - provider: Provider, - rt: Runtime, -} - -impl GethClient { - pub fn new(id: &str, api_url: &str) -> Result { - let provider = Provider::::try_from(api_url)?; - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - Ok(Self { - id: id.to_string(), - provider, - rt, - }) - } - - pub fn get_block_trace_by_hash(&mut self, hash: &CommonHash) -> Result - where - T: Serialize + DeserializeOwned + Debug + Send, - { - log::info!( - "{}: calling get_block_trace_by_hash, hash: {:#?}", - self.id, - hash - ); - - let trace_future = self - .provider - .request("scroll_getBlockTraceByNumberOrHash", [format!("{hash:#x}")]); - - let trace = self.rt.block_on(trace_future)?; - Ok(trace) - } - - pub fn block_number(&mut self) -> Result { - log::info!("{}: calling block_number", self.id); - - let trace_future = self.provider.request("eth_blockNumber", ()); - - let trace = self.rt.block_on(trace_future)?; - Ok(trace) - } -} diff --git a/prover/src/key_signer.rs b/prover/src/key_signer.rs deleted file mode 100644 index 77707f41f1..0000000000 --- a/prover/src/key_signer.rs +++ /dev/null @@ -1,103 +0,0 @@ -use std::path::Path; - -use anyhow::Result; -use ethers_core::{ - k256::{ - ecdsa::{signature::hazmat::PrehashSigner, RecoveryId, Signature, SigningKey}, - elliptic_curve::{sec1::ToEncodedPoint, FieldBytes}, - PublicKey, Secp256k1, SecretKey, - }, - types::Signature as EthSignature, -}; - -use ethers_core::types::{H256, U256}; -use hex::ToHex; -use tiny_keccak::{Hasher, Keccak}; - -pub struct KeySigner { - public_key: PublicKey, - signer: SigningKey, -} - -impl KeySigner { - pub fn new(key_path: &str, passwd: &str) -> Result { - let p = Path::new(key_path); - - let secret = if !p.exists() { - log::info!("[key_signer] key_path not exists, create one"); - let dir = p.parent().unwrap(); - let name = p.file_name().and_then(|s| s.to_str()); - let mut rng = rand::thread_rng(); - let (secret, _) = eth_keystore::new(dir, &mut rng, passwd, name)?; - secret - } else { - log::info!("[key_signer] key_path already exists, load it"); - eth_keystore::decrypt_key(key_path, passwd).map_err(|e| anyhow::anyhow!(e))? - }; - - let secret_key = SecretKey::from_bytes(secret.as_slice().into())?; - - let signer = SigningKey::from(secret_key.clone()); - - Ok(Self { - public_key: secret_key.public_key(), - signer, - }) - } - - pub fn get_public_key(&self) -> String { - let v: Vec = Vec::from(self.public_key.to_encoded_point(true).as_bytes()); - buffer_to_hex(&v, false) - } - - /// Signs the provided hash. - pub fn sign_hash(&self, hash: H256) -> Result { - let signer = &self.signer as &dyn PrehashSigner<(Signature, RecoveryId)>; - let (recoverable_sig, recovery_id) = signer.sign_prehash(hash.as_ref())?; - - let v = u8::from(recovery_id) as u64; - - let r_bytes: FieldBytes = recoverable_sig.r().into(); - let s_bytes: FieldBytes = recoverable_sig.s().into(); - let r = U256::from_big_endian(r_bytes.as_slice()); - let s = U256::from_big_endian(s_bytes.as_slice()); - - Ok(EthSignature { r, s, v }) - } - - pub fn sign_buffer(&self, buffer: &T) -> Result - where - T: AsRef<[u8]>, - { - let pre_hash = keccak256(buffer); - - let hash = H256::from(pre_hash); - let sig = self.sign_hash(hash)?; - - Ok(buffer_to_hex(&sig.to_vec(), true)) - } -} - -fn buffer_to_hex(buffer: &T, has_prefix: bool) -> String -where - T: AsRef<[u8]>, -{ - if has_prefix { - format!("0x{}", buffer.encode_hex::()) - } else { - buffer.encode_hex::() - } -} - -/// Compute the Keccak-256 hash of input bytes. -/// -/// Note that strings are interpreted as UTF-8 bytes, -pub fn keccak256>(bytes: T) -> [u8; 32] { - let mut output = [0u8; 32]; - - let mut hasher = Keccak::v256(); - hasher.update(bytes.as_ref()); - hasher.finalize(&mut output); - - output -} diff --git a/prover/src/main.rs b/prover/src/main.rs index 75553187a9..d01d3144ff 100644 --- a/prover/src/main.rs +++ b/prover/src/main.rs @@ -2,26 +2,16 @@ #![feature(core_intrinsics)] mod config; -mod coordinator_client; -mod geth_client; -mod key_signer; mod prover; -mod task_cache; -mod task_processor; mod types; mod utils; mod version; mod zk_circuits_handler; -use anyhow::Result; use clap::{ArgAction, Parser}; -use config::{AssetsDirEnvConfig, Config}; -use prover::Prover; -use std::rc::Rc; -use task_cache::{ClearCacheCoordinatorListener, TaskCache}; -use task_processor::TaskProcessor; +use prover::LocalProver; +use scroll_proving_sdk::{config::Config, prover::ProverBuilder, utils::init_tracing}; -/// Simple program to greet a person #[derive(Parser, Debug)] #[clap(disable_version_flag = true)] struct Args { @@ -38,7 +28,10 @@ struct Args { log_file: Option, } -fn start() -> Result<()> { +#[tokio::main] +async fn main() -> anyhow::Result<()> { + init_tracing(); + let args = Args::parse(); if args.version { @@ -48,39 +41,19 @@ fn start() -> Result<()> { utils::log_init(args.log_file); - let config: Config = Config::from_file(args.config_file)?; - - if let Err(e) = AssetsDirEnvConfig::init() { - log::error!("AssetsDirEnvConfig init failed: {:#}", e); - std::process::exit(-2); - } - - let task_cache = Rc::new(TaskCache::new(&config.db_path)?); - - let coordinator_listener = Box::new(ClearCacheCoordinatorListener { - task_cache: task_cache.clone(), - }); - - let prover = Prover::new(&config, coordinator_listener)?; - - log::info!( - "prover start successfully. name: {}, type: {:?}, publickey: {}, version: {}", - config.prover_name, - config.prover_type, - prover.get_public_key(), - version::get_version(), + let cfg: Config = Config::from_file(args.config_file)?; + let local_prover = LocalProver::new( + cfg.prover + .local + .clone() + .ok_or_else(|| anyhow::anyhow!("Missing local prover configuration"))?, ); + let prover = ProverBuilder::new(cfg) + .with_proving_service(Box::new(local_prover)) + .build() + .await?; - let task_processor = TaskProcessor::new(&prover, task_cache); - - task_processor.start(); + prover.run().await; Ok(()) } - -fn main() { - let result = start(); - if let Err(e) = result { - log::error!("main exit with error {:#}", e) - } -} diff --git a/prover/src/prover.rs b/prover/src/prover.rs index 7de83906e0..b23adfdca4 100644 --- a/prover/src/prover.rs +++ b/prover/src/prover.rs @@ -1,170 +1,227 @@ -use anyhow::{bail, Context, Error, Ok, Result}; -use ethers_core::types::U64; - -use std::{cell::RefCell, rc::Rc}; - use crate::{ - config::Config, - coordinator_client::{listener::Listener, types::*, CoordinatorClient}, - geth_client::GethClient, - key_signer::KeySigner, - types::{ProofFailureType, ProofStatus, ProverType}, - utils::get_task_types, + utils::get_prover_type, zk_circuits_handler::{CircuitsHandler, CircuitsHandlerProvider}, }; +use anyhow::{Context, Result}; +use async_trait::async_trait; +use scroll_proving_sdk::{ + config::LocalProverConfig, + prover::{ + proving_service::{ + GetVkRequest, GetVkResponse, ProveRequest, ProveResponse, QueryTaskRequest, + QueryTaskResponse, TaskStatus, + }, + CircuitType, ProvingService, + }, +}; +use std::{ + sync::{Arc, Mutex}, + time::{SystemTime, UNIX_EPOCH}, +}; +use tokio::{runtime::Runtime, sync::RwLock, task::JoinHandle}; -use super::types::{ProofDetail, Task}; - -pub struct Prover<'a> { - config: &'a Config, - key_signer: Rc, - circuits_handler_provider: RefCell>, - coordinator_client: RefCell>, - geth_client: Option>>, +pub struct LocalProver { + config: LocalProverConfig, + circuits_handler_provider: RwLock, + current_task: Arc>>>>, + next_task_id: Arc>, } -impl<'a> Prover<'a> { - pub fn new(config: &'a Config, coordinator_listener: Box) -> Result { - let prover_type = config.prover_type; - let keystore_path = &config.keystore_path; - let keystore_password = &config.keystore_password; - - let geth_client = if config.prover_type == ProverType::Chunk { - Some(Rc::new(RefCell::new( - GethClient::new( - &config.prover_name, - &config.l2geth.as_ref().unwrap().endpoint, - ) - .context("failed to create l2 geth_client")?, - ))) - } else { - None - }; - - let provider = CircuitsHandlerProvider::new(prover_type, config, geth_client.clone()) - .context("failed to create circuits handler provider")?; - - let vks = provider.init_vks(prover_type, config, geth_client.clone()); - - let key_signer = Rc::new(KeySigner::new(keystore_path, keystore_password)?); - let coordinator_client = - CoordinatorClient::new(config, Rc::clone(&key_signer), coordinator_listener, vks) - .context("failed to create coordinator_client")?; - - let prover = Prover { - config, - key_signer: Rc::clone(&key_signer), - circuits_handler_provider: RefCell::new(provider), - coordinator_client: RefCell::new(coordinator_client), - geth_client, - }; - - Ok(prover) - } - - pub fn get_public_key(&self) -> String { - self.key_signer.get_public_key() +#[async_trait] +impl ProvingService for LocalProver { + fn is_local(&self) -> bool { + true } - - pub fn fetch_task(&self) -> Result { - log::info!("[prover] start to fetch_task"); - let mut req = GetTaskRequest { - task_types: get_task_types(self.config.prover_type), - prover_height: None, - }; - - if self.config.prover_type == ProverType::Chunk { - let latest_block_number = self.get_latest_block_number_value()?; - if let Some(v) = latest_block_number { - if v.as_u64() == 0 { - bail!("omit to prove task of the genesis block") + async fn get_vks(&self, req: GetVkRequest) -> GetVkResponse { + let mut prover_types = vec![]; + req.circuit_types.iter().for_each(|circuit_type| { + if let Some(pt) = get_prover_type(*circuit_type) { + if !prover_types.contains(&pt) { + prover_types.push(pt); } - req.prover_height = Some(v.as_u64()); - } else { - log::error!("[prover] failed to fetch latest confirmed block number, got None"); - bail!("failed to fetch latest confirmed block number, got None") } - } - let resp = self.coordinator_client.borrow_mut().get_task(&req)?; + }); - match resp.data { - Some(d) => Ok(Task::from(d)), + let local_prover_config = self.config.clone(); + let vks = self + .circuits_handler_provider + .read() + .await + .init_vks(&local_prover_config, prover_types) + .await; + GetVkResponse { vks, error: None } + } + async fn prove(&self, req: ProveRequest) -> ProveResponse { + let prover_type = match get_prover_type(req.circuit_type) { + Some(pt) => pt, None => { - bail!("data of get_task empty, while error_code is success. there may be something wrong in response data or inner logic.") + return build_prove_error_response( + String::new(), + TaskStatus::Failed, + None, + String::from("unsupported prover_type"), + ) } + }; + let handler = self + .circuits_handler_provider + .write() + .await + .get_circuits_handler(&req.hard_fork_name, prover_type) + .context("failed to get circuit handler") + .unwrap(); + + match self.do_prove(req.clone(), handler) { + Ok(resp) => resp, + Err(e) => build_prove_error_response( + String::new(), + TaskStatus::Failed, + None, + String::from(&format!("failed to request proof: {}", e)), + ), } } - pub fn prove_task(&self, task: &Task) -> Result { - log::info!("[prover] start to prove_task, task id: {}", task.id); - let handler: Rc> = self - .circuits_handler_provider - .borrow_mut() - .get_circuits_handler(&task.hard_fork_name) - .context("failed to get circuit handler")?; - self.do_prove(task, handler) + async fn query_task(&self, req: QueryTaskRequest) -> QueryTaskResponse { + let mut current_task = self.current_task.lock().unwrap(); + + if let Some(handle) = current_task.take() { + if handle.is_finished() { + let result = Runtime::new().unwrap().block_on(handle).unwrap(); + match result { + Ok(proof) => { + return build_query_task_response( + req.task_id, + TaskStatus::Success, + Some(proof), + None, + ) + } + Err(e) => { + return build_query_task_response( + req.task_id, + TaskStatus::Failed, + None, + Some(e.to_string()), + ) + } + } + } else { + *current_task = Some(handle); + return build_query_task_response(req.task_id, TaskStatus::Proving, None, None); + } + } else { + let task_id = req.task_id.clone(); + return build_query_task_response( + req.task_id, + TaskStatus::Failed, + None, + Some(String::from(&format!( + "failed to query task, task_id: {}", + task_id + ))), + ); + } } +} - fn do_prove(&self, task: &Task, handler: Rc>) -> Result { - let mut proof_detail = ProofDetail { - id: task.id.clone(), - proof_type: task.task_type, - ..Default::default() - }; +impl LocalProver { + pub fn new(config: LocalProverConfig) -> Self { + let circuits_handler_provider = CircuitsHandlerProvider::new(config.clone()) + .context("failed to create circuits handler provider") + .unwrap(); - proof_detail.proof_data = handler.get_proof_data(task.task_type, task)?; - Ok(proof_detail) + Self { + config, + circuits_handler_provider: RwLock::new(circuits_handler_provider), + current_task: Arc::new(Mutex::new(None)), + next_task_id: Arc::new(Mutex::new(0)), + } } - pub fn submit_proof(&self, proof_detail: ProofDetail, task: &Task) -> Result<()> { - log::info!( - "[prover] start to submit_proof, task id: {}", - proof_detail.id - ); + fn do_prove( + &self, + req: ProveRequest, + handler: Arc>, + ) -> Result { + let mut current_task = self.current_task.lock().unwrap(); + if current_task.is_some() { + return Err(anyhow::Error::msg("prover working on previous task")); + } - let request = SubmitProofRequest { - uuid: task.uuid.clone(), - task_id: proof_detail.id, - task_type: proof_detail.proof_type, - status: ProofStatus::Ok, - proof: proof_detail.proof_data, - ..Default::default() + let task_id = { + let mut next_task_id = self.next_task_id.lock().unwrap(); + *next_task_id += 1; + *next_task_id }; - self.do_submit(&request) - } - - pub fn submit_error( - &self, - task: &Task, - failure_type: ProofFailureType, - error: Error, - ) -> Result<()> { - log::info!("[prover] start to submit_error, task id: {}", task.id); - let request = SubmitProofRequest { - uuid: task.uuid.clone(), - task_id: task.id.clone(), - task_type: task.task_type, - status: ProofStatus::Error, - failure_type: Some(failure_type), - failure_msg: Some(format!("{:#}", error)), - ..Default::default() - }; - self.do_submit(&request) + let req_clone = req.clone(); + let handle = tokio::spawn(async move { handler.get_proof_data(req_clone).await }); + *current_task = Some(handle); + + let duration = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + let created_at = duration.as_secs() as f64 + duration.subsec_nanos() as f64 * 1e-9; + + Ok(ProveResponse { + task_id: task_id.to_string(), + circuit_type: req.circuit_type, + circuit_version: req.circuit_version.clone(), + hard_fork_name: req.hard_fork_name.clone(), + status: TaskStatus::Proving, + created_at, + started_at: None, + finished_at: None, + compute_time_sec: None, + input: Some(req.input), + proof: None, + vk: None, + error: None, + }) } +} - fn do_submit(&self, request: &SubmitProofRequest) -> Result<()> { - self.coordinator_client.borrow_mut().submit_proof(request)?; - Ok(()) +fn build_prove_error_response( + task_id: String, + status: TaskStatus, + proof: Option, + error_msg: String, +) -> ProveResponse { + ProveResponse { + task_id, + circuit_type: CircuitType::Undefined, // TODO + circuit_version: "".to_string(), + hard_fork_name: "".to_string(), + status, + created_at: 0.0, + started_at: None, + finished_at: None, + compute_time_sec: None, + input: None, + proof, + vk: None, + error: Some(error_msg), } +} - fn get_latest_block_number_value(&self) -> Result> { - let number = self - .geth_client - .as_ref() - .unwrap() - .borrow_mut() - .block_number()?; - Ok(number.as_number()) +fn build_query_task_response( + task_id: String, + status: TaskStatus, + proof: Option, + error_msg: Option, +) -> QueryTaskResponse { + QueryTaskResponse { + task_id, + circuit_type: CircuitType::Undefined, // TODO + circuit_version: "".to_string(), + hard_fork_name: "".to_string(), + status, + created_at: 0.0, + started_at: None, + finished_at: None, + compute_time_sec: None, + input: None, + proof, + vk: None, + error: error_msg, } } diff --git a/prover/src/task_cache.rs b/prover/src/task_cache.rs deleted file mode 100644 index a592c26b7d..0000000000 --- a/prover/src/task_cache.rs +++ /dev/null @@ -1,66 +0,0 @@ -use anyhow::{Ok, Result}; - -use super::coordinator_client::{listener::Listener, types::SubmitProofRequest}; -use crate::types::TaskWrapper; -use sled::{Config, Db}; -use std::rc::Rc; - -pub struct TaskCache { - db: Db, -} - -impl TaskCache { - pub fn new(db_path: &String) -> Result { - let config = Config::new().path(db_path); - let db = config.open()?; - log::info!("[task_cache] initiate successfully to {db_path}"); - Ok(Self { db }) - } - - pub fn put_task(&self, task_wrapper: &TaskWrapper) -> Result<()> { - let k = task_wrapper.task.id.clone().into_bytes(); - let v = serde_json::to_vec(task_wrapper)?; - self.db.insert(k, v)?; - log::info!( - "[task_cache] put_task with task_id: {}", - task_wrapper.task.id - ); - Ok(()) - } - - pub fn get_last_task(&self) -> Result> { - let last = self.db.last()?; - if let Some((k, v)) = last { - let kk = std::str::from_utf8(k.as_ref())?; - let task_wrapper: TaskWrapper = serde_json::from_slice(v.as_ref())?; - log::info!( - "[task_cache] get_last_task with task_id: {kk}, count: {}", - task_wrapper.get_count() - ); - return Ok(Some(task_wrapper)); - } - Ok(None) - } - - pub fn delete_task(&self, task_id: String) -> Result<()> { - let k = task_id.clone().into_bytes(); - self.db.remove(k)?; - log::info!("[task cache] delete_task with task_id: {task_id}"); - Ok(()) - } -} - -// ========================= listener =========================== - -pub struct ClearCacheCoordinatorListener { - pub task_cache: Rc, -} - -impl Listener for ClearCacheCoordinatorListener { - fn on_proof_submitted(&self, req: &SubmitProofRequest) { - let result = self.task_cache.delete_task(req.task_id.clone()); - if let Err(e) = result { - log::error!("delete task from embed db failed, {:#}", e); - } - } -} diff --git a/prover/src/task_processor.rs b/prover/src/task_processor.rs deleted file mode 100644 index df4629d5bd..0000000000 --- a/prover/src/task_processor.rs +++ /dev/null @@ -1,89 +0,0 @@ -use super::{coordinator_client::ProofStatusNotOKError, prover::Prover, task_cache::TaskCache}; -use anyhow::{Context, Result}; -use std::rc::Rc; - -pub struct TaskProcessor<'a> { - prover: &'a Prover<'a>, - task_cache: Rc, -} - -impl<'a> TaskProcessor<'a> { - pub fn new(prover: &'a Prover<'a>, task_cache: Rc) -> Self { - TaskProcessor { prover, task_cache } - } - - pub fn start(&self) { - loop { - log::info!("start a new round."); - if let Err(err) = self.prove_and_submit() { - if err.is::() { - log::info!("proof status not ok, downgrade level to info."); - } else { - log::error!("encounter error: {:#}", err); - } - } else { - log::info!("prove & submit succeed."); - } - } - } - - fn prove_and_submit(&self) -> Result<()> { - let task_from_cache = self - .task_cache - .get_last_task() - .context("failed to peek from stack")?; - - let mut task_wrapper = match task_from_cache { - Some(t) => t, - None => { - let fetch_result = self.prover.fetch_task(); - if let Err(err) = fetch_result { - std::thread::sleep(core::time::Duration::from_secs(10)); - return Err(err).context("failed to fetch task from coordinator"); - } - fetch_result.unwrap().into() - } - }; - - if task_wrapper.get_count() <= 2 { - task_wrapper.increment_count(); - self.task_cache - .put_task(&task_wrapper) - .context("failed to push task into stack, updating count")?; - - log::info!( - "start to prove task, task_type: {:?}, task_id: {}", - task_wrapper.task.task_type, - task_wrapper.task.id - ); - let result = match self.prover.prove_task(&task_wrapper.task) { - Ok(proof_detail) => self.prover.submit_proof(proof_detail, &task_wrapper.task), - Err(error) => { - log::error!( - "failed to prove task, id: {}, error: {:#}", - &task_wrapper.task.id, - error - ); - self.prover.submit_error( - &task_wrapper.task, - super::types::ProofFailureType::NoPanic, - error, - ) - } - }; - return result; - } - - // if tried times >= 3, it's probably due to circuit proving panic - log::error!( - "zk proving panic for task, task_type: {:?}, task_id: {}", - task_wrapper.task.task_type, - task_wrapper.task.id - ); - self.prover.submit_error( - &task_wrapper.task, - super::types::ProofFailureType::Panic, - anyhow::anyhow!("zk proving panic for task"), - ) - } -} diff --git a/prover/src/types.rs b/prover/src/types.rs index 513995d6d1..39a99b37c8 100644 --- a/prover/src/types.rs +++ b/prover/src/types.rs @@ -1,59 +1,10 @@ use ethers_core::types::H256; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use crate::coordinator_client::types::GetTaskResponseData; +use scroll_proving_sdk::prover::types::CircuitType; pub type CommonHash = H256; -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum TaskType { - Undefined, - Chunk, - Batch, - Bundle, -} - -impl TaskType { - fn from_u8(v: u8) -> Self { - match v { - 1 => TaskType::Chunk, - 2 => TaskType::Batch, - 3 => TaskType::Bundle, - _ => TaskType::Undefined, - } - } -} - -impl Serialize for TaskType { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - match *self { - TaskType::Undefined => serializer.serialize_u8(0), - TaskType::Chunk => serializer.serialize_u8(1), - TaskType::Batch => serializer.serialize_u8(2), - TaskType::Bundle => serializer.serialize_u8(3), - } - } -} - -impl<'de> Deserialize<'de> for TaskType { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let v: u8 = u8::deserialize(deserializer)?; - Ok(TaskType::from_u8(v)) - } -} - -impl Default for TaskType { - fn default() -> Self { - Self::Undefined - } -} - #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum ProverType { Chunk, @@ -70,13 +21,6 @@ impl ProverType { } } } - - pub fn to_u8(self) -> u8 { - match self { - ProverType::Chunk => 1, - ProverType::Batch => 2, - } - } } impl Serialize for ProverType { @@ -103,54 +47,18 @@ impl<'de> Deserialize<'de> for ProverType { #[derive(Serialize, Deserialize, Default)] pub struct Task { - pub uuid: String, - pub id: String, #[serde(rename = "type", default)] - pub task_type: TaskType, + pub task_type: CircuitType, pub task_data: String, #[serde(default)] pub hard_fork_name: String, } -impl From for Task { - fn from(value: GetTaskResponseData) -> Self { - Self { - uuid: value.uuid, - id: value.task_id, - task_type: value.task_type, - task_data: value.task_data, - hard_fork_name: value.hard_fork_name, - } - } -} - -#[derive(Serialize, Deserialize, Default)] -pub struct TaskWrapper { - pub task: Task, - count: usize, -} - -impl TaskWrapper { - pub fn increment_count(&mut self) { - self.count += 1; - } - - pub fn get_count(&self) -> usize { - self.count - } -} - -impl From for TaskWrapper { - fn from(task: Task) -> Self { - TaskWrapper { task, count: 0 } - } -} - #[derive(Serialize, Deserialize, Default)] pub struct ProofDetail { pub id: String, #[serde(rename = "type", default)] - pub proof_type: TaskType, + pub proof_type: CircuitType, pub proof_data: String, pub error: String, } diff --git a/prover/src/utils.rs b/prover/src/utils.rs index 18be4ac7a1..cca12bc5fc 100644 --- a/prover/src/utils.rs +++ b/prover/src/utils.rs @@ -1,7 +1,8 @@ use env_logger::Env; use std::{fs::OpenOptions, sync::Once}; -use crate::types::{ProverType, TaskType}; +use crate::types::ProverType; +use scroll_proving_sdk::prover::types::CircuitType; static LOG_INIT: Once = Once::new(); @@ -24,9 +25,18 @@ pub fn log_init(log_file: Option) { }); } -pub fn get_task_types(prover_type: ProverType) -> Vec { +pub fn get_circuit_types(prover_type: ProverType) -> Vec { match prover_type { - ProverType::Chunk => vec![TaskType::Chunk], - ProverType::Batch => vec![TaskType::Batch, TaskType::Bundle], + ProverType::Chunk => vec![CircuitType::Chunk], + ProverType::Batch => vec![CircuitType::Batch, CircuitType::Bundle], + } +} + +pub fn get_prover_type(task_type: CircuitType) -> Option { + match task_type { + CircuitType::Undefined => None, + CircuitType::Chunk => Some(ProverType::Chunk), + CircuitType::Batch => Some(ProverType::Batch), + CircuitType::Bundle => Some(ProverType::Batch), } } diff --git a/prover/src/zk_circuits_handler.rs b/prover/src/zk_circuits_handler.rs index d1a8eb38c5..6436473c34 100644 --- a/prover/src/zk_circuits_handler.rs +++ b/prover/src/zk_circuits_handler.rs @@ -2,16 +2,16 @@ mod common; mod darwin; mod darwin_v2; -use super::geth_client::GethClient; -use crate::{ - config::{AssetsDirEnvConfig, Config}, - types::{ProverType, Task, TaskType}, - utils::get_task_types, -}; +use crate::{config::AssetsDirEnvConfig, types::ProverType, utils::get_circuit_types}; use anyhow::{bail, Result}; +use async_trait::async_trait; use darwin::DarwinHandler; use darwin_v2::DarwinV2Handler; -use std::{cell::RefCell, collections::HashMap, rc::Rc}; +use scroll_proving_sdk::{ + config::LocalProverConfig, + prover::{proving_service::ProveRequest, CircuitType}, +}; +use std::{collections::HashMap, sync::Arc}; type HardForkName = String; @@ -21,40 +21,37 @@ pub mod utils { } } -pub trait CircuitsHandler { - fn get_vk(&self, task_type: TaskType) -> Option>; +#[async_trait] +pub trait CircuitsHandler: Send + Sync { + async fn get_vk(&self, task_type: CircuitType) -> Option>; - fn get_proof_data(&self, task_type: TaskType, task: &Task) -> Result; + async fn get_proof_data(&self, prove_request: ProveRequest) -> Result; } -type CircuitsHandlerBuilder = fn( - prover_type: ProverType, - config: &Config, - geth_client: Option>>, -) -> Result>; +type CircuitsHandlerBuilder = + fn(prover_type: ProverType, config: &LocalProverConfig) -> Result>; -pub struct CircuitsHandlerProvider<'a> { - prover_type: ProverType, - config: &'a Config, - geth_client: Option>>, +pub struct CircuitsHandlerProvider { + config: LocalProverConfig, circuits_handler_builder_map: HashMap, current_fork_name: Option, - current_circuit: Option>>, + current_prover_type: Option, + current_circuit: Option>>, } -impl<'a> CircuitsHandlerProvider<'a> { - pub fn new( - prover_type: ProverType, - config: &'a Config, - geth_client: Option>>, - ) -> Result { +impl CircuitsHandlerProvider { + pub fn new(config: LocalProverConfig) -> Result { let mut m: HashMap = HashMap::new(); + if let Err(e) = AssetsDirEnvConfig::init() { + log::error!("AssetsDirEnvConfig init failed: {:#}", e); + std::process::exit(-2); + } + fn handler_builder( prover_type: ProverType, - config: &Config, - geth_client: Option>>, + config: &LocalProverConfig, ) -> Result> { log::info!( "now init zk circuits handler, hard_fork_name: {}", @@ -65,7 +62,6 @@ impl<'a> CircuitsHandlerProvider<'a> { prover_type, &config.low_version_circuit.params_path, &config.low_version_circuit.assets_path, - geth_client, ) .map(|handler| Box::new(handler) as Box) } @@ -76,8 +72,7 @@ impl<'a> CircuitsHandlerProvider<'a> { fn next_handler_builder( prover_type: ProverType, - config: &Config, - geth_client: Option>>, + config: &LocalProverConfig, ) -> Result> { log::info!( "now init zk circuits handler, hard_fork_name: {}", @@ -88,7 +83,6 @@ impl<'a> CircuitsHandlerProvider<'a> { prover_type, &config.high_version_circuit.params_path, &config.high_version_circuit.assets_path, - geth_client, ) .map(|handler| Box::new(handler) as Box) } @@ -99,11 +93,10 @@ impl<'a> CircuitsHandlerProvider<'a> { ); let provider = CircuitsHandlerProvider { - prover_type, config, - geth_client, circuits_handler_builder_map: m, current_fork_name: None, + current_prover_type: None, current_circuit: None, }; @@ -113,7 +106,8 @@ impl<'a> CircuitsHandlerProvider<'a> { pub fn get_circuits_handler( &mut self, hard_fork_name: &String, - ) -> Result>> { + prover_type: ProverType, + ) -> Result>> { match &self.current_fork_name { Some(fork_name) if fork_name == hard_fork_name => { log::info!("get circuits handler from cache"); @@ -129,10 +123,11 @@ impl<'a> CircuitsHandlerProvider<'a> { ); if let Some(builder) = self.circuits_handler_builder_map.get(hard_fork_name) { log::info!("building circuits handler for {hard_fork_name}"); - let handler = builder(self.prover_type, self.config, self.geth_client.clone()) + let handler = builder(prover_type, &self.config) .expect("failed to build circuits handler"); self.current_fork_name = Some(hard_fork_name.clone()); - let rc_handler = Rc::new(handler); + self.current_prover_type = Some(prover_type); + let rc_handler = Arc::new(handler); self.current_circuit = Some(rc_handler.clone()); Ok(rc_handler) } else { @@ -142,33 +137,32 @@ impl<'a> CircuitsHandlerProvider<'a> { } } - pub fn init_vks( + pub async fn init_vks( &self, - prover_type: ProverType, - config: &'a Config, - geth_client: Option>>, + config: &LocalProverConfig, + prover_types: Vec, ) -> Vec { - self.circuits_handler_builder_map - .iter() - .flat_map(|(hard_fork_name, build)| { - let handler = build(prover_type, config, geth_client.clone()) - .expect("failed to build circuits handler"); - - get_task_types(prover_type) - .into_iter() - .map(|task_type| { - let vk = handler - .get_vk(task_type) - .map_or("".to_string(), utils::encode_vk); - log::info!( - "vk for {hard_fork_name}, is {vk}, task_type: {:?}", - task_type - ); - vk - }) - .filter(|vk| !vk.is_empty()) - .collect::>() - }) - .collect::>() + let mut vks: Vec = Vec::new(); + for (hard_fork_name, build) in self.circuits_handler_builder_map.iter() { + for prover_type in prover_types.iter() { + let handler = + build(*prover_type, config).expect("failed to build circuits handler"); + + for task_type in get_circuit_types(*prover_type).into_iter() { + let vk = handler + .get_vk(task_type) + .await + .map_or("".to_string(), utils::encode_vk); + log::info!( + "vk for {hard_fork_name}, is {vk}, task_type: {:?}", + task_type + ); + if !vk.is_empty() { + vks.push(vk) + } + } + } + } + vks } } diff --git a/prover/src/zk_circuits_handler/darwin.rs b/prover/src/zk_circuits_handler/darwin.rs index 96618f9508..e2ae353905 100644 --- a/prover/src/zk_circuits_handler/darwin.rs +++ b/prover/src/zk_circuits_handler/darwin.rs @@ -1,14 +1,14 @@ use super::{common::*, CircuitsHandler}; -use crate::{ - geth_client::GethClient, - types::{ProverType, TaskType}, -}; +use crate::types::ProverType; use anyhow::{bail, Context, Ok, Result}; +use async_trait::async_trait; use once_cell::sync::Lazy; +use scroll_proving_sdk::prover::{proving_service::ProveRequest, CircuitType}; use serde::Deserialize; +use tokio::sync::RwLock; -use crate::types::{CommonHash, Task}; -use std::{cell::RefCell, cmp::Ordering, env, rc::Rc}; +use crate::types::CommonHash; +use std::env; use prover_darwin::{ aggregator::Prover as BatchProver, @@ -37,16 +37,10 @@ pub struct ChunkTaskDetail { pub block_hashes: Vec, } -fn get_block_number(block_trace: &BlockTrace) -> Option { - block_trace.header.number.map(|n| n.as_u64()) -} - #[derive(Default)] pub struct DarwinHandler { - chunk_prover: Option>>, - batch_prover: Option>>, - - geth_client: Option>>, + chunk_prover: Option>>, + batch_prover: Option>>, } impl DarwinHandler { @@ -54,7 +48,6 @@ impl DarwinHandler { prover_types: Vec, params_dir: &str, assets_dir: &str, - geth_client: Option>>, ) -> Result { let class_name = std::intrinsics::type_name::(); let prover_types_set = prover_types @@ -63,7 +56,6 @@ impl DarwinHandler { let mut handler = Self { batch_prover: None, chunk_prover: None, - geth_client, }; let degrees: Vec = get_degrees(&prover_types_set, |prover_type| match prover_type { ProverType::Chunk => ZKEVM_DEGREES.clone(), @@ -81,12 +73,12 @@ impl DarwinHandler { for prover_type in prover_types_set { match prover_type { ProverType::Chunk => { - handler.chunk_prover = Some(RefCell::new(ChunkProver::from_params_and_assets( + handler.chunk_prover = Some(RwLock::new(ChunkProver::from_params_and_assets( params_map, assets_dir, ))); } ProverType::Batch => { - handler.batch_prover = Some(RefCell::new(BatchProver::from_params_and_assets( + handler.batch_prover = Some(RwLock::new(BatchProver::from_params_and_assets( params_map, assets_dir, ))) } @@ -95,22 +87,18 @@ impl DarwinHandler { Ok(handler) } - pub fn new( - prover_type: ProverType, - params_dir: &str, - assets_dir: &str, - geth_client: Option>>, - ) -> Result { - Self::new_multi(vec![prover_type], params_dir, assets_dir, geth_client) + pub fn new(prover_type: ProverType, params_dir: &str, assets_dir: &str) -> Result { + Self::new_multi(vec![prover_type], params_dir, assets_dir) } - fn gen_chunk_proof_raw(&self, chunk_trace: Vec) -> Result { + async fn gen_chunk_proof_raw(&self, chunk_trace: Vec) -> Result { if let Some(prover) = self.chunk_prover.as_ref() { let chunk = ChunkProvingTask::from(chunk_trace); let chunk_proof = prover - .borrow_mut() + .write() + .await .gen_chunk_proof(chunk, None, None, self.get_output_dir())?; return Ok(chunk_proof); @@ -118,13 +106,13 @@ impl DarwinHandler { unreachable!("please check errors in proof_type logic") } - fn gen_chunk_proof(&self, task: &crate::types::Task) -> Result { - let chunk_trace = self.gen_chunk_traces(task)?; - let chunk_proof = self.gen_chunk_proof_raw(chunk_trace)?; + async fn gen_chunk_proof(&self, prove_request: ProveRequest) -> Result { + let chunk_traces: Vec = serde_json::from_str(&prove_request.input)?; + let chunk_proof = self.gen_chunk_proof_raw(chunk_traces).await?; Ok(serde_json::to_string(&chunk_proof)?) } - fn gen_batch_proof_raw(&self, batch_task_detail: BatchTaskDetail) -> Result { + async fn gen_batch_proof_raw(&self, batch_task_detail: BatchTaskDetail) -> Result { if let Some(prover) = self.batch_prover.as_ref() { let chunk_hashes_proofs: Vec<(ChunkInfo, ChunkProof)> = batch_task_detail .chunk_infos @@ -136,13 +124,13 @@ impl DarwinHandler { let chunk_proofs: Vec = chunk_hashes_proofs.iter().map(|t| t.1.clone()).collect(); - let is_valid = prover.borrow_mut().check_protocol_of_chunks(&chunk_proofs); + let is_valid = prover.read().await.check_protocol_of_chunks(&chunk_proofs); if !is_valid { bail!("non-match chunk protocol") } check_chunk_hashes("", &chunk_hashes_proofs).context("failed to check chunk info")?; - let batch_proof = prover.borrow_mut().gen_batch_proof( + let batch_proof = prover.write().await.gen_batch_proof( batch_task_detail.batch_proving_task, None, self.get_output_dir(), @@ -153,17 +141,18 @@ impl DarwinHandler { unreachable!("please check errors in proof_type logic") } - fn gen_batch_proof(&self, task: &crate::types::Task) -> Result { - log::info!("[circuit] gen_batch_proof for task {}", task.id); - - let batch_task_detail: BatchTaskDetail = serde_json::from_str(&task.task_data)?; - let batch_proof = self.gen_batch_proof_raw(batch_task_detail)?; + async fn gen_batch_proof(&self, prove_request: ProveRequest) -> Result { + let batch_task_detail: BatchTaskDetail = serde_json::from_str(&prove_request.input)?; + let batch_proof = self.gen_batch_proof_raw(batch_task_detail).await?; Ok(serde_json::to_string(&batch_proof)?) } - fn gen_bundle_proof_raw(&self, bundle_task_detail: BundleTaskDetail) -> Result { + async fn gen_bundle_proof_raw( + &self, + bundle_task_detail: BundleTaskDetail, + ) -> Result { if let Some(prover) = self.batch_prover.as_ref() { - let bundle_proof = prover.borrow_mut().gen_bundle_proof( + let bundle_proof = prover.write().await.gen_bundle_proof( bundle_task_detail, None, self.get_output_dir(), @@ -174,100 +163,45 @@ impl DarwinHandler { unreachable!("please check errors in proof_type logic") } - fn gen_bundle_proof(&self, task: &crate::types::Task) -> Result { - log::info!("[circuit] gen_bundle_proof for task {}", task.id); - let bundle_task_detail: BundleTaskDetail = serde_json::from_str(&task.task_data)?; - let bundle_proof = self.gen_bundle_proof_raw(bundle_task_detail)?; + async fn gen_bundle_proof(&self, prove_request: ProveRequest) -> Result { + let bundle_task_detail: BundleTaskDetail = serde_json::from_str(&prove_request.input)?; + let bundle_proof = self.gen_bundle_proof_raw(bundle_task_detail).await?; Ok(serde_json::to_string(&bundle_proof)?) } fn get_output_dir(&self) -> Option<&str> { OUTPUT_DIR.as_deref() } - - fn gen_chunk_traces(&self, task: &Task) -> Result> { - let chunk_task_detail: ChunkTaskDetail = serde_json::from_str(&task.task_data)?; - self.get_sorted_traces_by_hashes(&chunk_task_detail.block_hashes) - } - - fn get_sorted_traces_by_hashes(&self, block_hashes: &[CommonHash]) -> Result> { - if block_hashes.is_empty() { - log::error!("[prover] failed to get sorted traces: block_hashes are empty"); - bail!("block_hashes are empty") - } - - let mut block_traces = Vec::new(); - for hash in block_hashes.iter() { - let trace = self - .geth_client - .as_ref() - .unwrap() - .borrow_mut() - .get_block_trace_by_hash(hash)?; - block_traces.push(trace); - } - - block_traces.sort_by(|a, b| { - if get_block_number(a).is_none() { - Ordering::Less - } else if get_block_number(b).is_none() { - Ordering::Greater - } else { - get_block_number(a) - .unwrap() - .cmp(&get_block_number(b).unwrap()) - } - }); - - let block_numbers: Vec = block_traces - .iter() - .map(|trace| get_block_number(trace).unwrap_or(0)) - .collect(); - let mut i = 0; - while i < block_numbers.len() - 1 { - if block_numbers[i] + 1 != block_numbers[i + 1] { - log::error!( - "[prover] block numbers are not continuous, got {} and {}", - block_numbers[i], - block_numbers[i + 1] - ); - bail!( - "block numbers are not continuous, got {} and {}", - block_numbers[i], - block_numbers[i + 1] - ) - } - i += 1; - } - - Ok(block_traces) - } } +#[async_trait] impl CircuitsHandler for DarwinHandler { - fn get_vk(&self, task_type: TaskType) -> Option> { + async fn get_vk(&self, task_type: CircuitType) -> Option> { match task_type { - TaskType::Chunk => self - .chunk_prover - .as_ref() - .and_then(|prover| prover.borrow().get_vk()), - TaskType::Batch => self + CircuitType::Chunk => self.chunk_prover.as_ref().unwrap().read().await.get_vk(), + CircuitType::Batch => self .batch_prover .as_ref() - .and_then(|prover| prover.borrow().get_batch_vk()), - TaskType::Bundle => self + .unwrap() + .read() + .await + .get_batch_vk(), + CircuitType::Bundle => self .batch_prover .as_ref() - .and_then(|prover| prover.borrow().get_bundle_vk()), + .unwrap() + .read() + .await + .get_bundle_vk(), _ => unreachable!(), } } - fn get_proof_data(&self, task_type: TaskType, task: &crate::types::Task) -> Result { - match task_type { - TaskType::Chunk => self.gen_chunk_proof(task), - TaskType::Batch => self.gen_batch_proof(task), - TaskType::Bundle => self.gen_bundle_proof(task), + async fn get_proof_data(&self, prove_request: ProveRequest) -> Result { + match prove_request.circuit_type { + CircuitType::Chunk => self.gen_chunk_proof(prove_request).await, + CircuitType::Batch => self.gen_batch_proof(prove_request).await, + CircuitType::Bundle => self.gen_bundle_proof(prove_request).await, _ => unreachable!(), } } @@ -312,19 +246,18 @@ mod tests { assert!(result); } - #[test] - fn test_circuits() -> Result<()> { + #[tokio::test] + async fn test_circuits() -> Result<()> { let bi_handler = DarwinHandler::new_multi( vec![ProverType::Chunk, ProverType::Batch], &PARAMS_PATH, &ASSETS_PATH, - None, )?; let chunk_handler = bi_handler; - let chunk_vk = chunk_handler.get_vk(TaskType::Chunk).unwrap(); + let chunk_vk = chunk_handler.get_vk(CircuitType::Chunk).await.unwrap(); - check_vk(TaskType::Chunk, chunk_vk, "chunk vk must be available"); + check_vk(CircuitType::Chunk, chunk_vk, "chunk vk must be available"); let chunk_dir_paths = get_chunk_dir_paths()?; log::info!("chunk_dir_paths, {:?}", chunk_dir_paths); let mut chunk_infos = vec![]; @@ -338,18 +271,18 @@ mod tests { chunk_infos.push(chunk_info); log::info!("start to prove {chunk_id}"); - let chunk_proof = chunk_handler.gen_chunk_proof_raw(chunk_trace)?; + let chunk_proof = chunk_handler.gen_chunk_proof_raw(chunk_trace).await?; let proof_data = serde_json::to_string(&chunk_proof)?; dump_proof(chunk_id, proof_data)?; chunk_proofs.push(chunk_proof); } let batch_handler = chunk_handler; - let batch_vk = batch_handler.get_vk(TaskType::Batch).unwrap(); - check_vk(TaskType::Batch, batch_vk, "batch vk must be available"); + let batch_vk = batch_handler.get_vk(CircuitType::Batch).await.unwrap(); + check_vk(CircuitType::Batch, batch_vk, "batch vk must be available"); let batch_task_detail = make_batch_task_detail(chunk_infos, chunk_proofs); log::info!("start to prove batch"); - let batch_proof = batch_handler.gen_batch_proof_raw(batch_task_detail)?; + let batch_proof = batch_handler.gen_batch_proof_raw(batch_task_detail).await?; let proof_data = serde_json::to_string(&batch_proof)?; dump_proof("batch_proof".to_string(), proof_data)?; @@ -369,19 +302,19 @@ mod tests { // } } - fn check_vk(proof_type: TaskType, vk: Vec, info: &str) { + fn check_vk(proof_type: CircuitType, vk: Vec, info: &str) { log::info!("check_vk, {:?}", proof_type); let vk_from_file = read_vk(proof_type).unwrap(); assert_eq!(vk_from_file, encode_vk(vk), "{info}") } - fn read_vk(proof_type: TaskType) -> Result { + fn read_vk(proof_type: CircuitType) -> Result { log::info!("read_vk, {:?}", proof_type); let vk_file = match proof_type { - TaskType::Chunk => CHUNK_VK_PATH.clone(), - TaskType::Batch => BATCH_VK_PATH.clone(), - TaskType::Bundle => todo!(), - TaskType::Undefined => unreachable!(), + CircuitType::Chunk => CHUNK_VK_PATH.clone(), + CircuitType::Batch => BATCH_VK_PATH.clone(), + CircuitType::Bundle => todo!(), + CircuitType::Undefined => unreachable!(), }; let data = std::fs::read(vk_file)?; diff --git a/prover/src/zk_circuits_handler/darwin_v2.rs b/prover/src/zk_circuits_handler/darwin_v2.rs index fce3871d03..a81776faf8 100644 --- a/prover/src/zk_circuits_handler/darwin_v2.rs +++ b/prover/src/zk_circuits_handler/darwin_v2.rs @@ -1,14 +1,14 @@ use super::{common::*, CircuitsHandler}; -use crate::{ - geth_client::GethClient, - types::{ProverType, TaskType}, -}; +use crate::types::ProverType; use anyhow::{bail, Context, Ok, Result}; +use async_trait::async_trait; use once_cell::sync::Lazy; +use scroll_proving_sdk::prover::{proving_service::ProveRequest, CircuitType}; use serde::Deserialize; +use tokio::sync::RwLock; -use crate::types::{CommonHash, Task}; -use std::{cell::RefCell, cmp::Ordering, env, rc::Rc}; +use crate::types::CommonHash; +use std::env; use prover_darwin_v2::{ aggregator::Prover as BatchProver, @@ -37,16 +37,10 @@ pub struct ChunkTaskDetail { pub block_hashes: Vec, } -fn get_block_number(block_trace: &BlockTrace) -> Option { - block_trace.header.number.map(|n| n.as_u64()) -} - #[derive(Default)] pub struct DarwinV2Handler { - chunk_prover: Option>>, - batch_prover: Option>>, - - geth_client: Option>>, + chunk_prover: Option>>, + batch_prover: Option>>, } impl DarwinV2Handler { @@ -54,7 +48,6 @@ impl DarwinV2Handler { prover_types: Vec, params_dir: &str, assets_dir: &str, - geth_client: Option>>, ) -> Result { let class_name = std::intrinsics::type_name::(); let prover_types_set = prover_types @@ -63,7 +56,6 @@ impl DarwinV2Handler { let mut handler = Self { batch_prover: None, chunk_prover: None, - geth_client, }; let degrees: Vec = get_degrees(&prover_types_set, |prover_type| match prover_type { ProverType::Chunk => ZKEVM_DEGREES.clone(), @@ -81,12 +73,12 @@ impl DarwinV2Handler { for prover_type in prover_types_set { match prover_type { ProverType::Chunk => { - handler.chunk_prover = Some(RefCell::new(ChunkProver::from_params_and_assets( + handler.chunk_prover = Some(RwLock::new(ChunkProver::from_params_and_assets( params_map, assets_dir, ))); } ProverType::Batch => { - handler.batch_prover = Some(RefCell::new(BatchProver::from_params_and_assets( + handler.batch_prover = Some(RwLock::new(BatchProver::from_params_and_assets( params_map, assets_dir, ))) } @@ -95,22 +87,18 @@ impl DarwinV2Handler { Ok(handler) } - pub fn new( - prover_type: ProverType, - params_dir: &str, - assets_dir: &str, - geth_client: Option>>, - ) -> Result { - Self::new_multi(vec![prover_type], params_dir, assets_dir, geth_client) + pub fn new(prover_type: ProverType, params_dir: &str, assets_dir: &str) -> Result { + Self::new_multi(vec![prover_type], params_dir, assets_dir) } - fn gen_chunk_proof_raw(&self, chunk_trace: Vec) -> Result { + async fn gen_chunk_proof_raw(&self, chunk_trace: Vec) -> Result { if let Some(prover) = self.chunk_prover.as_ref() { let chunk = ChunkProvingTask::from(chunk_trace); let chunk_proof = prover - .borrow_mut() + .write() + .await .gen_chunk_proof(chunk, None, None, self.get_output_dir())?; return Ok(chunk_proof); @@ -118,13 +106,13 @@ impl DarwinV2Handler { unreachable!("please check errors in proof_type logic") } - fn gen_chunk_proof(&self, task: &crate::types::Task) -> Result { - let chunk_trace = self.gen_chunk_traces(task)?; - let chunk_proof = self.gen_chunk_proof_raw(chunk_trace)?; + async fn gen_chunk_proof(&self, prove_request: ProveRequest) -> Result { + let chunk_traces: Vec = serde_json::from_str(&prove_request.input)?; + let chunk_proof = self.gen_chunk_proof_raw(chunk_traces).await?; Ok(serde_json::to_string(&chunk_proof)?) } - fn gen_batch_proof_raw(&self, batch_task_detail: BatchTaskDetail) -> Result { + async fn gen_batch_proof_raw(&self, batch_task_detail: BatchTaskDetail) -> Result { if let Some(prover) = self.batch_prover.as_ref() { let chunk_hashes_proofs: Vec<(ChunkInfo, ChunkProof)> = batch_task_detail .chunk_infos @@ -136,13 +124,13 @@ impl DarwinV2Handler { let chunk_proofs: Vec = chunk_hashes_proofs.iter().map(|t| t.1.clone()).collect(); - let is_valid = prover.borrow_mut().check_protocol_of_chunks(&chunk_proofs); + let is_valid = prover.write().await.check_protocol_of_chunks(&chunk_proofs); if !is_valid { bail!("non-match chunk protocol") } check_chunk_hashes("", &chunk_hashes_proofs).context("failed to check chunk info")?; - let batch_proof = prover.borrow_mut().gen_batch_proof( + let batch_proof = prover.write().await.gen_batch_proof( batch_task_detail.batch_proving_task, None, self.get_output_dir(), @@ -153,17 +141,18 @@ impl DarwinV2Handler { unreachable!("please check errors in proof_type logic") } - fn gen_batch_proof(&self, task: &crate::types::Task) -> Result { - log::info!("[circuit] gen_batch_proof for task {}", task.id); - - let batch_task_detail: BatchTaskDetail = serde_json::from_str(&task.task_data)?; - let batch_proof = self.gen_batch_proof_raw(batch_task_detail)?; + async fn gen_batch_proof(&self, prove_request: ProveRequest) -> Result { + let batch_task_detail: BatchTaskDetail = serde_json::from_str(&prove_request.input)?; + let batch_proof = self.gen_batch_proof_raw(batch_task_detail).await?; Ok(serde_json::to_string(&batch_proof)?) } - fn gen_bundle_proof_raw(&self, bundle_task_detail: BundleTaskDetail) -> Result { + async fn gen_bundle_proof_raw( + &self, + bundle_task_detail: BundleTaskDetail, + ) -> Result { if let Some(prover) = self.batch_prover.as_ref() { - let bundle_proof = prover.borrow_mut().gen_bundle_proof( + let bundle_proof = prover.write().await.gen_bundle_proof( bundle_task_detail, None, self.get_output_dir(), @@ -174,100 +163,45 @@ impl DarwinV2Handler { unreachable!("please check errors in proof_type logic") } - fn gen_bundle_proof(&self, task: &crate::types::Task) -> Result { - log::info!("[circuit] gen_bundle_proof for task {}", task.id); - let bundle_task_detail: BundleTaskDetail = serde_json::from_str(&task.task_data)?; - let bundle_proof = self.gen_bundle_proof_raw(bundle_task_detail)?; + async fn gen_bundle_proof(&self, prove_request: ProveRequest) -> Result { + let bundle_task_detail: BundleTaskDetail = serde_json::from_str(&prove_request.input)?; + let bundle_proof = self.gen_bundle_proof_raw(bundle_task_detail).await?; Ok(serde_json::to_string(&bundle_proof)?) } fn get_output_dir(&self) -> Option<&str> { OUTPUT_DIR.as_deref() } - - fn gen_chunk_traces(&self, task: &Task) -> Result> { - let chunk_task_detail: ChunkTaskDetail = serde_json::from_str(&task.task_data)?; - self.get_sorted_traces_by_hashes(&chunk_task_detail.block_hashes) - } - - fn get_sorted_traces_by_hashes(&self, block_hashes: &[CommonHash]) -> Result> { - if block_hashes.is_empty() { - log::error!("[prover] failed to get sorted traces: block_hashes are empty"); - bail!("block_hashes are empty") - } - - let mut block_traces = Vec::new(); - for hash in block_hashes.iter() { - let trace = self - .geth_client - .as_ref() - .unwrap() - .borrow_mut() - .get_block_trace_by_hash(hash)?; - block_traces.push(trace); - } - - block_traces.sort_by(|a, b| { - if get_block_number(a).is_none() { - Ordering::Less - } else if get_block_number(b).is_none() { - Ordering::Greater - } else { - get_block_number(a) - .unwrap() - .cmp(&get_block_number(b).unwrap()) - } - }); - - let block_numbers: Vec = block_traces - .iter() - .map(|trace| get_block_number(trace).unwrap_or(0)) - .collect(); - let mut i = 0; - while i < block_numbers.len() - 1 { - if block_numbers[i] + 1 != block_numbers[i + 1] { - log::error!( - "[prover] block numbers are not continuous, got {} and {}", - block_numbers[i], - block_numbers[i + 1] - ); - bail!( - "block numbers are not continuous, got {} and {}", - block_numbers[i], - block_numbers[i + 1] - ) - } - i += 1; - } - - Ok(block_traces) - } } +#[async_trait] impl CircuitsHandler for DarwinV2Handler { - fn get_vk(&self, task_type: TaskType) -> Option> { + async fn get_vk(&self, task_type: CircuitType) -> Option> { match task_type { - TaskType::Chunk => self - .chunk_prover - .as_ref() - .and_then(|prover| prover.borrow().get_vk()), - TaskType::Batch => self + CircuitType::Chunk => self.chunk_prover.as_ref().unwrap().read().await.get_vk(), + CircuitType::Batch => self .batch_prover .as_ref() - .and_then(|prover| prover.borrow().get_batch_vk()), - TaskType::Bundle => self + .unwrap() + .read() + .await + .get_batch_vk(), + CircuitType::Bundle => self .batch_prover .as_ref() - .and_then(|prover| prover.borrow().get_bundle_vk()), + .unwrap() + .read() + .await + .get_bundle_vk(), _ => unreachable!(), } } - fn get_proof_data(&self, task_type: TaskType, task: &crate::types::Task) -> Result { - match task_type { - TaskType::Chunk => self.gen_chunk_proof(task), - TaskType::Batch => self.gen_batch_proof(task), - TaskType::Bundle => self.gen_bundle_proof(task), + async fn get_proof_data(&self, prove_request: ProveRequest) -> Result { + match prove_request.circuit_type { + CircuitType::Chunk => self.gen_chunk_proof(prove_request).await, + CircuitType::Batch => self.gen_batch_proof(prove_request).await, + CircuitType::Bundle => self.gen_bundle_proof(prove_request).await, _ => unreachable!(), } } @@ -316,19 +250,18 @@ mod tests { assert!(result); } - #[test] - fn test_circuits() -> Result<()> { + #[tokio::test] + async fn test_circuits() -> Result<()> { let bi_handler = DarwinV2Handler::new_multi( vec![ProverType::Chunk, ProverType::Batch], &PARAMS_PATH, &ASSETS_PATH, - None, )?; let chunk_handler = bi_handler; - let chunk_vk = chunk_handler.get_vk(TaskType::Chunk).unwrap(); + let chunk_vk = chunk_handler.get_vk(CircuitType::Chunk).await.unwrap(); - check_vk(TaskType::Chunk, chunk_vk, "chunk vk must be available"); + check_vk(CircuitType::Chunk, chunk_vk, "chunk vk must be available"); let chunk_dir_paths = get_chunk_dir_paths()?; log::info!("chunk_dir_paths, {:?}", chunk_dir_paths); let mut chunk_traces = vec![]; @@ -343,18 +276,18 @@ mod tests { chunk_infos.push(chunk_info); log::info!("start to prove {chunk_id}"); - let chunk_proof = chunk_handler.gen_chunk_proof_raw(chunk_trace)?; + let chunk_proof = chunk_handler.gen_chunk_proof_raw(chunk_trace).await?; let proof_data = serde_json::to_string(&chunk_proof)?; dump_proof(chunk_id, proof_data)?; chunk_proofs.push(chunk_proof); } let batch_handler = chunk_handler; - let batch_vk = batch_handler.get_vk(TaskType::Batch).unwrap(); - check_vk(TaskType::Batch, batch_vk, "batch vk must be available"); + let batch_vk = batch_handler.get_vk(CircuitType::Batch).await.unwrap(); + check_vk(CircuitType::Batch, batch_vk, "batch vk must be available"); let batch_task_detail = make_batch_task_detail(chunk_traces, chunk_proofs, None); log::info!("start to prove batch"); - let batch_proof = batch_handler.gen_batch_proof_raw(batch_task_detail)?; + let batch_proof = batch_handler.gen_batch_proof_raw(batch_task_detail).await?; let proof_data = serde_json::to_string(&batch_proof)?; dump_proof("batch_proof".to_string(), proof_data)?; @@ -427,19 +360,19 @@ mod tests { } } - fn check_vk(proof_type: TaskType, vk: Vec, info: &str) { + fn check_vk(proof_type: CircuitType, vk: Vec, info: &str) { log::info!("check_vk, {:?}", proof_type); let vk_from_file = read_vk(proof_type).unwrap(); assert_eq!(vk_from_file, encode_vk(vk), "{info}") } - fn read_vk(proof_type: TaskType) -> Result { + fn read_vk(proof_type: CircuitType) -> Result { log::info!("read_vk, {:?}", proof_type); let vk_file = match proof_type { - TaskType::Chunk => CHUNK_VK_PATH.clone(), - TaskType::Batch => BATCH_VK_PATH.clone(), - TaskType::Bundle => todo!(), - TaskType::Undefined => unreachable!(), + CircuitType::Chunk => CHUNK_VK_PATH.clone(), + CircuitType::Batch => BATCH_VK_PATH.clone(), + CircuitType::Bundle => todo!(), + CircuitType::Undefined => unreachable!(), }; let data = std::fs::read(vk_file)?; diff --git a/rollup/internal/controller/relayer/l1_relayer.go b/rollup/internal/controller/relayer/l1_relayer.go index 36aeade98b..2c498da502 100644 --- a/rollup/internal/controller/relayer/l1_relayer.go +++ b/rollup/internal/controller/relayer/l1_relayer.go @@ -251,14 +251,14 @@ func (r *Layer1Relayer) shouldUpdateGasOracle(baseFee uint64, blobBaseFee uint64 } expectedBaseFeeDelta := r.lastBaseFee*r.gasPriceDiff/gasPriceDiffPrecision + 1 - if baseFee >= r.minGasPrice && (baseFee >= r.lastBaseFee+expectedBaseFeeDelta || baseFee+expectedBaseFeeDelta <= r.lastBaseFee) { + if baseFee >= r.minGasPrice && math.Abs(float64(baseFee)-float64(r.lastBaseFee)) >= float64(expectedBaseFeeDelta) { return true } expectedBlobBaseFeeDelta := r.lastBlobBaseFee * r.gasPriceDiff / gasPriceDiffPrecision // Plus a minimum of 0.01 gwei, since the blob base fee is usually low, preventing short-time flunctuation. expectedBlobBaseFeeDelta += 10000000 - if blobBaseFee >= r.minGasPrice && (blobBaseFee >= r.lastBlobBaseFee+expectedBlobBaseFeeDelta || blobBaseFee+expectedBlobBaseFeeDelta <= r.lastBlobBaseFee) { + if blobBaseFee >= r.minGasPrice && math.Abs(float64(blobBaseFee)-float64(r.lastBlobBaseFee)) >= float64(expectedBlobBaseFeeDelta) { return true } diff --git a/rollup/internal/controller/relayer/l2_relayer.go b/rollup/internal/controller/relayer/l2_relayer.go index 33f6921a9b..60e866d2e1 100644 --- a/rollup/internal/controller/relayer/l2_relayer.go +++ b/rollup/internal/controller/relayer/l2_relayer.go @@ -344,8 +344,9 @@ func (r *Layer2Relayer) ProcessGasPriceOracle() { expectedDelta = 1 } - // last is undefine or (suggestGasPriceUint64 >= minGasPrice && exceed diff) - if r.lastGasPrice == 0 || (suggestGasPriceUint64 >= r.minGasPrice && (suggestGasPriceUint64 >= r.lastGasPrice+expectedDelta || suggestGasPriceUint64+expectedDelta <= r.lastGasPrice)) { + // last is undefined or (suggestGasPriceUint64 >= minGasPrice && exceed diff) + if r.lastGasPrice == 0 || (suggestGasPriceUint64 >= r.minGasPrice && + (math.Abs(float64(suggestGasPriceUint64)-float64(r.lastGasPrice)) >= float64(expectedDelta))) { data, err := r.l2GasOracleABI.Pack("setL2BaseFee", suggestGasPrice) if err != nil { log.Error("Failed to pack setL2BaseFee", "batch.Hash", batch.Hash, "GasPrice", suggestGasPrice.Uint64(), "err", err) @@ -529,7 +530,7 @@ func (r *Layer2Relayer) ProcessPendingBundles() { } case types.ProvingTaskVerified: - log.Info("Start to roll up zk proof", "bundle hash", bundle.Hash) + log.Info("Start to roll up zk proof", "index", bundle.Index, "bundle hash", bundle.Hash) r.metrics.rollupL2RelayerProcessPendingBundlesFinalizedTotal.Inc() if err := r.finalizeBundle(bundle, true); err != nil { log.Error("failed to finalize bundle with proof", "bundle index", bundle.Index, "start batch index", bundle.StartBatchIndex, "end batch index", bundle.EndBatchIndex, "err", err) @@ -636,8 +637,22 @@ func (r *Layer2Relayer) finalizeBundle(bundle *orm.Bundle, withProof bool) error log.Info("finalizeBundle in layer1", "with proof", withProof, "index", bundle.Index, "start batch index", bundle.StartBatchIndex, "end batch index", bundle.EndBatchIndex, "tx hash", txHash.String()) // Updating rollup status in database. - if err := r.bundleOrm.UpdateFinalizeTxHashAndRollupStatus(r.ctx, bundle.Hash, txHash.String(), types.RollupFinalizing); err != nil { - log.Error("UpdateFinalizeTxHashAndRollupStatus failed", "index", bundle.Index, "bundle hash", bundle.Hash, "tx hash", txHash.String(), "err", err) + err = r.db.Transaction(func(dbTX *gorm.DB) error { + if err = r.batchOrm.UpdateFinalizeTxHashAndRollupStatusByBundleHash(r.ctx, bundle.Hash, txHash.String(), types.RollupFinalizing, dbTX); err != nil { + log.Warn("UpdateFinalizeTxHashAndRollupStatusByBundleHash failed", "index", bundle.Index, "bundle hash", bundle.Hash, "tx hash", txHash.String(), "err", err) + return err + } + + if err = r.bundleOrm.UpdateFinalizeTxHashAndRollupStatus(r.ctx, bundle.Hash, txHash.String(), types.RollupFinalizing, dbTX); err != nil { + log.Warn("UpdateFinalizeTxHashAndRollupStatus failed", "index", bundle.Index, "bundle hash", bundle.Hash, "tx hash", txHash.String(), "err", err) + return err + } + + return nil + }) + + if err != nil { + log.Warn("failed to update rollup status of bundle and batches", "err", err) return err } diff --git a/rollup/internal/orm/batch.go b/rollup/internal/orm/batch.go index 6f909f2962..d965015eb0 100644 --- a/rollup/internal/orm/batch.go +++ b/rollup/internal/orm/batch.go @@ -347,11 +347,11 @@ func (o *Batch) UpdateProvingStatus(ctx context.Context, hash string, status typ switch status { case types.ProvingTaskAssigned: - updateFields["prover_assigned_at"] = time.Now() + updateFields["prover_assigned_at"] = utils.NowUTC() case types.ProvingTaskUnassigned: updateFields["prover_assigned_at"] = nil case types.ProvingTaskVerified: - updateFields["proved_at"] = time.Now() + updateFields["proved_at"] = utils.NowUTC() } db := o.db @@ -419,7 +419,7 @@ func (o *Batch) UpdateFinalizeTxHashAndRollupStatus(ctx context.Context, hash st updateFields["finalize_tx_hash"] = finalizeTxHash updateFields["rollup_status"] = int(status) if status == types.RollupFinalized { - updateFields["finalized_at"] = time.Now() + updateFields["finalized_at"] = utils.NowUTC() } db := o.db.WithContext(ctx) @@ -478,11 +478,11 @@ func (o *Batch) UpdateProvingStatusByBundleHash(ctx context.Context, bundleHash switch status { case types.ProvingTaskAssigned: - updateFields["prover_assigned_at"] = time.Now() + updateFields["prover_assigned_at"] = utils.NowUTC() case types.ProvingTaskUnassigned: updateFields["prover_assigned_at"] = nil case types.ProvingTaskVerified: - updateFields["proved_at"] = time.Now() + updateFields["proved_at"] = utils.NowUTC() } db := o.db diff --git a/rollup/internal/orm/bundle.go b/rollup/internal/orm/bundle.go index 80825dde23..54f7cf9fd3 100644 --- a/rollup/internal/orm/bundle.go +++ b/rollup/internal/orm/bundle.go @@ -194,7 +194,7 @@ func (o *Bundle) UpdateFinalizeTxHashAndRollupStatus(ctx context.Context, hash s updateFields["finalize_tx_hash"] = finalizeTxHash updateFields["rollup_status"] = int(status) if status == types.RollupFinalized { - updateFields["finalized_at"] = time.Now() + updateFields["finalized_at"] = utils.NowUTC() } db := o.db @@ -218,7 +218,7 @@ func (o *Bundle) UpdateProvingStatus(ctx context.Context, hash string, status ty switch status { case types.ProvingTaskVerified: - updateFields["proved_at"] = time.Now() + updateFields["proved_at"] = utils.NowUTC() } db := o.db @@ -241,7 +241,7 @@ func (o *Bundle) UpdateRollupStatus(ctx context.Context, hash string, status typ updateFields := make(map[string]interface{}) updateFields["rollup_status"] = int(status) if status == types.RollupFinalized { - updateFields["finalized_at"] = time.Now() + updateFields["finalized_at"] = utils.NowUTC() } db := o.db.WithContext(ctx) diff --git a/rollup/internal/orm/chunk.go b/rollup/internal/orm/chunk.go index 893290f757..d195fbbb44 100644 --- a/rollup/internal/orm/chunk.go +++ b/rollup/internal/orm/chunk.go @@ -11,8 +11,9 @@ import ( "gorm.io/gorm" "scroll-tech/common/types" + "scroll-tech/common/utils" - "scroll-tech/rollup/internal/utils" + rutils "scroll-tech/rollup/internal/utils" ) // Chunk represents a chunk of blocks in the database. @@ -177,7 +178,7 @@ func (o *Chunk) GetChunksByBatchHash(ctx context.Context, batchHash string) ([]* } // InsertChunk inserts a new chunk into the database. -func (o *Chunk) InsertChunk(ctx context.Context, chunk *encoding.Chunk, codecVersion encoding.CodecVersion, metrics utils.ChunkMetrics, dbTX ...*gorm.DB) (*Chunk, error) { +func (o *Chunk) InsertChunk(ctx context.Context, chunk *encoding.Chunk, codecVersion encoding.CodecVersion, metrics rutils.ChunkMetrics, dbTX ...*gorm.DB) (*Chunk, error) { if chunk == nil || len(chunk.Blocks) == 0 { return nil, errors.New("invalid args") } @@ -202,7 +203,7 @@ func (o *Chunk) InsertChunk(ctx context.Context, chunk *encoding.Chunk, codecVer parentChunkStateRoot = parentChunk.StateRoot } - chunkHash, err := utils.GetChunkHash(chunk, totalL1MessagePoppedBefore, codecVersion) + chunkHash, err := rutils.GetChunkHash(chunk, totalL1MessagePoppedBefore, codecVersion) if err != nil { log.Error("failed to get chunk hash", "err", err) return nil, fmt.Errorf("Chunk.InsertChunk error: %w", err) @@ -261,11 +262,11 @@ func (o *Chunk) UpdateProvingStatus(ctx context.Context, hash string, status typ switch status { case types.ProvingTaskAssigned: - updateFields["prover_assigned_at"] = time.Now() + updateFields["prover_assigned_at"] = utils.NowUTC() case types.ProvingTaskUnassigned: updateFields["prover_assigned_at"] = nil case types.ProvingTaskVerified: - updateFields["proved_at"] = time.Now() + updateFields["proved_at"] = utils.NowUTC() } db := o.db @@ -289,11 +290,11 @@ func (o *Chunk) UpdateProvingStatusByBatchHash(ctx context.Context, batchHash st switch status { case types.ProvingTaskAssigned: - updateFields["prover_assigned_at"] = time.Now() + updateFields["prover_assigned_at"] = utils.NowUTC() case types.ProvingTaskUnassigned: updateFields["prover_assigned_at"] = nil case types.ProvingTaskVerified: - updateFields["proved_at"] = time.Now() + updateFields["proved_at"] = utils.NowUTC() } db := o.db