diff --git a/python/pyproject.toml b/python/pyproject.toml index e64e4a2ad..612e69e83 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -25,7 +25,7 @@ google-cloud-bigquery-storage = "^2.19.1" google-cloud-bigquery = "^3.10.0" prometheus-client = "^0.17.1" twisted = "^22.10.0" -grpclib = "^0.4.5" +grpclib = "^0.4.6" alembic = "^1.11.1" aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "aee306923da1fae533a91b4015e0a58443742d45", subdirectory = "protos/python" } python-json-logger = "^2.0.7" diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 1d5ced4b7..f0b5a9412 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -24,6 +24,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" dependencies = [ "cfg-if", + "const-random", "getrandom", "once_cell", "serde", @@ -40,6 +41,27 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocative" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "082af274fd02beef17b7f0725a49ecafe6c075ef56cac9d6363eb3916a9817ae" +dependencies = [ + "allocative_derive", + "ctor", +] + +[[package]] +name = "allocative_derive" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe233a377643e0fc1a56421d7c90acdec45c291b30345eb9f08e8d0ddce5a4ab" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -311,6 +333,12 @@ version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51" +[[package]] +name = "base64ct" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" + [[package]] name = "bb8" version = "0.8.1" @@ -417,6 +445,19 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +[[package]] +name = "canonical_json" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f89083fd014d71c47a718d7f4ac050864dac8587668dbe90baf9e261064c5710" +dependencies = [ + "hex", + "regex", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "cc" version = "1.0.83" @@ -435,9 +476,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" dependencies = [ "android-tzdata", "iana-time-zone", @@ -445,7 +486,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.48.1", + "windows-targets 0.52.5", ] [[package]] @@ -504,6 +545,32 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "const-oid" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3" + +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + 
"const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom", + "once_cell", + "tiny-keccak", +] + [[package]] name = "cookie" version = "0.16.2" @@ -587,6 +654,16 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" +[[package]] +name = "crypto-bigint" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c6a1d5fa1de37e071642dfa44ec552ca5b299adb128fab16138e24b548fd21" +dependencies = [ + "generic-array", + "subtle", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -597,6 +674,16 @@ dependencies = [ "typenum", ] +[[package]] +name = "ctor" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "debugid" version = "0.8.0" @@ -606,6 +693,17 @@ dependencies = [ "uuid", ] +[[package]] +name = "der" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6919815d73839e7ad218de758883aae3a257ba6759ce7a9992501efbb53d705c" +dependencies = [ + "const-oid", + "crypto-bigint", + "pem-rfc7468", +] + [[package]] name = "deranged" version = "0.3.11" @@ -613,6 +711,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -1095,6 +1194,36 @@ dependencies = [ "tracing", ] +[[package]] +name = "google-cloud-storage" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22c57ca1d971d7c6f852c02eda4e87e88b1247b6ed8be9fa5b2768c68b0f2ca5" +dependencies = [ + "async-stream", + "base64 0.21.2", + "bytes", + "futures-util", + "google-cloud-auth", + "google-cloud-metadata", + "google-cloud-token", + "hex", + "once_cell", + "percent-encoding", + "regex", + "reqwest", + "ring 0.16.20", + "rsa", + "serde", + "serde_json", + "sha2 0.10.8", + "thiserror", + "time", + "tokio", + "tracing", + "url", +] + [[package]] name = "google-cloud-token" version = "0.1.1" @@ -1123,6 +1252,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if", + "crunchy", + "num-traits", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1399,6 +1539,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "io-lifetimes" version = "1.0.11" @@ -1527,6 +1673,9 @@ name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +dependencies = [ + "spin 0.5.2", +] [[package]] name = "libc" @@ -1574,6 +1723,15 @@ version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +[[package]] +name = "lz4_flex" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" +dependencies = [ + "twox-hash", +] + [[package]] name = "matchers" version = "0.1.0" @@ -1734,17 +1892,56 @@ dependencies = [ "winapi", ] +[[package]] +name = "num" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + [[package]] name = "num-bigint" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" +checksum = "c165a9ab64cf766f73521c0dd2cfdff64f488b8f0b3e621face3462d3db536d7" dependencies = [ - "autocfg", "num-integer", "num-traits", ] +[[package]] +name = "num-bigint-dig" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151" +dependencies = [ + "byteorder", + "lazy_static", + "libm", + "num-integer", + "num-iter", + "num-traits", + "rand", + "smallvec", + "zeroize", +] + +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" +dependencies = [ + "num-traits", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -1763,21 +1960,43 @@ dependencies = [ [[package]] name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-iter" version = "0.1.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" dependencies = [ "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint", + "num-integer", "num-traits", ] [[package]] name = "num-traits" -version = "0.2.17" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -1855,6 +2074,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "overload" version = "0.1.1" @@ -1884,6 +2112,46 @@ dependencies = [ "windows-targets 0.48.1", ] +[[package]] +name = "parquet" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c3b5322cc1bbf67f11c079c42be41a55949099b78732f7dba9e15edde40eab" 
+dependencies = [ + "ahash", + "bytes", + "chrono", + "futures", + "half", + "hashbrown 0.14.0", + "lz4_flex", + "num", + "num-bigint", + "paste", + "seq-macro", + "thrift", + "tokio", + "twox-hash", +] + +[[package]] +name = "parquet_derive" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05a70674ad0e9e49f583a03e477c23cc0116cc49a001c52178f00fb25eb0a882" +dependencies = [ + "parquet", + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "pbjson" version = "0.5.1" @@ -1903,6 +2171,15 @@ dependencies = [ "base64 0.13.1", ] +[[package]] +name = "pem-rfc7468" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01de5d978f34aa4b2296576379fcc416034702fd94117c56ffd8a1a767cefb30" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.3.0" @@ -1959,6 +2236,28 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs1" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a78f66c04ccc83dd4486fd46c33896f4e17b24a7a3a6400dedc48ed0ddd72320" +dependencies = [ + "der", + "pkcs8", + "zeroize", +] + +[[package]] +name = "pkcs8" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cabda3fb821068a9a4fab19a683eac3af12edf0f34b94a8be53c4972b8149d0" +dependencies = [ + "der", + "spki", + "zeroize", +] + [[package]] name = "pkg-config" version = "0.3.27" @@ -2065,6 +2364,8 @@ name = "processor" version = "1.0.0" dependencies = [ "ahash", + "allocative", + "allocative_derive", "anyhow", "aptos-moving-average", "aptos-protos", @@ -2072,6 +2373,7 @@ dependencies = [ "bcs", "bigdecimal", "bitflags 2.5.0", + "canonical_json", "chrono", "clap", "diesel", @@ -2083,14 +2385,19 @@ dependencies = [ "futures-util", "google-cloud-googleapis", "google-cloud-pubsub", + "google-cloud-storage", "hex", + "hyper", "itertools 0.12.1", "jemallocator", "kanal", "lazy_static", "native-tls", + "num", "num_cpus", "once_cell", + "parquet", + "parquet_derive", "postgres-native-tls", "prometheus", "prost 0.12.3", @@ -2108,6 +2415,7 @@ dependencies = [ "tracing", "unescape", "url", + "uuid", ] [[package]] @@ -2343,6 +2651,7 @@ dependencies = [ "js-sys", "log", "mime", + "mime_guess", "native-tls", "once_cell", "percent-encoding", @@ -2401,6 +2710,26 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rsa" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cf22754c49613d2b3b119f0e5d46e34a2c628a937e3024b8762de4e7d8c710b" +dependencies = [ + "byteorder", + "digest 0.10.7", + "num-bigint-dig", + "num-integer", + "num-iter", + "num-traits", + "pkcs1", + "pkcs8", + "rand_core", + "smallvec", + "subtle", + "zeroize", +] + [[package]] name = "rstack" version = "0.3.3" @@ -2577,9 +2906,9 @@ checksum = "dc31bd9b61a32c31f9650d18add92aa83a49ba979c143eefd27fe7177b05bd5f" [[package]] name = "ryu" -version = "1.0.14" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe232bdf6be8c8de797b22184ee71118d63780ea42ac85b61d1baa6d3b782ae9" +checksum = 
"f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "schannel" @@ -2645,6 +2974,12 @@ dependencies = [ "libc", ] +[[package]] +name = "seq-macro" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" + [[package]] name = "serde" version = "1.0.193" @@ -2860,12 +3195,28 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spki" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d01ac02a6ccf3e07db148d2be087da624fea0221a16152ed01f0496a6b0a27" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "str_stack" version = "0.1.0" @@ -3012,6 +3363,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "ordered-float", +] + [[package]] name = "time" version = "0.3.36" @@ -3456,6 +3818,16 @@ dependencies = [ "utf-8", ] +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "typenum" version = "1.16.0" @@ -3545,6 +3917,9 @@ name = "uuid" version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +dependencies = [ + "getrandom", +] [[package]] name = "valuable" @@ -3974,9 +4349,9 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "2.0.10+zstd.1.5.6" +version = "2.0.9+zstd.1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c253a4914af5bafc8fa8c86ee400827e83cf6ec01195ec1f1ed8441bf00d65aa" +checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656" dependencies = [ "cc", "pkg-config", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index fc4270a3f..6443c7a8c 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -57,7 +57,6 @@ futures-util = "0.3.21" gcloud-sdk = { version = "0.20.4", features = [ "google-cloud-bigquery-storage-v1", ] } -cloud-storage = { version = "0.11.1", features = ["global-client"] } google-cloud-googleapis = "0.10.0" google-cloud-pubsub = "0.18.0" hex = "0.4.3" @@ -110,3 +109,14 @@ warp = { version = "0.3.5", features = ["tls"] } native-tls = "0.2.11" postgres-native-tls = "0.5.0" tokio-postgres = "0.7.10" + +# Parquet support +parquet = { version = "52.0.0", default-features = false, features = ["async", "lz4"] } +num = "0.4.0" +google-cloud-storage = "0.13.0" +hyper = { version = "0.14.18", features = ["full"] } +parquet_derive = { version = "52.0.0" } +canonical_json = "0.5.0" +allocative = "0.3.3" +allocative_derive = "0.3.3" +uuid = { version 
= "1.8.0", features = ["v4"] } diff --git a/rust/indexer-metrics/src/main.rs b/rust/indexer-metrics/src/main.rs index 9296ab423..7c60c2c5b 100644 --- a/rust/indexer-metrics/src/main.rs +++ b/rust/indexer-metrics/src/main.rs @@ -155,11 +155,14 @@ async fn start_processor_status_fetch(url: String, chain_name: String) { .set(processor.last_success_version as i64); HASURA_API_LATEST_VERSION_TIMESTAMP .with_label_values(&[&processor.processor, &chain_name]) - .set(processor.last_updated.timestamp_micros() as f64 * 1e-6); + .set(processor.last_updated.and_utc().timestamp_micros() as f64 * 1e-6); HASURA_API_LATEST_TRANSACTION_TIMESTAMP .with_label_values(&[&processor.processor, &chain_name]) .set( - processor.last_transaction_timestamp.timestamp_micros() as f64 + processor + .last_transaction_timestamp + .and_utc() + .timestamp_micros() as f64 * 1e-6, ); let latency = system_time_now - processor.last_transaction_timestamp; diff --git a/rust/moving-average/src/lib.rs b/rust/moving-average/src/lib.rs index 847dd06ae..826949de5 100644 --- a/rust/moving-average/src/lib.rs +++ b/rust/moving-average/src/lib.rs @@ -15,7 +15,7 @@ pub struct MovingAverage { impl MovingAverage { pub fn new(window_millis: u64) -> Self { - let now = chrono::Utc::now().naive_utc().timestamp_millis() as u64; + let now = chrono::Utc::now().naive_utc().and_utc().timestamp_millis() as u64; let mut queue = VecDeque::new(); queue.push_back((now, 0)); Self { @@ -26,7 +26,7 @@ impl MovingAverage { } pub fn tick_now(&mut self, value: u64) { - let now = chrono::Utc::now().naive_utc().timestamp_millis() as u64; + let now = chrono::Utc::now().naive_utc().and_utc().timestamp_millis() as u64; self.tick(now, value); } diff --git a/rust/parquet-bq-scripts/move_resources-create.sql b/rust/parquet-bq-scripts/move_resources-create.sql index 055dca9c0..67197dd02 100644 --- a/rust/parquet-bq-scripts/move_resources-create.sql +++ b/rust/parquet-bq-scripts/move_resources-create.sql @@ -10,7 +10,7 @@ CREATE TABLE `{}` fun STRING, is_deleted BOOL, generic_type_params STRING, - data STRING, + data STRING, state_key_hash STRING, bq_inserted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP(), diff --git a/rust/processor/Cargo.toml b/rust/processor/Cargo.toml index 07c923433..92a967057 100644 --- a/rust/processor/Cargo.toml +++ b/rust/processor/Cargo.toml @@ -62,6 +62,18 @@ tokio-postgres = { workspace = true } [target.'cfg(unix)'.dependencies] jemallocator = { workspace = true } +# Parquet support +parquet = { workspace = true } +num = { workspace = true } +google-cloud-storage = { workspace = true } +hyper = { workspace = true } +lazy_static = { workspace = true } +parquet_derive = { workspace = true } +canonical_json = { workspace = true } +allocative = { workspace = true } +allocative_derive = { workspace = true } +uuid = { workspace = true } + [features] libpq = ["diesel/postgres"] # When using the default features we enable the diesel/postgres feature. 
We configure
diff --git a/rust/processor/parser.yaml b/rust/processor/parser.yaml
index 37d1c73b6..25ed06b54 100644
--- a/rust/processor/parser.yaml
+++ b/rust/processor/parser.yaml
@@ -7,4 +7,4 @@ server_config:
     type: default_processor
   postgres_connection_string: postgresql://postgres:@localhost:5432/default_processor
   indexer_grpc_data_service_address: http://127.0.0.1:50051
-  auth_token: AUTH_TOKEN
+  auth_token: AUTH_TOKEN
\ No newline at end of file
diff --git a/rust/processor/src/bq_analytics/gcs_handler.rs b/rust/processor/src/bq_analytics/gcs_handler.rs
new file mode 100644
index 000000000..c038152a6
--- /dev/null
+++ b/rust/processor/src/bq_analytics/gcs_handler.rs
@@ -0,0 +1,121 @@
+use crate::bq_analytics::ParquetProcessorError;
+use anyhow::{anyhow, Result};
+use chrono::{Datelike, Timelike};
+use google_cloud_storage::{
+    client::Client as GCSClient,
+    http::objects::upload::{Media, UploadObjectRequest, UploadType},
+};
+use hyper::Body;
+use std::path::PathBuf;
+use tokio::io::AsyncReadExt; // for read_to_end()
+use tokio::{
+    fs::File as TokioFile,
+    time::{sleep, timeout, Duration},
+};
+use tracing::{debug, error, info};
+const BUCKET_REGULAR_TRAFFIC: &str = "devnet-airflow-continue";
+const MAX_RETRIES: usize = 3;
+const INITIAL_DELAY_MS: u64 = 500;
+const TIMEOUT_SECONDS: u64 = 300;
+pub async fn upload_parquet_to_gcs(
+    client: &GCSClient,
+    file_path: &PathBuf,
+    table_name: &str,
+    bucket_name: &str,
+) -> Result<(), ParquetProcessorError> {
+    let mut file = TokioFile::open(&file_path)
+        .await
+        .map_err(|e| anyhow!("Failed to open file for reading: {}", e))?;
+
+    let mut buffer = Vec::new();
+    file.read_to_end(&mut buffer)
+        .await
+        .map_err(|e| anyhow!("Failed to read file: {}", e))?;
+
+    if buffer.is_empty() {
+        error!("The file is empty and has no data to upload.");
+        return Err(ParquetProcessorError::Other(
+            "The file is empty and has no data to upload.".to_string(),
+        ));
+    }
+
+    let now = chrono::Utc::now();
+    let start_of_month = now
+        .with_day(1)
+        .unwrap()
+        .with_hour(0)
+        .unwrap()
+        .with_minute(0)
+        .unwrap()
+        .with_second(0)
+        .unwrap()
+        .with_nanosecond(0)
+        .unwrap();
+    let highwater_s = start_of_month.timestamp_millis();
+    let highwater_ms = now.timestamp_millis();
+    let counter = 0; // THIS NEEDS TO BE REPLACED OR REIMPLEMENTED WITH ACTUAL LOGIC TO ENSURE FILE UNIQUENESS.
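+    // The object path is laid out as <bucket_root>/<table_name>/<highwater_s>/<highwater_ms>_<counter>.parquet
+    // (see generate_parquet_file_path below); note that highwater_s is the start-of-month
+    // timestamp in milliseconds, despite the `_s` suffix.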
+    let object_name: PathBuf = generate_parquet_file_path(
+        BUCKET_REGULAR_TRAFFIC,
+        table_name,
+        highwater_s,
+        highwater_ms,
+        counter,
+    );
+
+    let file_name = object_name.to_str().unwrap().to_owned();
+    let upload_type: UploadType = UploadType::Simple(Media::new(file_name.clone()));
+
+    let upload_request = UploadObjectRequest {
+        bucket: bucket_name.to_string(),
+        ..Default::default()
+    };
+
+    let mut retry_count = 0;
+    let mut delay = INITIAL_DELAY_MS;
+
+    loop {
+        let data = Body::from(buffer.clone());
+        let upload_result = timeout(
+            Duration::from_secs(TIMEOUT_SECONDS),
+            client.upload_object(&upload_request, data, &upload_type),
+        )
+        .await;
+
+        match upload_result {
+            Ok(Ok(result)) => {
+                info!("File uploaded successfully to GCS: {}", result.name);
+                return Ok(());
+            },
+            Ok(Err(e)) => {
+                error!("Failed to upload file to GCS: {}", e);
+                if retry_count >= MAX_RETRIES {
+                    return Err(ParquetProcessorError::StorageError(e));
+                }
+            },
+            Err(e) => {
+                error!("Upload timed out: {}", e);
+                if retry_count >= MAX_RETRIES {
+                    return Err(ParquetProcessorError::TimeoutError(e));
+                }
+            },
+        }
+
+        retry_count += 1;
+        sleep(Duration::from_millis(delay)).await;
+        delay *= 2;
+        debug!("Retrying upload operation. Retry count: {}", retry_count);
+    }
+}
+
+fn generate_parquet_file_path(
+    gcs_bucket_root: &str,
+    table: &str,
+    highwater_s: i64,
+    highwater_ms: i64,
+    counter: u32,
+) -> PathBuf {
+    PathBuf::from(format!(
+        "{}/{}/{}/{}_{}.parquet",
+        gcs_bucket_root, table, highwater_s, highwater_ms, counter
+    ))
+}
diff --git a/rust/processor/src/bq_analytics/generic_parquet_processor.rs b/rust/processor/src/bq_analytics/generic_parquet_processor.rs
new file mode 100644
index 000000000..f10c21259
--- /dev/null
+++ b/rust/processor/src/bq_analytics/generic_parquet_processor.rs
@@ -0,0 +1,241 @@
+use super::ParquetProcessingResult;
+use crate::{
+    bq_analytics::gcs_handler::upload_parquet_to_gcs,
+    gap_detectors::ProcessingResult,
+    utils::counters::{PARQUET_HANDLER_BUFFER_SIZE, PARQUET_STRUCT_SIZE},
+};
+use ahash::AHashMap;
+use allocative::Allocative;
+use anyhow::{anyhow, Result};
+use google_cloud_storage::client::Client as GCSClient;
+use parquet::{
+    file::{properties::WriterProperties, writer::SerializedFileWriter},
+    record::RecordWriter,
+    schema::types::Type,
+};
+use std::{
+    fs::{remove_file, rename, File},
+    path::PathBuf,
+    sync::Arc,
+};
+use tracing::{debug, error};
+use uuid::Uuid;
+
+#[derive(Debug, Default, Clone)]
+pub struct ParquetDataGeneric<ParquetType> {
+    pub data: Vec<ParquetType>,
+    pub first_txn_version: u64,
+    pub last_txn_version: u64,
+    pub last_transaction_timestamp: Option<aptos_protos::util::timestamp::Timestamp>,
+    pub transaction_version_to_struct_count: AHashMap<i64, i64>,
+}
+
+pub trait NamedTable {
+    const TABLE_NAME: &'static str;
+}
+
+pub trait HasVersion {
+    fn version(&self) -> i64;
+}
+
+pub trait HasParquetSchema {
+    fn schema() -> Arc<Type>;
+}
+
+/// Auto-implement this for all types that implement `Default` and `RecordWriter`
+impl<ParquetType> HasParquetSchema for ParquetType
+where
+    ParquetType: std::fmt::Debug + Default + Sync + Send,
+    for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
+{
+    fn schema() -> Arc<Type> {
+        let example: Self = Default::default();
+        [example].as_slice().schema().unwrap()
+    }
+}
+
+pub struct ParquetHandler<ParquetType>
+where
+    ParquetType: NamedTable + HasVersion + HasParquetSchema + 'static + Allocative,
+    for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
+{
+    pub schema: Arc<Type>,
+    pub writer: SerializedFileWriter<File>,
+    pub buffer: Vec<ParquetType>,
+    pub buffer_size_bytes: usize,
+
+    pub transaction_version_to_struct_count: AHashMap<i64, i64>,
+    pub bucket_name: String,
+    pub gap_detector_sender: kanal::AsyncSender<ProcessingResult>,
+    pub file_path: String,
+}
+
+fn create_new_writer(
+    file_path: &str,
+    schema: Arc<Type>,
+) -> Result<SerializedFileWriter<File>> {
+    let props = WriterProperties::builder()
+        .set_compression(parquet::basic::Compression::LZ4)
+        .build();
+    let props_arc = Arc::new(props);
+    let file: File = File::options()
+        .create(true)
+        .truncate(true)
+        .write(true)
+        .open(file_path)?;
+
+    Ok(SerializedFileWriter::new(
+        file.try_clone()?,
+        schema,
+        props_arc,
+    )?)
+}
+
+impl<ParquetType> ParquetHandler<ParquetType>
+where
+    ParquetType: NamedTable + HasVersion + HasParquetSchema + 'static + Allocative,
+    for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
+{
+    fn create_new_writer(&self) -> Result<SerializedFileWriter<File>> {
+        let file_path = &self.file_path;
+        create_new_writer(file_path, self.schema.clone())
+    }
+
+    fn close_writer(&mut self) -> Result<()> {
+        let mut writer = self.create_new_writer()?;
+        std::mem::swap(&mut self.writer, &mut writer);
+        writer.close()?;
+        Ok(())
+    }
+
+    pub fn new(
+        bucket_name: String,
+        gap_detector_sender: kanal::AsyncSender<ProcessingResult>,
+        schema: Arc<Type>,
+    ) -> Result<Self> {
+        // append a unique id to the file name to avoid concurrent write issues
+        let file_path = format!("{}_{}.parquet", ParquetType::TABLE_NAME, Uuid::new_v4());
+        let writer = create_new_writer(&file_path, schema.clone())?;
+
+        Ok(Self {
+            writer,
+            buffer: Vec::new(),
+            buffer_size_bytes: 0,
+            transaction_version_to_struct_count: AHashMap::new(),
+            bucket_name,
+            gap_detector_sender,
+            schema,
+            file_path,
+        })
+    }
+
+    pub async fn handle(
+        &mut self,
+        gcs_client: &GCSClient,
+        changes: ParquetDataGeneric<ParquetType>,
+        max_buffer_size: usize,
+    ) -> Result<()> {
+        let last_transaction_timestamp = changes.last_transaction_timestamp;
+        let parquet_structs = changes.data;
+        self.transaction_version_to_struct_count
+            .extend(changes.transaction_version_to_struct_count);
+
+        for parquet_struct in parquet_structs {
+            let size_of_struct = allocative::size_of_unique(&parquet_struct);
+            PARQUET_STRUCT_SIZE
+                .with_label_values(&[ParquetType::TABLE_NAME])
+                .set(size_of_struct as i64);
+            self.buffer_size_bytes += size_of_struct;
+            self.buffer.push(parquet_struct);
+        }
+
+        // for now, it's okay to go a little above the buffer size, given that we will keep the max size at 200 MB
+        if self.buffer_size_bytes >= max_buffer_size {
+            let start_version = self.buffer.first().unwrap().version();
+            let end_version = self.buffer.last().unwrap().version();
+
+            let txn_version_to_struct_count = process_struct_count_map(
+                &self.buffer,
+                &mut self.transaction_version_to_struct_count,
+            );
+
+            let new_file_path: PathBuf = PathBuf::from(format!(
+                "{}_{}.parquet",
+                ParquetType::TABLE_NAME,
+                Uuid::new_v4()
+            ));
+            rename(&self.file_path, &new_file_path)?; // renaming avoids concurrent file access issues
+
+            let struct_buffer = std::mem::take(&mut self.buffer);
+
+            let mut row_group_writer = self.writer.next_row_group()?;
+            struct_buffer
+                .as_slice()
+                .write_to_row_group(&mut row_group_writer)
+                .unwrap();
+            row_group_writer.close()?;
+            self.close_writer()?;
+
+            debug!(
+                table_name = ParquetType::TABLE_NAME,
+                start_version = start_version,
+                end_version = end_version,
+                "Max buffer size reached, uploading to GCS."
+            );
+            let upload_result = upload_parquet_to_gcs(
+                gcs_client,
+                &new_file_path,
+                ParquetType::TABLE_NAME,
+                &self.bucket_name,
+            )
+            .await;
+            self.buffer_size_bytes = 0;
+            remove_file(&new_file_path)?;
+
+            return match upload_result {
+                Ok(_) => {
+                    let parquet_processing_result = ParquetProcessingResult {
+                        start_version,
+                        end_version,
+                        last_transaction_timestamp: last_transaction_timestamp.clone(),
+                        txn_version_to_struct_count,
+                    };
+
+                    self.gap_detector_sender
+                        .send(ProcessingResult::ParquetProcessingResult(
+                            parquet_processing_result,
+                        ))
+                        .await
+                        .expect("[Parser] Failed to send versions to gap detector");
+                    Ok(())
+                },
+                Err(e) => {
+                    error!("Failed to upload file to GCS: {}", e);
+                    Err(anyhow!("Failed to upload file to GCS: {}", e))
+                },
+            };
+        }
+
+        PARQUET_HANDLER_BUFFER_SIZE
+            .with_label_values(&[ParquetType::TABLE_NAME])
+            .set(self.buffer.len() as i64);
+        Ok(())
+    }
+}
+
+fn process_struct_count_map<ParquetType: HasVersion>(
+    buffer: &[ParquetType],
+    txn_version_to_struct_count: &mut AHashMap<i64, i64>,
+) -> AHashMap<i64, i64> {
+    let mut txn_version_to_struct_count_for_gap_detector = AHashMap::new();
+
+    for item in buffer.iter() {
+        let version = item.version();
+
+        if let Some(count) = txn_version_to_struct_count.get(&(version)) {
+            txn_version_to_struct_count_for_gap_detector.insert(version, *count);
+            txn_version_to_struct_count.remove(&(version));
+        }
+    }
+    txn_version_to_struct_count_for_gap_detector
+}
diff --git a/rust/processor/src/bq_analytics/mod.rs b/rust/processor/src/bq_analytics/mod.rs
new file mode 100644
index 000000000..2bb132fca
--- /dev/null
+++ b/rust/processor/src/bq_analytics/mod.rs
@@ -0,0 +1,68 @@
+pub mod gcs_handler;
+pub mod generic_parquet_processor;
+pub mod parquet_handler;
+
+use ahash::AHashMap;
+use google_cloud_storage::http::Error as StorageError;
+use serde::{Deserialize, Serialize};
+use std::fmt::{Debug, Display, Formatter, Result as FormatResult};
+use tokio::io;
+
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct ParquetProcessingResult {
+    pub start_version: i64,
+    pub end_version: i64,
+    pub last_transaction_timestamp: Option<aptos_protos::util::timestamp::Timestamp>,
+    pub txn_version_to_struct_count: AHashMap<i64, i64>,
+}
+
+#[derive(Debug)]
+pub enum ParquetProcessorError {
+    ParquetError(parquet::errors::ParquetError),
+    StorageError(StorageError),
+    TimeoutError(tokio::time::error::Elapsed),
+    IoError(io::Error),
+    Other(String),
+}
+
+impl std::error::Error for ParquetProcessorError {
+    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+        match *self {
+            ParquetProcessorError::ParquetError(ref err) => Some(err),
+            ParquetProcessorError::StorageError(ref err) => Some(err),
+            ParquetProcessorError::TimeoutError(ref err) => Some(err),
+            ParquetProcessorError::IoError(ref err) => Some(err),
+            ParquetProcessorError::Other(_) => None,
+        }
+    }
+}
+
+impl Display for ParquetProcessorError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> FormatResult {
+        match *self {
+            ParquetProcessorError::ParquetError(ref err) => write!(f, "Parquet error: {}", err),
+            ParquetProcessorError::StorageError(ref err) => write!(f, "Storage error: {}", err),
+            ParquetProcessorError::TimeoutError(ref err) => write!(f, "Timeout error: {}", err),
+            ParquetProcessorError::IoError(ref err) => write!(f, "IO error: {}", err),
+            ParquetProcessorError::Other(ref desc) => write!(f, "Error: {}", desc),
+        }
+    }
+}
+
+impl From<std::io::Error> for ParquetProcessorError {
+    fn from(err: std::io::Error) -> Self {
+        ParquetProcessorError::IoError(err)
+    }
+}
+
+impl From<anyhow::Error> for ParquetProcessorError {
+    fn from(err: anyhow::Error) -> Self {
+        ParquetProcessorError::Other(err.to_string())
+    }
+}
+
+impl From<parquet::errors::ParquetError> for ParquetProcessorError {
+    fn from(err: parquet::errors::ParquetError) -> Self {
+        ParquetProcessorError::ParquetError(err)
+    }
+}
diff --git a/rust/processor/src/bq_analytics/parquet_handler.rs b/rust/processor/src/bq_analytics/parquet_handler.rs
new file mode 100644
index 000000000..785bdb8bd
--- /dev/null
+++ b/rust/processor/src/bq_analytics/parquet_handler.rs
@@ -0,0 +1,80 @@
+use crate::{
+    bq_analytics::generic_parquet_processor::{
+        HasParquetSchema, HasVersion, NamedTable, ParquetDataGeneric,
+        ParquetHandler as GenericParquetHandler,
+    },
+    gap_detectors::ProcessingResult,
+    worker::PROCESSOR_SERVICE_TYPE,
+};
+use allocative::Allocative;
+use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GcsClientConfig};
+use kanal::AsyncSender;
+use parquet::record::RecordWriter;
+use std::sync::Arc;
+use tracing::{debug, error, info};
+
+pub fn create_parquet_handler_loop<ParquetType>(
+    new_gap_detector_sender: AsyncSender<ProcessingResult>,
+    processor_name: &str,
+    bucket_name: String,
+    parquet_handler_response_channel_size: usize,
+    max_buffer_size: usize,
+) -> AsyncSender<ParquetDataGeneric<ParquetType>>
+where
+    ParquetType: NamedTable + HasVersion + HasParquetSchema + Send + Sync + 'static + Allocative,
+    for<'a> &'a [ParquetType]: RecordWriter<ParquetType>,
+{
+    let processor_name = processor_name.to_owned();
+
+    let (parquet_sender, parquet_receiver) = kanal::bounded_async::<ParquetDataGeneric<ParquetType>>(
+        parquet_handler_response_channel_size,
+    );
+
+    debug!(
+        processor_name = processor_name.clone(),
+        service_type = PROCESSOR_SERVICE_TYPE,
+        "[Parquet Handler] Starting parquet handler loop",
+    );
+
+    let mut parquet_manager = GenericParquetHandler::new(
+        bucket_name.clone(),
+        new_gap_detector_sender.clone(),
+        ParquetType::schema(),
+    )
+    .expect("Failed to create parquet manager");
+
+    tokio::spawn(async move {
+        let gcs_config = GcsClientConfig::default()
+            .with_auth()
+            .await
+            .expect("Failed to create GCS client config");
+        let gcs_client = Arc::new(GCSClient::new(gcs_config));
+
+        loop {
+            let txn_pb_res = parquet_receiver.recv().await.unwrap(); // TODO: handle the receive error properly
+
+            let result = parquet_manager
+                .handle(&gcs_client, txn_pb_res, max_buffer_size)
+                .await;
+            match result {
+                Ok(_) => {
+                    info!(
+                        processor_name = processor_name.clone(),
+                        service_type = PROCESSOR_SERVICE_TYPE,
+                        "[Parquet Handler] Successfully processed parquet files",
+                    );
+                },
+                Err(e) => {
+                    error!(
+                        processor_name = processor_name.clone(),
+                        service_type = PROCESSOR_SERVICE_TYPE,
+                        "[Parquet Handler] Error processing parquet files: {:?}",
+                        e
+                    );
+                },
+            }
+        }
+    });
+
+    parquet_sender
+}
diff --git a/rust/processor/src/config.rs b/rust/processor/src/config.rs
index 9412f357a..785130334 100644
--- a/rust/processor/src/config.rs
+++ b/rust/processor/src/config.rs
@@ -2,7 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use crate::{
-    gap_detector::DEFAULT_GAP_DETECTION_BATCH_SIZE, processors::ProcessorConfig,
+    gap_detectors::DEFAULT_GAP_DETECTION_BATCH_SIZE, processors::ProcessorConfig,
     transaction_filter::TransactionFilter, worker::Worker,
 };
 use ahash::AHashMap;
@@ -36,6 +36,9 @@ pub struct IndexerGrpcProcessorConfig {
     // Maximum number of batches "missing" before we assume we have an issue with gaps and abort
     #[serde(default = "IndexerGrpcProcessorConfig::default_gap_detection_batch_size")]
     pub gap_detection_batch_size: u64,
+    // Maximum number of batches "missing" for parquet processing before we assume we have an issue with gaps and abort
+    #[serde(default =
"IndexerGrpcProcessorConfig::default_gap_detection_batch_size")] + pub parquet_gap_detection_batch_size: u64, // Number of protobuff transactions to send per chunk to the processor tasks #[serde(default = "IndexerGrpcProcessorConfig::default_pb_channel_txn_chunk_size")] pub pb_channel_txn_chunk_size: usize, @@ -93,6 +96,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig { self.number_concurrent_processing_tasks, self.db_pool_size, self.gap_detection_batch_size, + self.parquet_gap_detection_batch_size, self.pb_channel_txn_chunk_size, self.per_table_chunk_sizes.clone(), self.enable_verbose_logging, diff --git a/rust/processor/src/db/common/models/coin_models/coin_activities.rs b/rust/processor/src/db/common/models/coin_models/coin_activities.rs index de81cd49c..e1f1cdf1d 100644 --- a/rust/processor/src/db/common/models/coin_models/coin_activities.rs +++ b/rust/processor/src/db/common/models/coin_models/coin_activities.rs @@ -124,6 +124,7 @@ impl CoinActivity { .as_ref() .expect("Transaction timestamp doesn't exist!") .seconds; + #[allow(deprecated)] let txn_timestamp = NaiveDateTime::from_timestamp_opt(txn_timestamp, 0).expect("Txn Timestamp is invalid!"); diff --git a/rust/processor/src/db/common/models/default_models/mod.rs b/rust/processor/src/db/common/models/default_models/mod.rs index 22f3722d4..d3d54d58f 100644 --- a/rust/processor/src/db/common/models/default_models/mod.rs +++ b/rust/processor/src/db/common/models/default_models/mod.rs @@ -7,3 +7,9 @@ pub mod move_resources; pub mod move_tables; pub mod transactions; pub mod write_set_changes; + +// parquet models +pub mod parquet_move_resources; +pub mod parquet_move_tables; +pub mod parquet_transactions; +pub mod parquet_write_set_changes; diff --git a/rust/processor/src/db/common/models/default_models/parquet_move_resources.rs b/rust/processor/src/db/common/models/default_models/parquet_move_resources.rs new file mode 100644 index 000000000..134127add --- /dev/null +++ b/rust/processor/src/db/common/models/default_models/parquet_move_resources.rs @@ -0,0 +1,153 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +#![allow(clippy::extra_unused_lifetimes)] + +use crate::{ + bq_analytics::generic_parquet_processor::{HasVersion, NamedTable}, + utils::util::standardize_address, +}; +use allocative_derive::Allocative; +use anyhow::{Context, Result}; +use aptos_protos::transaction::v1::{ + DeleteResource, MoveStructTag as MoveStructTagPB, WriteResource, +}; +use field_count::FieldCount; +use parquet_derive::ParquetRecordWriter; +use serde::{Deserialize, Serialize}; + +#[derive( + Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter, +)] +pub struct MoveResource { + pub txn_version: i64, + pub write_set_change_index: i64, + pub block_height: i64, + #[allocative(skip)] + pub block_timestamp: chrono::NaiveDateTime, + pub resource_address: String, + pub resource_type: String, + pub module: String, + pub fun: String, + pub is_deleted: bool, + pub generic_type_params: Option, + pub data: Option, + pub state_key_hash: String, +} + +impl NamedTable for MoveResource { + const TABLE_NAME: &'static str = "move_resources"; +} + +impl HasVersion for MoveResource { + fn version(&self) -> i64 { + self.txn_version + } +} + +pub struct MoveStructTag { + resource_address: String, + pub module: String, + pub fun: String, + pub generic_type_params: Option, +} + +impl MoveResource { + pub fn from_write_resource( + write_resource: &WriteResource, + write_set_change_index: i64, + txn_version: 
i64, + block_height: i64, + block_timestamp: chrono::NaiveDateTime, + ) -> Self { + let parsed_data = Self::convert_move_struct_tag( + write_resource + .r#type + .as_ref() + .expect("MoveStructTag Not Exists."), + ); + Self { + txn_version, + block_height, + write_set_change_index, + resource_type: write_resource.type_str.clone(), + fun: parsed_data.fun.clone(), + resource_address: standardize_address(&write_resource.address.to_string()), + module: parsed_data.module.clone(), + generic_type_params: parsed_data.generic_type_params, + data: Some(write_resource.data.clone()), + is_deleted: false, + state_key_hash: standardize_address( + hex::encode(write_resource.state_key_hash.as_slice()).as_str(), + ), + block_timestamp, + } + } + + pub fn from_delete_resource( + delete_resource: &DeleteResource, + write_set_change_index: i64, + txn_version: i64, + block_height: i64, + block_timestamp: chrono::NaiveDateTime, + ) -> Self { + let parsed_data = Self::convert_move_struct_tag( + delete_resource + .r#type + .as_ref() + .expect("MoveStructTag Not Exists."), + ); + Self { + txn_version, + block_height, + write_set_change_index, + resource_type: delete_resource.type_str.clone(), + fun: parsed_data.fun.clone(), + resource_address: standardize_address(&delete_resource.address.to_string()), + module: parsed_data.module.clone(), + generic_type_params: parsed_data.generic_type_params, + data: None, + is_deleted: true, + state_key_hash: standardize_address( + hex::encode(delete_resource.state_key_hash.as_slice()).as_str(), + ), + block_timestamp, + } + } + + pub fn convert_move_struct_tag(struct_tag: &MoveStructTagPB) -> MoveStructTag { + MoveStructTag { + resource_address: standardize_address(struct_tag.address.as_str()), + module: struct_tag.module.to_string(), + fun: struct_tag.name.to_string(), + generic_type_params: struct_tag + .generic_type_params + .iter() + .map(|move_type| -> Result> { + Ok(Some( + serde_json::to_string(move_type).context("Failed to parse move type")?, + )) + }) + .collect::>>() + .unwrap_or(None), + } + } + + pub fn get_outer_type_from_resource(write_resource: &WriteResource) -> String { + let move_struct_tag = + Self::convert_move_struct_tag(write_resource.r#type.as_ref().unwrap()); + + format!( + "{}::{}::{}", + move_struct_tag.get_address(), + move_struct_tag.module, + move_struct_tag.fun, + ) + } +} + +impl MoveStructTag { + pub fn get_address(&self) -> String { + standardize_address(self.resource_address.as_str()) + } +} diff --git a/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs b/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs new file mode 100644 index 000000000..014f00fef --- /dev/null +++ b/rust/processor/src/db/common/models/default_models/parquet_move_tables.rs @@ -0,0 +1,139 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +#![allow(clippy::extra_unused_lifetimes)] + +use crate::{ + bq_analytics::generic_parquet_processor::{HasVersion, NamedTable}, + utils::util::{hash_str, standardize_address}, +}; +use allocative_derive::Allocative; +use aptos_protos::transaction::v1::{DeleteTableItem, WriteTableItem}; +use field_count::FieldCount; +use parquet_derive::ParquetRecordWriter; +use serde::{Deserialize, Serialize}; + +#[derive( + Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter, +)] +pub struct TableItem { + pub txn_version: i64, + #[allocative(skip)] + pub block_timestamp: chrono::NaiveDateTime, + pub write_set_change_index: i64, + pub 
transaction_block_height: i64, + pub table_key: String, + pub table_handle: String, + pub decoded_key: String, + pub decoded_value: Option, + pub is_deleted: bool, +} + +impl NamedTable for TableItem { + const TABLE_NAME: &'static str = "table_items"; +} + +impl HasVersion for TableItem { + fn version(&self) -> i64 { + self.txn_version + } +} +#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)] +pub struct CurrentTableItem { + pub table_handle: String, + pub key_hash: String, + pub key: String, + pub decoded_key: serde_json::Value, + pub decoded_value: Option, + pub last_transaction_version: i64, + pub is_deleted: bool, +} +#[derive(Clone, Debug, Deserialize, FieldCount, Serialize)] +pub struct TableMetadata { + pub handle: String, + pub key_type: String, + pub value_type: String, +} + +impl TableItem { + pub fn from_write_table_item( + write_table_item: &WriteTableItem, + write_set_change_index: i64, + txn_version: i64, + transaction_block_height: i64, + block_timestamp: chrono::NaiveDateTime, + ) -> (Self, CurrentTableItem) { + ( + Self { + txn_version, + write_set_change_index, + transaction_block_height, + table_key: write_table_item.key.to_string(), + table_handle: standardize_address(&write_table_item.handle.to_string()), + decoded_key: write_table_item.data.as_ref().unwrap().key.clone(), + decoded_value: Some(write_table_item.data.as_ref().unwrap().value.clone()), + is_deleted: false, + block_timestamp, + }, + CurrentTableItem { + table_handle: standardize_address(&write_table_item.handle.to_string()), + key_hash: hash_str(&write_table_item.key.to_string()), + key: write_table_item.key.to_string(), + decoded_key: serde_json::from_str( + write_table_item.data.as_ref().unwrap().key.as_str(), + ) + .unwrap(), + decoded_value: serde_json::from_str( + write_table_item.data.as_ref().unwrap().value.as_str(), + ) + .unwrap(), + last_transaction_version: txn_version, + is_deleted: false, + }, + ) + } + + pub fn from_delete_table_item( + delete_table_item: &DeleteTableItem, + write_set_change_index: i64, + txn_version: i64, + transaction_block_height: i64, + block_timestamp: chrono::NaiveDateTime, + ) -> (Self, CurrentTableItem) { + ( + Self { + txn_version, + write_set_change_index, + transaction_block_height, + table_key: delete_table_item.key.to_string(), + table_handle: standardize_address(&delete_table_item.handle.to_string()), + decoded_key: delete_table_item.data.as_ref().unwrap().key.clone(), + decoded_value: None, + is_deleted: true, + block_timestamp, + }, + CurrentTableItem { + table_handle: standardize_address(&delete_table_item.handle.to_string()), + key_hash: hash_str(&delete_table_item.key.to_string()), + key: delete_table_item.key.to_string(), + decoded_key: serde_json::from_str( + delete_table_item.data.as_ref().unwrap().key.as_str(), + ) + .unwrap(), + decoded_value: None, + last_transaction_version: txn_version, + is_deleted: true, + }, + ) + } +} + +impl TableMetadata { + pub fn from_write_table_item(table_item: &WriteTableItem) -> Self { + Self { + handle: table_item.handle.to_string(), + key_type: table_item.data.as_ref().unwrap().key_type.clone(), + value_type: table_item.data.as_ref().unwrap().value_type.clone(), + } + } +} diff --git a/rust/processor/src/db/common/models/default_models/parquet_transactions.rs b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs new file mode 100644 index 000000000..7a18d621a --- /dev/null +++ b/rust/processor/src/db/common/models/default_models/parquet_transactions.rs @@ -0,0 +1,374 @@ +// Copyright © 
Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use super::{ + block_metadata_transactions::BlockMetadataTransaction, + parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel}, +}; +use crate::{ + bq_analytics::generic_parquet_processor::{HasVersion, NamedTable}, + utils::{ + counters::PROCESSOR_UNKNOWN_TYPE_COUNT, + util::{get_clean_payload, get_clean_writeset, get_payload_type, standardize_address}, + }, +}; +use ahash::AHashMap; +use allocative_derive::Allocative; +use aptos_protos::transaction::v1::{ + transaction::{TransactionType, TxnData}, + Transaction as TransactionPB, TransactionInfo, +}; +use field_count::FieldCount; +use parquet_derive::ParquetRecordWriter; +use serde::{Deserialize, Serialize}; + +#[derive( + Allocative, Clone, Debug, Default, Deserialize, FieldCount, Serialize, ParquetRecordWriter, +)] +pub struct Transaction { + pub txn_version: i64, + pub block_height: i64, + pub epoch: i64, + pub txn_type: String, + pub payload: Option, + pub payload_type: Option, + pub gas_used: u64, + pub success: bool, + pub vm_status: String, + pub num_events: i64, + pub num_write_set_changes: i64, + pub txn_hash: String, + pub state_change_hash: String, + pub event_root_hash: String, + pub state_checkpoint_hash: Option, + pub accumulator_root_hash: String, + #[allocative(skip)] + pub block_timestamp: chrono::NaiveDateTime, +} + +impl NamedTable for Transaction { + const TABLE_NAME: &'static str = "transactions"; +} + +impl HasVersion for Transaction { + fn version(&self) -> i64 { + self.txn_version + } +} + +impl Transaction { + fn from_transaction_info( + info: &TransactionInfo, + txn_version: i64, + epoch: i64, + block_height: i64, + ) -> Self { + Self { + txn_version, + block_height, + txn_hash: standardize_address(hex::encode(info.hash.as_slice()).as_str()), + state_change_hash: standardize_address( + hex::encode(info.state_change_hash.as_slice()).as_str(), + ), + event_root_hash: standardize_address( + hex::encode(info.event_root_hash.as_slice()).as_str(), + ), + state_checkpoint_hash: info + .state_checkpoint_hash + .as_ref() + .map(|hash| standardize_address(hex::encode(hash).as_str())), + gas_used: info.gas_used, + success: info.success, + vm_status: info.vm_status.clone(), + accumulator_root_hash: standardize_address( + hex::encode(info.accumulator_root_hash.as_slice()).as_str(), + ), + num_write_set_changes: info.changes.len() as i64, + epoch, + ..Default::default() + } + } + + fn from_transaction_info_with_data( + info: &TransactionInfo, + payload: Option, + payload_type: Option, + txn_version: i64, + txn_type: String, + num_events: i64, + block_height: i64, + epoch: i64, + block_timestamp: chrono::NaiveDateTime, + ) -> Self { + Self { + txn_type, + payload, + txn_version, + block_height, + txn_hash: standardize_address(hex::encode(info.hash.as_slice()).as_str()), + state_change_hash: standardize_address( + hex::encode(info.state_change_hash.as_slice()).as_str(), + ), + event_root_hash: standardize_address( + hex::encode(info.event_root_hash.as_slice()).as_str(), + ), + state_checkpoint_hash: info + .state_checkpoint_hash + .as_ref() + .map(|hash| standardize_address(hex::encode(hash).as_str())), + gas_used: info.gas_used, + success: info.success, + vm_status: info.vm_status.clone(), + accumulator_root_hash: standardize_address( + hex::encode(info.accumulator_root_hash.as_slice()).as_str(), + ), + num_events, + 
num_write_set_changes: info.changes.len() as i64, + epoch, + payload_type, + block_timestamp, + } + } + + pub fn from_transaction( + transaction: &TransactionPB, + ) -> ( + Self, + Option, + Vec, + Vec, + ) { + let block_height = transaction.block_height as i64; + let epoch = transaction.epoch as i64; + let transaction_info = transaction + .info + .as_ref() + .expect("Transaction info doesn't exist!"); + let txn_data = match transaction.txn_data.as_ref() { + Some(txn_data) => txn_data, + None => { + PROCESSOR_UNKNOWN_TYPE_COUNT + .with_label_values(&["Transaction"]) + .inc(); + tracing::warn!( + transaction_version = transaction.version, + "Transaction data doesn't exist", + ); + let transaction_out = Self::from_transaction_info( + transaction_info, + transaction.version as i64, + epoch, + block_height, + ); + return (transaction_out, None, Vec::new(), Vec::new()); + }, + }; + let txn_version = transaction.version as i64; + let transaction_type = TransactionType::try_from(transaction.r#type) + .expect("Transaction type doesn't exist!") + .as_str_name() + .to_string(); + let timestamp = transaction + .timestamp + .as_ref() + .expect("Transaction timestamp doesn't exist!"); + #[allow(deprecated)] + let block_timestamp = chrono::NaiveDateTime::from_timestamp_opt(timestamp.seconds, 0) + .expect("Txn Timestamp is invalid!"); + match txn_data { + TxnData::User(user_txn) => { + let (wsc, wsc_detail) = WriteSetChangeModel::from_write_set_changes( + &transaction_info.changes, + txn_version, + block_height, + block_timestamp, + ); + let payload = user_txn + .request + .as_ref() + .expect("Getting user request failed.") + .payload + .as_ref() + .expect("Getting payload failed."); + let payload_cleaned = get_clean_payload(payload, txn_version); + let payload_type = get_payload_type(payload); + + // let serialized_payload = serde_json::to_string(&payload_cleaned).unwrap(); // Handle errors as needed) + let serialized_payload = + payload_cleaned.map(|payload| canonical_json::to_string(&payload).unwrap()); + ( + Self::from_transaction_info_with_data( + transaction_info, + serialized_payload, + Some(payload_type), + txn_version, + transaction_type, + user_txn.events.len() as i64, + block_height, + epoch, + block_timestamp, + ), + None, + wsc, + wsc_detail, + ) + }, + TxnData::Genesis(genesis_txn) => { + let (wsc, wsc_detail) = WriteSetChangeModel::from_write_set_changes( + &transaction_info.changes, + txn_version, + block_height, + block_timestamp, + ); + let payload = genesis_txn.payload.as_ref().unwrap(); + let payload_cleaned = get_clean_writeset(payload, txn_version); + // It's genesis so no big deal + // let serialized_payload = serde_json::to_string(&payload_cleaned).unwrap(); // Handle errors as needed + let serialized_payload = + payload_cleaned.map(|payload| canonical_json::to_string(&payload).unwrap()); + + let payload_type = None; + ( + Self::from_transaction_info_with_data( + transaction_info, + serialized_payload, + payload_type, + txn_version, + transaction_type, + genesis_txn.events.len() as i64, + block_height, + epoch, + block_timestamp, + ), + None, + wsc, + wsc_detail, + ) + }, + TxnData::BlockMetadata(block_metadata_txn) => { + let (wsc, wsc_detail) = WriteSetChangeModel::from_write_set_changes( + &transaction_info.changes, + txn_version, + block_height, + block_timestamp, + ); + ( + Self::from_transaction_info_with_data( + transaction_info, + None, + None, + txn_version, + transaction_type, + block_metadata_txn.events.len() as i64, + block_height, + epoch, + block_timestamp, + ), + 
Some(BlockMetadataTransaction::from_transaction( + block_metadata_txn, + txn_version, + block_height, + epoch, + timestamp, + )), + wsc, + wsc_detail, + ) + }, + TxnData::StateCheckpoint(_) => ( + Self::from_transaction_info_with_data( + transaction_info, + None, + None, + txn_version, + transaction_type, + 0, + block_height, + epoch, + block_timestamp, + ), + None, + vec![], + vec![], + ), + TxnData::Validator(_) => ( + Self::from_transaction_info_with_data( + transaction_info, + None, + None, + txn_version, + transaction_type, + 0, + block_height, + epoch, + block_timestamp, + ), + None, + vec![], + vec![], + ), + TxnData::BlockEpilogue(_) => ( + Self::from_transaction_info_with_data( + transaction_info, + None, + None, + txn_version, + transaction_type, + 0, + block_height, + epoch, + block_timestamp, + ), + None, + vec![], + vec![], + ), + } + } + + pub fn from_transactions( + transactions: &[TransactionPB], + transaction_version_to_struct_count: &mut AHashMap, + ) -> ( + Vec, + Vec, + Vec, + Vec, + ) { + let mut txns = vec![]; + let mut block_metadata_txns = vec![]; + let mut wscs = vec![]; + let mut wsc_details = vec![]; + + for txn in transactions { + let (txn, block_metadata, mut wsc_list, mut wsc_detail_list) = + Self::from_transaction(txn); + txns.push(txn.clone()); + transaction_version_to_struct_count + .entry(txn.txn_version) + .and_modify(|e| *e += 1) + .or_insert(1); + + if let Some(a) = block_metadata { + block_metadata_txns.push(a.clone()); + // transaction_version_to_struct_count.entry(a.version).and_modify(|e| *e += 1); + } + wscs.append(&mut wsc_list); + + if !wsc_list.is_empty() { + transaction_version_to_struct_count + .entry(wsc_list[0].txn_version) + .and_modify(|e| *e += wsc_list.len() as i64); + } + wsc_details.append(&mut wsc_detail_list); + } + (txns, block_metadata_txns, wscs, wsc_details) + } +} + +// Prevent conflicts with other things named `Transaction` +pub type TransactionModel = Transaction; diff --git a/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs b/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs new file mode 100644 index 000000000..8507a20cb --- /dev/null +++ b/rust/processor/src/db/common/models/default_models/parquet_write_set_changes.rs @@ -0,0 +1,254 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +#![allow(clippy::extra_unused_lifetimes)] + +use super::{ + move_modules::MoveModule, + parquet_move_resources::MoveResource, + parquet_move_tables::{CurrentTableItem, TableItem, TableMetadata}, +}; +use crate::{ + bq_analytics::generic_parquet_processor::{HasVersion, NamedTable}, + utils::util::standardize_address, +}; +use allocative_derive::Allocative; +use aptos_protos::transaction::v1::{ + write_set_change::{Change as WriteSetChangeEnum, Type as WriteSetChangeTypeEnum}, + WriteSetChange as WriteSetChangePB, +}; +use field_count::FieldCount; +use parquet_derive::ParquetRecordWriter; +use serde::{Deserialize, Serialize}; + +#[derive(Allocative, Clone, Debug, Deserialize, FieldCount, Serialize, ParquetRecordWriter)] +pub struct WriteSetChange { + pub txn_version: i64, + pub write_set_change_index: i64, + pub state_key_hash: String, + pub change_type: String, + pub resource_address: String, + pub block_height: i64, + #[allocative(skip)] + pub block_timestamp: chrono::NaiveDateTime, +} + +impl NamedTable for WriteSetChange { + const TABLE_NAME: &'static str = "write_set_changes"; +} + +impl HasVersion for WriteSetChange { + fn version(&self) -> i64 { + 
self.txn_version + } +} + +impl Default for WriteSetChange { + fn default() -> Self { + Self { + txn_version: 0, + write_set_change_index: 0, + state_key_hash: "".to_string(), + change_type: "".to_string(), + resource_address: "".to_string(), + block_height: 0, + #[allow(deprecated)] + block_timestamp: chrono::NaiveDateTime::from_timestamp(0, 0), + } + } +} + +impl WriteSetChange { + pub fn from_write_set_change( + write_set_change: &WriteSetChangePB, + write_set_change_index: i64, + txn_version: i64, + block_height: i64, + block_timestamp: chrono::NaiveDateTime, + ) -> (Self, WriteSetChangeDetail) { + let change_type = Self::get_write_set_change_type(write_set_change); + let change = write_set_change + .change + .as_ref() + .expect("WriteSetChange must have a change"); + match change { + WriteSetChangeEnum::WriteModule(inner) => ( + Self { + txn_version, + state_key_hash: standardize_address( + hex::encode(inner.state_key_hash.as_slice()).as_str(), + ), + block_height, + change_type, + resource_address: standardize_address(&inner.address.to_string()), + write_set_change_index, + block_timestamp, + }, + WriteSetChangeDetail::Module(MoveModule::from_write_module( + inner, + write_set_change_index, + txn_version, + block_height, + )), + ), + WriteSetChangeEnum::DeleteModule(inner) => ( + Self { + txn_version, + state_key_hash: standardize_address( + hex::encode(inner.state_key_hash.as_slice()).as_str(), + ), + block_height, + change_type, + resource_address: standardize_address(&inner.address.to_string()), + write_set_change_index, + block_timestamp, + }, + WriteSetChangeDetail::Module(MoveModule::from_delete_module( + inner, + write_set_change_index, + txn_version, + block_height, + )), + ), + WriteSetChangeEnum::WriteResource(inner) => ( + Self { + txn_version, + state_key_hash: standardize_address( + hex::encode(inner.state_key_hash.as_slice()).as_str(), + ), + block_height, + change_type, + resource_address: standardize_address(&inner.address.to_string()), + write_set_change_index, + block_timestamp, + }, + WriteSetChangeDetail::Resource(MoveResource::from_write_resource( + inner, + write_set_change_index, + txn_version, + block_height, + block_timestamp, + )), + ), + WriteSetChangeEnum::DeleteResource(inner) => ( + Self { + txn_version, + state_key_hash: standardize_address( + hex::encode(inner.state_key_hash.as_slice()).as_str(), + ), + block_height, + change_type, + resource_address: standardize_address(&inner.address.to_string()), + write_set_change_index, + block_timestamp, + }, + WriteSetChangeDetail::Resource(MoveResource::from_delete_resource( + inner, + write_set_change_index, + txn_version, + block_height, + block_timestamp, + )), + ), + WriteSetChangeEnum::WriteTableItem(inner) => { + let (ti, cti) = TableItem::from_write_table_item( + inner, + write_set_change_index, + txn_version, + block_height, + block_timestamp, + ); + ( + Self { + txn_version, + state_key_hash: standardize_address( + hex::encode(inner.state_key_hash.as_slice()).as_str(), + ), + block_height, + change_type, + resource_address: String::default(), + write_set_change_index, + block_timestamp, + }, + WriteSetChangeDetail::Table( + ti, + cti, + Some(TableMetadata::from_write_table_item(inner)), + ), + ) + }, + WriteSetChangeEnum::DeleteTableItem(inner) => { + let (ti, cti) = TableItem::from_delete_table_item( + inner, + write_set_change_index, + txn_version, + block_height, + block_timestamp, + ); + ( + Self { + txn_version, + state_key_hash: standardize_address( + 
hex::encode(inner.state_key_hash.as_slice()).as_str(), + ), + block_height, + change_type, + resource_address: String::default(), + write_set_change_index, + block_timestamp, + }, + WriteSetChangeDetail::Table(ti, cti, None), + ) + }, + } + } + + pub fn from_write_set_changes( + write_set_changes: &[WriteSetChangePB], + txn_version: i64, + block_height: i64, + timestamp: chrono::NaiveDateTime, + ) -> (Vec, Vec) { + write_set_changes + .iter() + .enumerate() + .map(|(write_set_change_index, write_set_change)| { + Self::from_write_set_change( + write_set_change, + write_set_change_index as i64, + txn_version, + block_height, + timestamp, + ) + }) + .collect::>() + .into_iter() + .unzip() + } + + fn get_write_set_change_type(t: &WriteSetChangePB) -> String { + match WriteSetChangeTypeEnum::try_from(t.r#type) + .expect("WriteSetChange must have a valid type.") + { + WriteSetChangeTypeEnum::DeleteModule => "delete_module".to_string(), + WriteSetChangeTypeEnum::DeleteResource => "delete_resource".to_string(), + WriteSetChangeTypeEnum::DeleteTableItem => "delete_table_item".to_string(), + WriteSetChangeTypeEnum::WriteModule => "write_module".to_string(), + WriteSetChangeTypeEnum::WriteResource => "write_resource".to_string(), + WriteSetChangeTypeEnum::WriteTableItem => "write_table_item".to_string(), + WriteSetChangeTypeEnum::Unspecified => { + panic!("WriteSetChange type must be specified.") + }, + } + } +} + +#[derive(Deserialize, Serialize)] +pub enum WriteSetChangeDetail { + Module(MoveModule), + Resource(MoveResource), + Table(TableItem, CurrentTableItem, Option), +} + +// Prevent conflicts with other things named `WriteSetChange` +pub type WriteSetChangeModel = WriteSetChange; diff --git a/rust/processor/src/gap_detector.rs b/rust/processor/src/gap_detector.rs deleted file mode 100644 index 96f84b3cc..000000000 --- a/rust/processor/src/gap_detector.rs +++ /dev/null @@ -1,194 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::{ - processors::{ProcessingResult, Processor, ProcessorTrait}, - utils::counters::PROCESSOR_DATA_GAP_COUNT, - worker::PROCESSOR_SERVICE_TYPE, -}; -use ahash::AHashMap; -use kanal::AsyncReceiver; -use tracing::{error, info}; - -// Size of a gap (in txn version) before gap detected -pub const DEFAULT_GAP_DETECTION_BATCH_SIZE: u64 = 500; -// Number of seconds between each processor status update -const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1; - -pub struct GapDetector { - next_version_to_process: u64, - seen_versions: AHashMap, - last_success_batch: Option, -} - -pub struct GapDetectorResult { - pub next_version_to_process: u64, - pub num_gaps: u64, - pub last_success_batch: Option, -} - -impl GapDetector { - pub fn new(starting_version: u64) -> Self { - Self { - next_version_to_process: starting_version, - seen_versions: AHashMap::new(), - last_success_batch: None, - } - } - - pub fn process_versions( - &mut self, - result: ProcessingResult, - ) -> anyhow::Result { - // Check for gaps - if self.next_version_to_process != result.start_version { - self.seen_versions.insert(result.start_version, result); - tracing::debug!("Gap detected"); - } else { - // If no gap is detected, find the latest processed batch without gaps - self.update_prev_batch(result); - tracing::debug!("No gap detected"); - } - - Ok(GapDetectorResult { - next_version_to_process: self.next_version_to_process, - num_gaps: self.seen_versions.len() as u64, - last_success_batch: self.last_success_batch.clone(), - }) - } - - fn update_prev_batch(&mut self, result: 
ProcessingResult) { - let mut new_prev_batch = result; - while let Some(next_version) = self.seen_versions.remove(&(new_prev_batch.end_version + 1)) - { - new_prev_batch = next_version; - } - self.next_version_to_process = new_prev_batch.end_version + 1; - self.last_success_batch = Some(new_prev_batch); - } -} - -pub async fn create_gap_detector_status_tracker_loop( - gap_detector_receiver: AsyncReceiver, - processor: Processor, - starting_version: u64, - gap_detection_batch_size: u64, -) { - let processor_name = processor.name(); - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - "[Parser] Starting gap detector task", - ); - - let mut gap_detector = GapDetector::new(starting_version); - let mut last_update_time = std::time::Instant::now(); - - loop { - let result = match gap_detector_receiver.recv().await { - Ok(result) => result, - Err(e) => { - info!( - processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - error = ?e, - "[Parser] Gap detector channel has been closed", - ); - return; - }, - }; - - match gap_detector.process_versions(result) { - Ok(res) => { - PROCESSOR_DATA_GAP_COUNT - .with_label_values(&[processor_name]) - .set(res.num_gaps as i64); - if res.num_gaps >= gap_detection_batch_size { - tracing::debug!( - processor_name, - gap_start_version = res.next_version_to_process, - num_gaps = res.num_gaps, - "[Parser] Processed {gap_detection_batch_size} batches with a gap", - ); - // We don't panic as everything downstream will panic if it doesn't work/receive - } - - if let Some(res_last_success_batch) = res.last_success_batch { - if last_update_time.elapsed().as_secs() >= UPDATE_PROCESSOR_STATUS_SECS { - processor - .update_last_processed_version( - res_last_success_batch.end_version, - res_last_success_batch.last_transaction_timestamp.clone(), - ) - .await - .unwrap(); - last_update_time = std::time::Instant::now(); - } - } - }, - Err(e) => { - error!( - processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - error = ?e, - "[Parser] Gap detector task has panicked" - ); - panic!("[Parser] Gap detector task has panicked: {:?}", e); - }, - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[tokio::test] - async fn detect_gap_test() { - let starting_version = 0; - let mut gap_detector = GapDetector::new(starting_version); - - // Processing batches with gaps - for i in 0..DEFAULT_GAP_DETECTION_BATCH_SIZE { - let result = ProcessingResult { - start_version: 100 + i * 100, - end_version: 199 + i * 100, - last_transaction_timestamp: None, - processing_duration_in_secs: 0.0, - db_insertion_duration_in_secs: 0.0, - }; - let gap_detector_result = gap_detector.process_versions(result).unwrap(); - assert_eq!(gap_detector_result.num_gaps, i + 1); - assert_eq!(gap_detector_result.next_version_to_process, 0); - assert_eq!(gap_detector_result.last_success_batch, None); - } - - // Process a batch without a gap - let gap_detector_result = gap_detector - .process_versions(ProcessingResult { - start_version: 0, - end_version: 99, - last_transaction_timestamp: None, - processing_duration_in_secs: 0.0, - db_insertion_duration_in_secs: 0.0, - }) - .unwrap(); - assert_eq!(gap_detector_result.num_gaps, 0); - assert_eq!( - gap_detector_result.next_version_to_process, - 100 + (DEFAULT_GAP_DETECTION_BATCH_SIZE) * 100 - ); - assert_eq!( - gap_detector_result - .last_success_batch - .clone() - .unwrap() - .start_version, - 100 + (DEFAULT_GAP_DETECTION_BATCH_SIZE - 1) * 100 - ); - assert_eq!( - gap_detector_result.last_success_batch.unwrap().end_version, - 199 
+ (DEFAULT_GAP_DETECTION_BATCH_SIZE - 1) * 100 - ); - } -} diff --git a/rust/processor/src/gap_detectors/gap_detector.rs b/rust/processor/src/gap_detectors/gap_detector.rs new file mode 100644 index 000000000..c5707f798 --- /dev/null +++ b/rust/processor/src/gap_detectors/gap_detector.rs @@ -0,0 +1,144 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + gap_detectors::{GapDetectorResult, ProcessingResult}, + processors::DefaultProcessingResult, +}; +use ahash::AHashMap; + +pub trait GapDetectorTrait { + fn process_versions(&mut self, result: ProcessingResult) -> anyhow::Result; +} + +pub struct DefaultGapDetector { + next_version_to_process: u64, + seen_versions: AHashMap, + last_success_batch: Option, +} + +pub struct DefaultGapDetectorResult { + pub next_version_to_process: u64, + pub num_gaps: u64, + pub last_success_batch: Option, +} + +impl GapDetectorTrait for DefaultGapDetector { + fn process_versions(&mut self, result: ProcessingResult) -> anyhow::Result { + match result { + ProcessingResult::DefaultProcessingResult(result) => { + // Check for gaps + if self.next_version_to_process != result.start_version { + self.seen_versions.insert(result.start_version, result); + tracing::debug!("Gap detected"); + } else { + // If no gap is detected, find the latest processed batch without gaps + self.update_prev_batch(result); + tracing::debug!("No gap detected"); + } + + Ok(GapDetectorResult::DefaultGapDetectorResult( + DefaultGapDetectorResult { + next_version_to_process: self.next_version_to_process, + num_gaps: self.seen_versions.len() as u64, + last_success_batch: self.last_success_batch.clone(), + }, + )) + }, + _ => { + panic!("Invalid result type"); + }, + } + } +} + +impl DefaultGapDetector { + pub fn new(starting_version: u64) -> Self { + Self { + next_version_to_process: starting_version, + seen_versions: AHashMap::new(), + last_success_batch: None, + } + } + + fn update_prev_batch(&mut self, result: DefaultProcessingResult) { + let mut new_prev_batch = result; + while let Some(next_version) = self.seen_versions.remove(&(new_prev_batch.end_version + 1)) + { + new_prev_batch = next_version; + } + self.next_version_to_process = new_prev_batch.end_version + 1; + self.last_success_batch = Some(new_prev_batch); + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::gap_detectors::DEFAULT_GAP_DETECTION_BATCH_SIZE; + + #[tokio::test] + async fn detect_gap_test() { + let starting_version = 0; + let mut default_gap_detector = DefaultGapDetector::new(starting_version); + + // Processing batches with gaps + for i in 0..DEFAULT_GAP_DETECTION_BATCH_SIZE { + let result = DefaultProcessingResult { + start_version: 100 + i * 100, + end_version: 199 + i * 100, + last_transaction_timestamp: None, + processing_duration_in_secs: 0.0, + db_insertion_duration_in_secs: 0.0, + }; + let default_gap_detector_result = default_gap_detector + .process_versions(ProcessingResult::DefaultProcessingResult(result)) + .unwrap(); + let default_gap_detector_result = match default_gap_detector_result { + GapDetectorResult::DefaultGapDetectorResult(res) => res, + _ => panic!("Invalid result type"), + }; + + assert_eq!(default_gap_detector_result.num_gaps, i + 1); + assert_eq!(default_gap_detector_result.next_version_to_process, 0); + assert_eq!(default_gap_detector_result.last_success_batch, None); + } + + // Process a batch without a gap + let default_gap_detector_result = default_gap_detector + .process_versions(ProcessingResult::DefaultProcessingResult( + 
DefaultProcessingResult { + start_version: 0, + end_version: 99, + last_transaction_timestamp: None, + processing_duration_in_secs: 0.0, + db_insertion_duration_in_secs: 0.0, + }, + )) + .unwrap(); + let default_gap_detector_result = match default_gap_detector_result { + GapDetectorResult::DefaultGapDetectorResult(res) => res, + _ => panic!("Invalid result type"), + }; + assert_eq!(default_gap_detector_result.num_gaps, 0); + assert_eq!( + default_gap_detector_result.next_version_to_process, + 100 + (DEFAULT_GAP_DETECTION_BATCH_SIZE) * 100 + ); + assert_eq!( + default_gap_detector_result + .last_success_batch + .clone() + .unwrap() + .start_version, + 100 + (DEFAULT_GAP_DETECTION_BATCH_SIZE - 1) * 100 + ); + assert_eq!( + default_gap_detector_result + .last_success_batch + .unwrap() + .end_version, + 199 + (DEFAULT_GAP_DETECTION_BATCH_SIZE - 1) * 100 + ); + } +} diff --git a/rust/processor/src/gap_detectors/mod.rs b/rust/processor/src/gap_detectors/mod.rs new file mode 100644 index 000000000..20bba694d --- /dev/null +++ b/rust/processor/src/gap_detectors/mod.rs @@ -0,0 +1,182 @@ +use crate::{ + bq_analytics::ParquetProcessingResult, + gap_detectors::{ + gap_detector::{DefaultGapDetector, DefaultGapDetectorResult, GapDetectorTrait}, + parquet_gap_detector::{ParquetFileGapDetector, ParquetFileGapDetectorResult}, + }, + processors::{DefaultProcessingResult, Processor, ProcessorTrait}, + utils::counters::{PARQUET_PROCESSOR_DATA_GAP_COUNT, PROCESSOR_DATA_GAP_COUNT}, + worker::PROCESSOR_SERVICE_TYPE, +}; +use enum_dispatch::enum_dispatch; +use kanal::AsyncReceiver; +use tracing::{error, info}; + +pub mod gap_detector; +pub mod parquet_gap_detector; + +// Size of a gap (in txn version) before gap detected +pub const DEFAULT_GAP_DETECTION_BATCH_SIZE: u64 = 500; +// Number of seconds between each processor status update +const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1; + +#[enum_dispatch(GapDetectorTrait)] +pub enum GapDetector { + DefaultGapDetector, + ParquetFileGapDetector, +} + +#[enum_dispatch(GapDetectorTrait)] +pub enum GapDetectorResult { + DefaultGapDetectorResult, + ParquetFileGapDetectorResult, +} +pub enum ProcessingResult { + DefaultProcessingResult(DefaultProcessingResult), + ParquetProcessingResult(ParquetProcessingResult), +} + +pub async fn create_gap_detector_status_tracker_loop( + gap_detector_receiver: AsyncReceiver, + processor: Processor, + starting_version: u64, + gap_detection_batch_size: u64, +) { + let processor_name = processor.name(); + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + "[Parser] Starting gap detector task", + ); + + let mut default_gap_detector = DefaultGapDetector::new(starting_version); + let mut parquet_gap_detector = ParquetFileGapDetector::new(starting_version); + + loop { + match gap_detector_receiver.recv().await { + Ok(ProcessingResult::DefaultProcessingResult(result)) => { + let last_update_time = std::time::Instant::now(); + match default_gap_detector + .process_versions(ProcessingResult::DefaultProcessingResult(result)) + { + Ok(res) => { + match res { + GapDetectorResult::DefaultGapDetectorResult(res) => { + PROCESSOR_DATA_GAP_COUNT + .with_label_values(&[processor_name]) + .set(res.num_gaps as i64); + if res.num_gaps >= gap_detection_batch_size { + tracing::debug!( + processor_name, + gap_start_version = res.next_version_to_process, + num_gaps = res.num_gaps, + "[Parser] Processed {gap_detection_batch_size} batches with a gap", + ); + // We don't panic as everything downstream will panic if it doesn't 
work/receive + } + if let Some(res_last_success_batch) = res.last_success_batch { + if last_update_time.elapsed().as_secs() + >= UPDATE_PROCESSOR_STATUS_SECS + { + processor + .update_last_processed_version( + res_last_success_batch.end_version, + res_last_success_batch + .last_transaction_timestamp + .clone(), + ) + .await + .unwrap(); + } + } + }, + _ => { + panic!("Invalid result type"); + }, + } + }, + Err(e) => { + error!( + processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + error = ?e, + "[Parser] Gap detector task has panicked" + ); + panic!("[Parser] Gap detector task has panicked: {:?}", e); + }, + } + }, + Ok(ProcessingResult::ParquetProcessingResult(result)) => { + info!( + processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + "[ParquetGapDetector] received parquet gap detector task", + ); + let last_update_time = std::time::Instant::now(); + match parquet_gap_detector + .process_versions(ProcessingResult::ParquetProcessingResult(result)) + { + Ok(res) => { + match res { + GapDetectorResult::ParquetFileGapDetectorResult(res) => { + PARQUET_PROCESSOR_DATA_GAP_COUNT + .with_label_values(&[processor_name]) + .set(res.num_gaps as i64); + // we need a new gap detection batch size + if res.num_gaps >= gap_detection_batch_size { + tracing::debug!( + processor_name, + gap_start_version = res.next_version_to_process, + num_gaps = res.num_gaps, + "[Parser] Processed {gap_detection_batch_size} batches with a gap", + ); + // We don't panic as everything downstream will panic if it doesn't work/receive + } + + if let Some(res_last_success_batch) = res.last_success_batch { + if last_update_time.elapsed().as_secs() + >= UPDATE_PROCESSOR_STATUS_SECS + { + tracing::info!("Updating last processed version"); + processor + .update_last_processed_version( + res_last_success_batch.end_version as u64, + res_last_success_batch + .last_transaction_timestamp + .clone(), + ) + .await + .unwrap(); + } else { + tracing::info!("Not Updating last processed version"); + } + } + }, + _ => { + panic!("Invalid result type"); + }, + } + }, + Err(e) => { + error!( + processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + error = ?e, + "[Parser] Gap detector task has panicked" + ); + panic!("[Parser] Gap detector task has panicked: {:?}", e); + }, + } + }, + Err(e) => { + info!( + processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + error = ?e, + "[Parser] Gap detector channel has been closed", + ); + return; + }, + }; + } +} diff --git a/rust/processor/src/gap_detectors/parquet_gap_detector.rs b/rust/processor/src/gap_detectors/parquet_gap_detector.rs new file mode 100644 index 000000000..c0b5ae6cc --- /dev/null +++ b/rust/processor/src/gap_detectors/parquet_gap_detector.rs @@ -0,0 +1,89 @@ +// // Copyright © Aptos Foundation +// // SPDX-License-Identifier: Apache-2.0 + +use crate::{ + bq_analytics::ParquetProcessingResult, + gap_detectors::{gap_detector::GapDetectorTrait, GapDetectorResult, ProcessingResult}, +}; +use ahash::AHashMap; +use std::cmp::max; +use tracing::{debug, info}; + +pub struct ParquetFileGapDetector { + next_version_to_process: i64, + last_success_batch: Option, + version_counters: AHashMap, + max_version: i64, +} + +pub struct ParquetFileGapDetectorResult { + pub next_version_to_process: u64, + pub num_gaps: u64, + pub last_success_batch: Option, +} + +impl ParquetFileGapDetector { + pub fn new(starting_version: u64) -> Self { + Self { + next_version_to_process: starting_version as i64, + last_success_batch: None, + version_counters: AHashMap::new(), + max_version: 0, + } 
+ } +} +impl GapDetectorTrait for ParquetFileGapDetector { + fn process_versions(&mut self, result: ProcessingResult) -> anyhow::Result { + // Update counts of structures for each transaction version + let result = match result { + ProcessingResult::ParquetProcessingResult(r) => r, + _ => panic!("Invalid result type"), + }; + for (version, count) in result.txn_version_to_struct_count.iter() { + if !self.version_counters.contains_key(version) { + // info!("Inserting version {} with count {} into parquet gap detector", version, count); + self.version_counters.insert(*version, *count); + } + self.max_version = max(self.max_version, *version); + + *self.version_counters.entry(*version).or_default() -= 1; + } + + // Update next version to process and move forward + let mut current_version = result.start_version; + + while current_version <= result.end_version { + match self.version_counters.get_mut(&current_version) { + Some(count) => { + if *count == 0 && current_version == self.next_version_to_process { + while let Some(&count) = + self.version_counters.get(&self.next_version_to_process) + { + if count == 0 { + self.version_counters.remove(&self.next_version_to_process); // Remove the fully processed version + self.next_version_to_process += 1; // Increment to the next version + info!("Version {} fully processed. Next version to process updated to {}", self.next_version_to_process - 1, self.next_version_to_process); + } else { + break; + } + } + } + }, + None => { + // TODO: validate that we shouldn't reach this, since we already added the default count; + // otherwise it could mean that we have duplicates. + debug!("No struct count found for version {}", current_version); + }, + } + current_version += 1; // Move to the next version in sequence + } + + Ok(GapDetectorResult::ParquetFileGapDetectorResult( + ParquetFileGapDetectorResult { + next_version_to_process: self.next_version_to_process as u64, + num_gaps: (self.max_version - self.next_version_to_process) as u64, + last_success_batch: self.last_success_batch.clone(), + }, + )) + } +} diff --git a/rust/processor/src/lib.rs b/rust/processor/src/lib.rs index ce744c590..8dcb43fa3 100644 --- a/rust/processor/src/lib.rs +++ b/rust/processor/src/lib.rs @@ -11,11 +11,17 @@ #[macro_use] extern crate diesel; +// for parquet_derive +extern crate canonical_json; +extern crate parquet; +extern crate parquet_derive; + pub use config::IndexerGrpcProcessorConfig; +pub mod bq_analytics; mod config; mod db; -pub mod gap_detector; +pub mod gap_detectors; pub mod grpc_stream; pub mod processors; #[path = "db/postgres/schema.rs"] diff --git a/rust/processor/src/processors/account_transactions_processor.rs b/rust/processor/src/processors/account_transactions_processor.rs index 66955c32c..f7ef88344 100644 --- a/rust/processor/src/processors/account_transactions_processor.rs +++ b/rust/processor/src/processors/account_transactions_processor.rs @@ -1,9 +1,10 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{ProcessingResult, ProcessorName, ProcessorTrait}; +use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::common::models::account_transaction_models::account_transactions::AccountTransaction, + gap_detectors::ProcessingResult, schema, utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, }; @@ -129,13 +130,15 @@ impl ProcessorTrait for AccountTransactionsProcessor { let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); match tx_result { - Ok(_) =>
Ok(ProcessingResult { - start_version, - end_version, - processing_duration_in_secs, - db_insertion_duration_in_secs, - last_transaction_timestamp, - }), + Ok(_) => Ok(ProcessingResult::DefaultProcessingResult( + DefaultProcessingResult { + start_version, + end_version, + processing_duration_in_secs, + db_insertion_duration_in_secs, + last_transaction_timestamp, + }, + )), Err(err) => { error!( start_version = start_version, diff --git a/rust/processor/src/processors/ans_processor.rs b/rust/processor/src/processors/ans_processor.rs index 835cac42b..a984c8de7 100644 --- a/rust/processor/src/processors/ans_processor.rs +++ b/rust/processor/src/processors/ans_processor.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{ProcessingResult, ProcessorName, ProcessorTrait}; +use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::common::models::ans_models::{ ans_lookup::{AnsLookup, AnsPrimaryName, CurrentAnsLookup, CurrentAnsPrimaryName}, @@ -10,6 +10,7 @@ use crate::{ }, ans_utils::{RenewNameEvent, SubdomainExtV2}, }, + gap_detectors::ProcessingResult, schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, @@ -410,13 +411,15 @@ impl ProcessorTrait for AnsProcessor { let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); match tx_result { - Ok(_) => Ok(ProcessingResult { - start_version, - end_version, - processing_duration_in_secs, - db_insertion_duration_in_secs, - last_transaction_timestamp, - }), + Ok(_) => Ok(ProcessingResult::DefaultProcessingResult( + DefaultProcessingResult { + start_version, + end_version, + processing_duration_in_secs, + db_insertion_duration_in_secs, + last_transaction_timestamp, + }, + )), Err(e) => { error!( start_version = start_version, diff --git a/rust/processor/src/processors/coin_processor.rs b/rust/processor/src/processors/coin_processor.rs index b16e4398b..f351ce40d 100644 --- a/rust/processor/src/processors/coin_processor.rs +++ b/rust/processor/src/processors/coin_processor.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{ProcessingResult, ProcessorName, ProcessorTrait}; +use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::common::models::{ coin_models::{ @@ -11,6 +11,7 @@ use crate::{ }, fungible_asset_models::v2_fungible_asset_activities::CurrentCoinBalancePK, }, + gap_detectors::ProcessingResult, schema, utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, }; @@ -277,13 +278,15 @@ impl ProcessorTrait for CoinProcessor { let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); match tx_result { - Ok(_) => Ok(ProcessingResult { - start_version, - end_version, - processing_duration_in_secs, - db_insertion_duration_in_secs, - last_transaction_timestamp, - }), + Ok(_) => Ok(ProcessingResult::DefaultProcessingResult( + DefaultProcessingResult { + start_version, + end_version, + processing_duration_in_secs, + db_insertion_duration_in_secs, + last_transaction_timestamp, + }, + )), Err(err) => { error!( start_version = start_version, diff --git a/rust/processor/src/processors/default_processor.rs b/rust/processor/src/processors/default_processor.rs index 017d434ee..cc2c76664 100644 --- a/rust/processor/src/processors/default_processor.rs +++ b/rust/processor/src/processors/default_processor.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{ProcessingResult, ProcessorName, 
ProcessorTrait}; +use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::common::models::default_models::{ block_metadata_transactions::{BlockMetadataTransaction, BlockMetadataTransactionModel}, @@ -11,6 +11,7 @@ use crate::{ transactions::TransactionModel, write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel}, }, + gap_detectors::ProcessingResult, schema, utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, worker::TableFlags, @@ -148,11 +149,11 @@ async fn insert_to_db( get_config_table_chunk_size::("table_metadatas", per_table_chunk_sizes), ); - let (txns_res, bmt_res, wst_res, mm_res, mr_res, ti_res, cti_res, tm_res) = - join!(txns_res, bmt_res, wst_res, mm_res, mr_res, ti_res, cti_res, tm_res); + let (txns_res, wst_res, bmt_res, mm_res, mr_res, ti_res, cti_res, tm_res) = + join!(txns_res, wst_res, bmt_res, mm_res, mr_res, ti_res, cti_res, tm_res); for res in [ - txns_res, bmt_res, wst_res, mm_res, mr_res, ti_res, cti_res, tm_res, + txns_res, wst_res, bmt_res, mm_res, mr_res, ti_res, cti_res, tm_res, ] { res?; } @@ -356,13 +357,15 @@ impl ProcessorTrait for DefaultProcessor { let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); match tx_result { - Ok(_) => Ok(ProcessingResult { - start_version, - end_version, - processing_duration_in_secs, - db_insertion_duration_in_secs, - last_transaction_timestamp, - }), + Ok(_) => Ok(ProcessingResult::DefaultProcessingResult( + DefaultProcessingResult { + start_version, + end_version, + processing_duration_in_secs, + db_insertion_duration_in_secs, + last_transaction_timestamp, + }, + )), Err(e) => { error!( start_version = start_version, diff --git a/rust/processor/src/processors/events_processor.rs b/rust/processor/src/processors/events_processor.rs index b5ff0831c..409914275 100644 --- a/rust/processor/src/processors/events_processor.rs +++ b/rust/processor/src/processors/events_processor.rs @@ -1,9 +1,10 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{ProcessingResult, ProcessorName, ProcessorTrait}; +use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::common::models::events_models::events::EventModel, + gap_detectors::ProcessingResult, schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, @@ -151,13 +152,15 @@ impl ProcessorTrait for EventsProcessor { let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); match tx_result { - Ok(_) => Ok(ProcessingResult { - start_version, - end_version, - processing_duration_in_secs, - db_insertion_duration_in_secs, - last_transaction_timestamp, - }), + Ok(_) => Ok(ProcessingResult::DefaultProcessingResult( + DefaultProcessingResult { + start_version, + end_version, + processing_duration_in_secs, + db_insertion_duration_in_secs, + last_transaction_timestamp, + }, + )), Err(e) => { error!( start_version = start_version, diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index 14b6f2613..12c36600a 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{ProcessingResult, ProcessorName, ProcessorTrait}; +use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::common::models::{ coin_models::coin_supply::CoinSupply, @@ -21,6 +21,7 @@ use 
crate::{ ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, Untransferable, }, }, + gap_detectors::ProcessingResult, schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, @@ -382,13 +383,15 @@ impl ProcessorTrait for FungibleAssetProcessor { .await; let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); match tx_result { - Ok(_) => Ok(ProcessingResult { - start_version, - end_version, - processing_duration_in_secs, - db_insertion_duration_in_secs, - last_transaction_timestamp, - }), + Ok(_) => Ok(ProcessingResult::DefaultProcessingResult( + DefaultProcessingResult { + start_version, + end_version, + processing_duration_in_secs, + db_insertion_duration_in_secs, + last_transaction_timestamp, + }, + )), Err(err) => { error!( start_version = start_version, @@ -449,6 +452,7 @@ async fn parse_v2_coin( .as_ref() .expect("Transaction timestamp doesn't exist!") .seconds; + #[allow(deprecated)] let txn_timestamp = NaiveDateTime::from_timestamp_opt(txn_timestamp, 0).expect("Txn Timestamp is invalid!"); let txn_epoch = txn.epoch as i64; diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs index 07257d6dc..b3d33de5d 100644 --- a/rust/processor/src/processors/mod.rs +++ b/rust/processor/src/processors/mod.rs @@ -13,6 +13,7 @@ pub mod fungible_asset_processor; pub mod monitoring_processor; pub mod nft_metadata_processor; pub mod objects_processor; +pub mod parquet_default_processor; pub mod stake_processor; pub mod token_processor; pub mod token_v2_processor; @@ -29,6 +30,7 @@ use self::{ monitoring_processor::MonitoringProcessor, nft_metadata_processor::{NftMetadataProcessor, NftMetadataProcessorConfig}, objects_processor::{ObjectsProcessor, ObjectsProcessorConfig}, + parquet_default_processor::DefaultParquetProcessorConfig, stake_processor::{StakeProcessor, StakeProcessorConfig}, token_processor::{TokenProcessor, TokenProcessorConfig}, token_v2_processor::{TokenV2Processor, TokenV2ProcessorConfig}, @@ -37,6 +39,8 @@ use self::{ }; use crate::{ db::common::models::processor_status::ProcessorStatus, + gap_detectors::ProcessingResult, + processors::parquet_default_processor::DefaultParquetProcessor, schema::processor_status, utils::{ counters::{GOT_CONNECTION_COUNT, UNABLE_TO_GET_CONNECTION_COUNT}, @@ -52,7 +56,7 @@ use serde::{Deserialize, Serialize}; use std::fmt::Debug; #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] -pub struct ProcessingResult { +pub struct DefaultProcessingResult { pub start_version: u64, pub end_version: u64, pub last_transaction_timestamp: Option, @@ -190,6 +194,7 @@ pub enum ProcessorConfig { TokenV2Processor(TokenV2ProcessorConfig), TransactionMetadataProcessor, UserTransactionProcessor, + DefaultParquetProcessor(DefaultParquetProcessorConfig), } impl ProcessorConfig { @@ -198,6 +203,10 @@ impl ProcessorConfig { pub fn name(&self) -> &'static str { self.into() } + + pub fn is_parquet_processor(&self) -> bool { + matches!(self, ProcessorConfig::DefaultParquetProcessor(_)) + } } /// This enum contains all the processors defined in this crate. 
We use enum_dispatch @@ -232,6 +241,7 @@ pub enum Processor { TokenV2Processor, TransactionMetadataProcessor, UserTransactionProcessor, + DefaultParquetProcessor, } #[cfg(test)] diff --git a/rust/processor/src/processors/monitoring_processor.rs b/rust/processor/src/processors/monitoring_processor.rs index 4fda0e9e8..c7e750f82 100644 --- a/rust/processor/src/processors/monitoring_processor.rs +++ b/rust/processor/src/processors/monitoring_processor.rs @@ -1,8 +1,8 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{ProcessingResult, ProcessorName, ProcessorTrait}; -use crate::utils::database::ArcDbPool; +use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; +use crate::{gap_detectors::ProcessingResult, utils::database::ArcDbPool}; use aptos_protos::transaction::v1::Transaction; use async_trait::async_trait; use std::fmt::Debug; @@ -41,13 +41,15 @@ impl ProcessorTrait for MonitoringProcessor { end_version: u64, _: Option, ) -> anyhow::Result { - Ok(ProcessingResult { - start_version, - end_version, - processing_duration_in_secs: 0.0, - db_insertion_duration_in_secs: 0.0, - last_transaction_timestamp: transactions.last().unwrap().timestamp.clone(), - }) + Ok(ProcessingResult::DefaultProcessingResult( + DefaultProcessingResult { + start_version, + end_version, + processing_duration_in_secs: 0.0, + db_insertion_duration_in_secs: 0.0, + last_transaction_timestamp: transactions.last().unwrap().timestamp.clone(), + }, + )) } fn connection_pool(&self) -> &ArcDbPool { diff --git a/rust/processor/src/processors/nft_metadata_processor.rs b/rust/processor/src/processors/nft_metadata_processor.rs index 58f63181f..4fcb9a922 100644 --- a/rust/processor/src/processors/nft_metadata_processor.rs +++ b/rust/processor/src/processors/nft_metadata_processor.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{ProcessingResult, ProcessorName, ProcessorTrait}; +use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::common::models::{ object_models::v2_object_utils::{ @@ -13,6 +13,7 @@ use crate::{ v2_token_datas::{CurrentTokenDataV2, CurrentTokenDataV2PK, TokenDataV2}, }, }, + gap_detectors::ProcessingResult, utils::{ database::{ArcDbPool, DbPoolConnection}, util::{parse_timestamp, remove_null_bytes, standardize_address}, @@ -177,13 +178,15 @@ impl ProcessorTrait for NftMetadataProcessor { let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); - Ok(ProcessingResult { - start_version, - end_version, - processing_duration_in_secs, - db_insertion_duration_in_secs, - last_transaction_timestamp, - }) + Ok(ProcessingResult::DefaultProcessingResult( + DefaultProcessingResult { + start_version, + end_version, + processing_duration_in_secs, + db_insertion_duration_in_secs, + last_transaction_timestamp, + }, + )) } fn connection_pool(&self) -> &ArcDbPool { diff --git a/rust/processor/src/processors/objects_processor.rs b/rust/processor/src/processors/objects_processor.rs index 7ab871c3a..9228692ab 100644 --- a/rust/processor/src/processors/objects_processor.rs +++ b/rust/processor/src/processors/objects_processor.rs @@ -1,12 +1,13 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{ProcessingResult, ProcessorName, ProcessorTrait}; +use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::common::models::object_models::{ v2_object_utils::{ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata}, 
v2_objects::{CurrentObject, Object}, }, + gap_detectors::ProcessingResult, schema, utils::{ database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, @@ -279,13 +280,15 @@ impl ProcessorTrait for ObjectsProcessor { let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); match tx_result { - Ok(_) => Ok(ProcessingResult { - start_version, - end_version, - processing_duration_in_secs, - db_insertion_duration_in_secs, - last_transaction_timestamp, - }), + Ok(_) => Ok(ProcessingResult::DefaultProcessingResult( + DefaultProcessingResult { + start_version, + end_version, + processing_duration_in_secs, + db_insertion_duration_in_secs, + last_transaction_timestamp, + }, + )), Err(e) => { error!( start_version = start_version, diff --git a/rust/processor/src/processors/parquet_default_processor.rs b/rust/processor/src/processors/parquet_default_processor.rs new file mode 100644 index 000000000..0b8a30890 --- /dev/null +++ b/rust/processor/src/processors/parquet_default_processor.rs @@ -0,0 +1,271 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use super::{ProcessorName, ProcessorTrait}; +use crate::{ + bq_analytics::{ + generic_parquet_processor::ParquetDataGeneric, + parquet_handler::create_parquet_handler_loop, ParquetProcessingResult, + }, + db::common::models::default_models::{ + parquet_move_resources::MoveResource, + parquet_move_tables::{CurrentTableItem, TableItem, TableMetadata}, + parquet_transactions::{Transaction as ParquetTransaction, TransactionModel}, + parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel}, + }, + gap_detectors::ProcessingResult, + utils::database::ArcDbPool, +}; +use ahash::AHashMap; +use anyhow::anyhow; +use aptos_protos::transaction::v1::Transaction; +use async_trait::async_trait; +use kanal::AsyncSender; +use serde::{Deserialize, Serialize}; +use std::fmt::{Debug, Formatter, Result}; + +const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS"; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct DefaultParquetProcessorConfig { + pub google_application_credentials: Option, + pub bucket_name: String, + pub parquet_handler_response_channel_size: usize, + pub max_buffer_size: usize, +} + +pub struct DefaultParquetProcessor { + connection_pool: ArcDbPool, + transaction_sender: AsyncSender>, + move_resource_sender: AsyncSender>, + wsc_sender: AsyncSender>, + ti_sender: AsyncSender>, +} + +// TODO: Since each table item has a different size allocated, the pace of being backfilled to PQ varies a lot. +// Maybe we can also have a way to configure a different starting version for each table later.
+impl DefaultParquetProcessor { + pub fn new( + connection_pool: ArcDbPool, + config: DefaultParquetProcessorConfig, + new_gap_detector_sender: AsyncSender, + ) -> Self { + if let Some(credentials) = config.google_application_credentials.clone() { + std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials); + } + + let transaction_sender = create_parquet_handler_loop::( + new_gap_detector_sender.clone(), + ProcessorName::DefaultParquetProcessor.into(), + config.bucket_name.clone(), + config.parquet_handler_response_channel_size, + config.max_buffer_size, + ); + + let move_resource_sender = create_parquet_handler_loop::( + new_gap_detector_sender.clone(), + ProcessorName::DefaultParquetProcessor.into(), + config.bucket_name.clone(), + config.parquet_handler_response_channel_size, + config.max_buffer_size, + ); + + let wsc_sender = create_parquet_handler_loop::( + new_gap_detector_sender.clone(), + ProcessorName::DefaultParquetProcessor.into(), + config.bucket_name.clone(), + config.parquet_handler_response_channel_size, + config.max_buffer_size, + ); + + let ti_sender = create_parquet_handler_loop::( + new_gap_detector_sender.clone(), + ProcessorName::DefaultParquetProcessor.into(), + config.bucket_name.clone(), + config.parquet_handler_response_channel_size, + config.max_buffer_size, + ); + + Self { + connection_pool, + transaction_sender, + move_resource_sender, + wsc_sender, + ti_sender, + } + } +} + +impl Debug for DefaultParquetProcessor { + fn fmt(&self, f: &mut Formatter<'_>) -> Result { + write!( + f, + "ParquetProcessor {{ capacity of t channel: {:?}, capacity of mr channel: {:?}, capacity of wsc channel: {:?}, capacity of ti channel: {:?} }}", + &self.transaction_sender.capacity(), + &self.move_resource_sender.capacity(), + &self.wsc_sender.capacity(), + &self.ti_sender.capacity(), + ) + } +} + +#[async_trait] +impl ProcessorTrait for DefaultParquetProcessor { + fn name(&self) -> &'static str { + ProcessorName::DefaultParquetProcessor.into() + } + + async fn process_transactions( + &self, + transactions: Vec, + start_version: u64, + end_version: u64, + _: Option, + ) -> anyhow::Result { + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + + let ((mr, wsc, t, ti), transaction_version_to_struct_count) = + tokio::task::spawn_blocking(move || process_transactions(transactions)) + .await + .expect("Failed to spawn_blocking for TransactionModel::from_transactions"); + + let mr_parquet_data = ParquetDataGeneric { + data: mr, + last_transaction_timestamp: last_transaction_timestamp.clone(), + transaction_version_to_struct_count: transaction_version_to_struct_count.clone(), + first_txn_version: start_version, + last_txn_version: end_version, + }; + + self.move_resource_sender + .send(mr_parquet_data) + .await + .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?; + + let wsc_parquet_data = ParquetDataGeneric { + data: wsc, + last_transaction_timestamp: last_transaction_timestamp.clone(), + transaction_version_to_struct_count: transaction_version_to_struct_count.clone(), + first_txn_version: start_version, + last_txn_version: end_version, + }; + self.wsc_sender + .send(wsc_parquet_data) + .await + .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?; + + let t_parquet_data = ParquetDataGeneric { + data: t, + last_transaction_timestamp: last_transaction_timestamp.clone(), + transaction_version_to_struct_count: transaction_version_to_struct_count.clone(), + first_txn_version: start_version, + last_txn_version: end_version, + }; 
+ self.transaction_sender + .send(t_parquet_data) + .await + .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?; + + let ti_parquet_data = ParquetDataGeneric { + data: ti, + last_transaction_timestamp: last_transaction_timestamp.clone(), + transaction_version_to_struct_count: transaction_version_to_struct_count.clone(), + first_txn_version: start_version, + last_txn_version: end_version, + }; + + self.ti_sender + .send(ti_parquet_data) + .await + .map_err(|e| anyhow!("Failed to send to parquet manager: {}", e))?; + + Ok(ProcessingResult::ParquetProcessingResult( + ParquetProcessingResult { + start_version: start_version as i64, + end_version: end_version as i64, + last_transaction_timestamp: last_transaction_timestamp.clone(), + txn_version_to_struct_count: AHashMap::new(), + }, + )) + } + + fn connection_pool(&self) -> &ArcDbPool { + &self.connection_pool + } +} + +pub fn process_transactions( + transactions: Vec, +) -> ( + ( + Vec, + Vec, + Vec, + Vec, + ), + AHashMap, +) { + let mut transaction_version_to_struct_count: AHashMap = AHashMap::new(); + let (txns, _block_metadata_txns, write_set_changes, wsc_details) = + TransactionModel::from_transactions( + &transactions, + &mut transaction_version_to_struct_count, + ); + + let mut move_modules = vec![]; + let mut move_resources = vec![]; + let mut table_items = vec![]; + let mut current_table_items = AHashMap::new(); + let mut table_metadata: AHashMap = AHashMap::new(); + + for detail in wsc_details { + match detail { + WriteSetChangeDetail::Module(module) => { + move_modules.push(module.clone()); + // transaction_version_to_struct_count.entry(module.transaction_version).and_modify(|e| *e += 1); // TODO: uncomment in Tranche2 + }, + WriteSetChangeDetail::Resource(resource) => { + transaction_version_to_struct_count + .entry(resource.txn_version) + .and_modify(|e| *e += 1); + move_resources.push(resource); + }, + WriteSetChangeDetail::Table(item, current_item, metadata) => { + transaction_version_to_struct_count + .entry(item.txn_version) + .and_modify(|e| *e += 1); + table_items.push(item); + + current_table_items.insert( + ( + current_item.table_handle.clone(), + current_item.key_hash.clone(), + ), + current_item, + ); + // transaction_version_to_struct_count.entry(current_item.last_transaction_version).and_modify(|e| *e += 1); // TODO: uncomment in Tranche2 + + if let Some(meta) = metadata { + table_metadata.insert(meta.handle.clone(), meta); + // transaction_version_to_struct_count.entry(current_item.last_transaction_version).and_modify(|e| *e += 1); // TODO: uncomment in Tranche2 + } + }, + } + } + + // Getting list of values and sorting by pk in order to avoid postgres deadlock since we're doing multi threaded db writes + let mut current_table_items = current_table_items + .into_values() + .collect::>(); + let mut table_metadata = table_metadata.into_values().collect::>(); + // Sort by PK + current_table_items + .sort_by(|a, b| (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash))); + table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle)); + + ( + (move_resources, write_set_changes, txns, table_items), + transaction_version_to_struct_count, + ) +} diff --git a/rust/processor/src/processors/stake_processor.rs b/rust/processor/src/processors/stake_processor.rs index e1268c4af..d623704d1 100644 --- a/rust/processor/src/processors/stake_processor.rs +++ b/rust/processor/src/processors/stake_processor.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use 
super::{ProcessingResult, ProcessorName, ProcessorTrait}; +use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::common::models::stake_models::{ current_delegated_voter::CurrentDelegatedVoter, @@ -16,6 +16,7 @@ use crate::{ stake_utils::DelegationVoteGovernanceRecordsResource, staking_pool_voter::{CurrentStakingPoolVoter, StakingPoolVoterMap}, }, + gap_detectors::ProcessingResult, schema, utils::{ database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, @@ -578,13 +579,15 @@ impl ProcessorTrait for StakeProcessor { .await; let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); match tx_result { - Ok(_) => Ok(ProcessingResult { - start_version, - end_version, - processing_duration_in_secs, - db_insertion_duration_in_secs, - last_transaction_timestamp, - }), + Ok(_) => Ok(ProcessingResult::DefaultProcessingResult( + DefaultProcessingResult { + start_version, + end_version, + processing_duration_in_secs, + db_insertion_duration_in_secs, + last_transaction_timestamp, + }, + )), Err(e) => { error!( start_version = start_version, diff --git a/rust/processor/src/processors/token_processor.rs b/rust/processor/src/processors/token_processor.rs index f19fb1797..cd5411b28 100644 --- a/rust/processor/src/processors/token_processor.rs +++ b/rust/processor/src/processors/token_processor.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{ProcessingResult, ProcessorName, ProcessorTrait}; +use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::common::models::token_models::{ collection_datas::{CollectionData, CurrentCollectionData}, @@ -15,6 +15,7 @@ use crate::{ TokenDataIdHash, }, }, + gap_detectors::ProcessingResult, schema, utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, IndexerGrpcProcessorConfig, @@ -581,13 +582,15 @@ impl ProcessorTrait for TokenProcessor { let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); match tx_result { - Ok(_) => Ok(ProcessingResult { - start_version, - end_version, - processing_duration_in_secs, - db_insertion_duration_in_secs, - last_transaction_timestamp, - }), + Ok(_) => Ok(ProcessingResult::DefaultProcessingResult( + DefaultProcessingResult { + start_version, + end_version, + processing_duration_in_secs, + db_insertion_duration_in_secs, + last_transaction_timestamp, + }, + )), Err(e) => { error!( start_version = start_version, diff --git a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs index d9b66e9ba..32530f9bb 100644 --- a/rust/processor/src/processors/token_v2_processor.rs +++ b/rust/processor/src/processors/token_v2_processor.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{ProcessingResult, ProcessorName, ProcessorTrait}; +use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::common::models::{ fungible_asset_models::v2_fungible_asset_utils::FungibleAssetMetadata, @@ -26,6 +26,7 @@ use crate::{ }, }, }, + gap_detectors::ProcessingResult, schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, @@ -591,13 +592,15 @@ impl ProcessorTrait for TokenV2Processor { let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); match tx_result { - Ok(_) => Ok(ProcessingResult { - start_version, - end_version, - processing_duration_in_secs, - db_insertion_duration_in_secs, - last_transaction_timestamp, - }), + 
Ok(_) => Ok(ProcessingResult::DefaultProcessingResult( + DefaultProcessingResult { + start_version, + end_version, + processing_duration_in_secs, + db_insertion_duration_in_secs, + last_transaction_timestamp, + }, + )), Err(e) => { error!( start_version = start_version, diff --git a/rust/processor/src/processors/transaction_metadata_processor.rs b/rust/processor/src/processors/transaction_metadata_processor.rs index cab5cff2e..615dacd09 100644 --- a/rust/processor/src/processors/transaction_metadata_processor.rs +++ b/rust/processor/src/processors/transaction_metadata_processor.rs @@ -1,12 +1,13 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{ProcessingResult, ProcessorName, ProcessorTrait}; +use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::common::models::transaction_metadata_model::{ event_size_info::EventSize, transaction_size_info::TransactionSize, write_set_size_info::WriteSetSize, }, + gap_detectors::ProcessingResult, schema, utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, }; @@ -198,13 +199,15 @@ impl ProcessorTrait for TransactionMetadataProcessor { .await; let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); match tx_result { - Ok(_) => Ok(ProcessingResult { - start_version, - end_version, - processing_duration_in_secs, - db_insertion_duration_in_secs, - last_transaction_timestamp: transactions.last().unwrap().timestamp.clone(), - }), + Ok(_) => Ok(ProcessingResult::DefaultProcessingResult( + DefaultProcessingResult { + start_version, + end_version, + processing_duration_in_secs, + db_insertion_duration_in_secs, + last_transaction_timestamp: transactions.last().unwrap().timestamp.clone(), + }, + )), Err(e) => { error!( start_version = start_version, diff --git a/rust/processor/src/processors/user_transaction_processor.rs b/rust/processor/src/processors/user_transaction_processor.rs index ba6bba95a..08416488e 100644 --- a/rust/processor/src/processors/user_transaction_processor.rs +++ b/rust/processor/src/processors/user_transaction_processor.rs @@ -1,11 +1,12 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{ProcessingResult, ProcessorName, ProcessorTrait}; +use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::common::models::user_transactions_models::{ signatures::Signature, user_transactions::UserTransactionModel, }, + gap_detectors::ProcessingResult, schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, @@ -191,13 +192,15 @@ impl ProcessorTrait for UserTransactionProcessor { .await; let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); match tx_result { - Ok(_) => Ok(ProcessingResult { - start_version, - end_version, - processing_duration_in_secs, - db_insertion_duration_in_secs, - last_transaction_timestamp, - }), + Ok(_) => Ok(ProcessingResult::DefaultProcessingResult( + DefaultProcessingResult { + start_version, + end_version, + processing_duration_in_secs, + db_insertion_duration_in_secs, + last_transaction_timestamp, + }, + )), Err(e) => { error!( start_version = start_version, diff --git a/rust/processor/src/utils/counters.rs b/rust/processor/src/utils/counters.rs index ee9431a8d..f4a6a57e1 100644 --- a/rust/processor/src/utils/counters.rs +++ b/rust/processor/src/utils/counters.rs @@ -235,6 +235,16 @@ pub static PROCESSOR_DATA_GAP_COUNT: Lazy = Lazy::new(|| { .unwrap() }); +/// Data gap warnings for parquet +pub static 
PARQUET_PROCESSOR_DATA_GAP_COUNT: Lazy = Lazy::new(|| { + register_int_gauge_vec!( + "indexer_parquet_processor_data_gap_count", + "Data gap count", + &["processor_name"] + ) + .unwrap() +}); + /// GRPC latency. pub static GRPC_LATENCY_BY_PROCESSOR_IN_SECS: Lazy = Lazy::new(|| { register_gauge_vec!( @@ -254,3 +264,21 @@ pub static PROCESSOR_UNKNOWN_TYPE_COUNT: Lazy = Lazy::new(|| { ) .unwrap() }); + +/// Parquet struct size +pub static PARQUET_STRUCT_SIZE: Lazy = Lazy::new(|| { + register_int_gauge_vec!("indexer_parquet_struct_size", "Parquet struct size", &[ + "parquet_type" + ]) + .unwrap() +}); + +/// Parquet handler buffer size +pub static PARQUET_HANDLER_BUFFER_SIZE: Lazy = Lazy::new(|| { + register_int_gauge_vec!( + "indexer_parquet_handler_buffer_size", + "Parquet handler buffer size", + &["parquet_type"] // TODO: add something like task_index + ) + .unwrap() +}); diff --git a/rust/processor/src/utils/util.rs b/rust/processor/src/utils/util.rs index 4b19fa675..844eaf565 100644 --- a/rust/processor/src/utils/util.rs +++ b/rust/processor/src/utils/util.rs @@ -274,11 +274,13 @@ pub fn parse_timestamp(ts: &Timestamp, version: i64) -> chrono::NaiveDateTime { } else { ts.clone() }; + #[allow(deprecated)] chrono::NaiveDateTime::from_timestamp_opt(final_ts.seconds, final_ts.nanos as u32) .unwrap_or_else(|| panic!("Could not parse timestamp {:?} for version {}", ts, version)) } pub fn parse_timestamp_secs(ts: u64, version: i64) -> chrono::NaiveDateTime { + #[allow(deprecated)] chrono::NaiveDateTime::from_timestamp_opt( std::cmp::min(ts, MAX_TIMESTAMP_SECS as u64) as i64, 0, @@ -512,14 +514,14 @@ mod tests { }, 1, ); - assert_eq!(ts.timestamp(), 1649560602); + assert_eq!(ts.and_utc().timestamp(), 1649560602); assert_eq!(ts.year(), 2022); let ts2 = parse_timestamp_secs(600000000000000, 2); assert_eq!(ts2.year(), 9999); let ts3 = parse_timestamp_secs(1659386386, 2); - assert_eq!(ts3.timestamp(), 1659386386); + assert_eq!(ts3.and_utc().timestamp(), 1659386386); } #[test] diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index c1cf021ce..08a9451a0 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -4,16 +4,18 @@ use crate::{ config::IndexerGrpcHttp2Config, db::common::models::{ledger_info::LedgerInfo, processor_status::ProcessorStatusQuery}, + gap_detectors::{create_gap_detector_status_tracker_loop, ProcessingResult}, grpc_stream::TransactionsPBResponse, processors::{ account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor, coin_processor::CoinProcessor, default_processor::DefaultProcessor, events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor, monitoring_processor::MonitoringProcessor, nft_metadata_processor::NftMetadataProcessor, - objects_processor::ObjectsProcessor, stake_processor::StakeProcessor, - token_processor::TokenProcessor, token_v2_processor::TokenV2Processor, + objects_processor::ObjectsProcessor, parquet_default_processor::DefaultParquetProcessor, + stake_processor::StakeProcessor, token_processor::TokenProcessor, + token_v2_processor::TokenV2Processor, transaction_metadata_processor::TransactionMetadataProcessor, - user_transaction_processor::UserTransactionProcessor, ProcessingResult, Processor, + user_transaction_processor::UserTransactionProcessor, DefaultProcessingResult, Processor, ProcessorConfig, ProcessorTrait, }, schema::ledger_infos, @@ -38,6 +40,7 @@ use ahash::AHashMap; use anyhow::{Context, Result}; use aptos_moving_average::MovingAverage; use 
 use bitflags::bitflags;
+use kanal::AsyncSender;
 use std::collections::HashSet;
 use tokio::task::JoinHandle;
 use tracing::{debug, error, info};
@@ -46,7 +49,9 @@ use url::Url;
 // this is how large the fetch queue should be. Each bucket should have a max of 80MB or so, so a batch
 // of 50 means that we could potentially have at least 4.8GB of data in memory at any given time and that we should provision
 // machines accordingly.
-pub const BUFFER_SIZE: usize = 100;
+
+// TODO: Make this configurable
+pub const BUFFER_SIZE: usize = 300;
 
 pub const PROCESSOR_SERVICE_TYPE: &str = "processor";
 bitflags! {
@@ -69,6 +74,7 @@ pub struct Worker {
     pub ending_version: Option<u64>,
     pub number_concurrent_processing_tasks: usize,
     pub gap_detection_batch_size: u64,
+    pub parquet_gap_detection_batch_size: u64,
     pub grpc_chain_id: Option<u64>,
     pub pb_channel_txn_chunk_size: usize,
     pub per_table_chunk_sizes: AHashMap<String, usize>,
@@ -91,6 +97,7 @@ impl Worker {
         number_concurrent_processing_tasks: Option<usize>,
         db_pool_size: Option<u32>,
         gap_detection_batch_size: u64,
+        parquet_gap_detection_batch_size: u64,
         // The number of transactions per protobuf batch
         pb_channel_txn_chunk_size: usize,
         per_table_chunk_sizes: AHashMap<String, usize>,
@@ -135,6 +142,7 @@ impl Worker {
             auth_token,
             number_concurrent_processing_tasks,
             gap_detection_batch_size,
+            parquet_gap_detection_batch_size,
             grpc_chain_id: None,
             pb_channel_txn_chunk_size,
             per_table_chunk_sizes,
@@ -258,15 +266,41 @@ impl Worker {
         // Create a gap detector task that will panic if there is a gap in the processing
         let (gap_detector_sender, gap_detector_receiver) =
             kanal::bounded_async::<ProcessingResult>(BUFFER_SIZE);
-        let gap_detection_batch_size = self.gap_detection_batch_size;
-        let processor = build_processor(
-            &self.processor_config,
-            self.per_table_chunk_sizes.clone(),
-            self.deprecated_tables,
-            self.db_pool.clone(),
-        );
+        let (processor, gap_detection_batch_size, gap_detector_sender) =
+            if self.processor_config.is_parquet_processor() {
+                let processor = build_processor(
+                    &self.processor_config,
+                    self.per_table_chunk_sizes.clone(),
+                    self.deprecated_tables,
+                    self.db_pool.clone(),
+                    Some(gap_detector_sender.clone()),
+                );
+                let gap_detection_batch_size: u64 = self.parquet_gap_detection_batch_size;
+
+                (
+                    processor,
+                    gap_detection_batch_size,
+                    Some(gap_detector_sender),
+                )
+            } else {
+                let processor = build_processor(
+                    &self.processor_config,
+                    self.per_table_chunk_sizes.clone(),
+                    self.deprecated_tables,
+                    self.db_pool.clone(),
+                    None,
+                );
+                let gap_detection_batch_size = self.gap_detection_batch_size;
+
+                (
+                    processor,
+                    gap_detection_batch_size,
+                    Some(gap_detector_sender),
+                )
+            };
+
         tokio::spawn(async move {
-            crate::gap_detector::create_gap_detector_status_tracker_loop(
+            create_gap_detector_status_tracker_loop(
                 gap_detector_receiver,
                 processor,
                 starting_version,
@@ -292,7 +326,7 @@ impl Worker {
 
         let mut processor_tasks = vec![fetcher_task];
         for task_index in 0..concurrent_tasks {
-            let join_handle = self
+            let join_handle: JoinHandle<()> = self
                 .launch_processor_task(task_index, receiver.clone(), gap_detector_sender.clone())
                 .await;
             processor_tasks.push(join_handle);
@@ -316,7 +350,7 @@
         &self,
         task_index: usize,
         receiver: kanal::AsyncReceiver<TransactionsPBResponse>,
-        gap_detector_sender: kanal::AsyncSender<ProcessingResult>,
+        gap_detector_sender: Option<AsyncSender<ProcessingResult>>,
     ) -> JoinHandle<()> {
         let processor_name = self.processor_config.name();
         let stream_address = self.indexer_grpc_data_service_address.to_string();
@@ -329,6 +363,7 @@
             self.per_table_chunk_sizes.clone(),
             self.deprecated_tables,
             self.db_pool.clone(),
+            gap_detector_sender.clone(),
         );
 
         let concurrent_tasks = self.number_concurrent_processing_tasks;
@@ -336,6 +371,7 @@
         let chain_id = self
             .grpc_chain_id
             .expect("GRPC chain ID has not been fetched yet!");
+
         tokio::spawn(async move {
             let task_index_str = task_index.to_string();
             let step = ProcessorStep::ProcessedBatch.get_step();
@@ -344,7 +380,6 @@
 
             loop {
                 let txn_channel_fetch_latency = std::time::Instant::now();
-
                 match fetch_transactions(
                     processor_name,
                     &stream_address,
@@ -458,76 +493,105 @@
                             },
                         };
-                        let processing_time = processing_time.elapsed().as_secs_f64();
-
-                        // We've processed things: do some data and metrics
+                        match processing_result {
+                            ProcessingResult::DefaultProcessingResult(processing_result) => {
+                                let processing_time = processing_time.elapsed().as_secs_f64();
-                        ma.tick_now((last_txn_version - first_txn_version) + 1);
-                        let tps = ma.avg().ceil() as u64;
+                                // We've processed things: do some data and metrics
+                                ma.tick_now((last_txn_version - first_txn_version) + 1);
+                                let tps = ma.avg().ceil() as u64;
-                        let num_processed = (last_txn_version - first_txn_version) + 1;
+                                let num_processed = (last_txn_version - first_txn_version) + 1;
-                        debug!(
-                            processor_name = processor_name,
-                            service_type = PROCESSOR_SERVICE_TYPE,
-                            first_txn_version,
-                            batch_first_txn_version,
-                            last_txn_version,
-                            batch_last_txn_version,
-                            start_txn_timestamp_iso,
-                            end_txn_timestamp_iso,
-                            num_of_transactions = num_processed,
-                            concurrent_tasks,
-                            task_index,
-                            size_in_bytes,
-                            processing_duration_in_secs =
-                                processing_result.processing_duration_in_secs,
-                            db_insertion_duration_in_secs =
-                                processing_result.db_insertion_duration_in_secs,
-                            duration_in_secs = processing_time,
-                            tps = tps,
-                            bytes_per_sec = size_in_bytes / processing_time,
-                            step = &step,
-                            "{}",
-                            label,
-                        );
+                                debug!(
+                                    processor_name = processor_name,
+                                    service_type = PROCESSOR_SERVICE_TYPE,
+                                    first_txn_version,
+                                    batch_first_txn_version,
+                                    last_txn_version,
+                                    batch_last_txn_version,
+                                    start_txn_timestamp_iso,
+                                    end_txn_timestamp_iso,
+                                    num_of_transactions = num_processed,
+                                    concurrent_tasks,
+                                    task_index,
+                                    size_in_bytes,
+                                    processing_duration_in_secs =
+                                        processing_result.processing_duration_in_secs,
+                                    db_insertion_duration_in_secs =
+                                        processing_result.db_insertion_duration_in_secs,
+                                    duration_in_secs = processing_time,
+                                    tps = tps,
+                                    bytes_per_sec = size_in_bytes / processing_time,
+                                    step = &step,
+                                    "{}",
+                                    label,
+                                );
-                        // TODO: For these three, do an atomic thing, or ideally move to an async metrics collector!
-                        GRPC_LATENCY_BY_PROCESSOR_IN_SECS
-                            .with_label_values(&[processor_name, &task_index_str])
-                            .set(time_diff_since_pb_timestamp_in_secs(
-                                end_txn_timestamp.as_ref().unwrap(),
-                            ));
-                        LATEST_PROCESSED_VERSION
-                            .with_label_values(&[processor_name, step, label, &task_index_str])
-                            .set(last_txn_version as i64);
-                        TRANSACTION_UNIX_TIMESTAMP
-                            .with_label_values(&[processor_name, step, label, &task_index_str])
-                            .set(start_txn_timestamp_unix);
-
-                        // Single batch metrics
-                        PROCESSED_BYTES_COUNT
-                            .with_label_values(&[processor_name, step, label, &task_index_str])
-                            .inc_by(size_in_bytes as u64);
-                        NUM_TRANSACTIONS_PROCESSED_COUNT
-                            .with_label_values(&[processor_name, step, label, &task_index_str])
-                            .inc_by(num_processed);
-
-                        SINGLE_BATCH_PROCESSING_TIME_IN_SECS
-                            .with_label_values(&[processor_name, &task_index_str])
-                            .set(processing_time);
-                        SINGLE_BATCH_PARSING_TIME_IN_SECS
-                            .with_label_values(&[processor_name, &task_index_str])
-                            .set(processing_result.processing_duration_in_secs);
-                        SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS
-                            .with_label_values(&[processor_name, &task_index_str])
-                            .set(processing_result.db_insertion_duration_in_secs);
-
-                        // Send the result to the gap detector
-                        gap_detector_sender
-                            .send(processing_result)
-                            .await
-                            .expect("[Parser] Failed to send versions to gap detector");
+                                // TODO: For these three, do an atomic thing, or ideally move to an async metrics collector!
+                                GRPC_LATENCY_BY_PROCESSOR_IN_SECS
+                                    .with_label_values(&[processor_name, &task_index_str])
+                                    .set(time_diff_since_pb_timestamp_in_secs(
+                                        end_txn_timestamp.as_ref().unwrap(),
+                                    ));
+                                LATEST_PROCESSED_VERSION
+                                    .with_label_values(&[
+                                        processor_name,
+                                        step,
+                                        label,
+                                        &task_index_str,
+                                    ])
+                                    .set(last_txn_version as i64);
+                                TRANSACTION_UNIX_TIMESTAMP
+                                    .with_label_values(&[
+                                        processor_name,
+                                        step,
+                                        label,
+                                        &task_index_str,
+                                    ])
+                                    .set(start_txn_timestamp_unix);
+
+                                // Single batch metrics
+                                PROCESSED_BYTES_COUNT
+                                    .with_label_values(&[
+                                        processor_name,
+                                        step,
+                                        label,
+                                        &task_index_str,
+                                    ])
+                                    .inc_by(size_in_bytes as u64);
+                                NUM_TRANSACTIONS_PROCESSED_COUNT
+                                    .with_label_values(&[
+                                        processor_name,
+                                        step,
+                                        label,
+                                        &task_index_str,
+                                    ])
+                                    .inc_by(num_processed);
+
+                                SINGLE_BATCH_PROCESSING_TIME_IN_SECS
+                                    .with_label_values(&[processor_name, &task_index_str])
+                                    .set(processing_time);
+                                SINGLE_BATCH_PARSING_TIME_IN_SECS
+                                    .with_label_values(&[processor_name, &task_index_str])
+                                    .set(processing_result.processing_duration_in_secs);
+                                SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS
+                                    .with_label_values(&[processor_name, &task_index_str])
+                                    .set(processing_result.db_insertion_duration_in_secs);
+
+                                gap_detector_sender
+                                    .as_ref()
+                                    .unwrap()
+                                    .send(ProcessingResult::DefaultProcessingResult(
+                                        processing_result,
+                                    ))
+                                    .await
+                                    .expect("[Parser] Failed to send versions to gap detector");
+                            },
+                            ProcessingResult::ParquetProcessingResult(_) => {
+                                debug!("parquet processing result doesn't need to be handled here");
+                            },
+                        }
                     },
                     // Could not fetch transactions from channel. This happens when there are
                     // no more transactions to fetch and the channel is closed.
@@ -687,13 +751,15 @@ pub async fn do_processor(
 
     // Fake this as it's possible we have filtered out all of the txns in this batch
     if transactions_pb.transactions.is_empty() {
-        return Ok(ProcessingResult {
-            start_version,
-            end_version,
-            processing_duration_in_secs: 0.0,
-            db_insertion_duration_in_secs: 0.0,
-            last_transaction_timestamp: transactions_pb.end_txn_timestamp,
-        });
+        return Ok(ProcessingResult::DefaultProcessingResult(
+            DefaultProcessingResult {
+                start_version,
+                end_version,
+                processing_duration_in_secs: 0.0,
+                db_insertion_duration_in_secs: 0.0,
+                last_transaction_timestamp: transactions_pb.end_txn_timestamp,
+            },
+        ));
     }
 
     let txn_time = transactions_pb.start_txn_timestamp;
@@ -746,6 +812,7 @@ pub fn build_processor(
     per_table_chunk_sizes: AHashMap<String, usize>,
     deprecated_tables: TableFlags,
     db_pool: ArcDbPool,
+    gap_detector_sender: Option<AsyncSender<ProcessingResult>>, // Parquet only
 ) -> Processor {
     match config {
         ProcessorConfig::AccountTransactionsProcessor => Processor::from(
@@ -800,5 +867,12 @@ pub fn build_processor(
         ProcessorConfig::UserTransactionProcessor => Processor::from(
             UserTransactionProcessor::new(db_pool, per_table_chunk_sizes),
         ),
+        ProcessorConfig::DefaultParquetProcessor(config) => {
+            Processor::from(DefaultParquetProcessor::new(
+                db_pool,
+                config.clone(),
+                gap_detector_sender.expect("Parquet processor requires a gap detector sender"),
+            ))
+        },
     }
 }
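
For orientation, the hunks above construct and match on a ProcessingResult enum that now comes from the new gap_detectors module, while DefaultProcessingResult stays in the processors module (imported via use super::{...}). The following is a minimal sketch of the shape these call sites imply, not the crate's actual definition: the field types are inferred from how the worker uses them, and the ParquetProcessingResult payload is a placeholder because its definition is not part of this diff.

    use aptos_protos::util::timestamp::Timestamp;

    // Sketch only: shape inferred from the call sites in this diff.
    pub struct DefaultProcessingResult {
        pub start_version: u64,
        pub end_version: u64,
        pub processing_duration_in_secs: f64,
        pub db_insertion_duration_in_secs: f64,
        pub last_transaction_timestamp: Option<Timestamp>,
    }

    // Placeholder: the real payload lives in gap_detectors and is not shown here.
    pub struct ParquetProcessingResult;

    pub enum ProcessingResult {
        DefaultProcessingResult(DefaultProcessingResult),
        ParquetProcessingResult(ParquetProcessingResult),
    }

Under this shape, a processor wraps its per-batch stats in ProcessingResult::DefaultProcessingResult before handing them to the kanal::AsyncSender<ProcessingResult> gap-detector channel, and the worker ignores ParquetProcessingResult in the per-task metrics loop, leaving it to the parquet gap detector.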