diff --git a/rust/Cargo.lock b/rust/Cargo.lock index ee848ca..4e42728 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -151,7 +151,6 @@ dependencies = [ [[package]] name = "aptos-indexer-transaction-stream" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processors.git?rev=3c8896b489062ce0510c5e4162366db753b59499#3c8896b489062ce0510c5e4162366db753b59499" dependencies = [ "anyhow", "aptos-moving-average", @@ -159,7 +158,7 @@ dependencies = [ "bigdecimal", "chrono", "futures-util", - "itertools", + "itertools 0.13.0", "kanal", "once_cell", "prometheus", @@ -168,14 +167,12 @@ dependencies = [ "tokio", "tonic", "tracing", - "transaction-filter", "url", ] [[package]] name = "aptos-moving-average" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processors.git?rev=3c8896b489062ce0510c5e4162366db753b59499#3c8896b489062ce0510c5e4162366db753b59499" dependencies = [ "chrono", ] @@ -1601,6 +1598,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" @@ -2431,7 +2437,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.66", @@ -3543,16 +3549,6 @@ dependencies = [ "tracing-serde", ] -[[package]] -name = "transaction-filter" -version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processors.git?rev=3c8896b489062ce0510c5e4162366db753b59499#3c8896b489062ce0510c5e4162366db753b59499" -dependencies = [ - "ahash", - "aptos-protos", - "serde", -] - [[package]] name = "try-lock" version = "0.2.5" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 
8ceab65..f52700d 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -1,7 +1,13 @@ [workspace] resolver = "2" -members = ["instrumented-channel", "sdk", "sdk-examples"] +members = [ + "instrumented-channel", + "moving-average", + "sdk", + "sdk-examples", + "transaction-stream", +] [workspace.package] authors = ["Aptos Labs "] @@ -14,12 +20,13 @@ rust-version = "1.75" [workspace.dependencies] aptos-indexer-processor-sdk = { path = "sdk" } +aptos-indexer-transaction-stream = { path = "transaction-stream" } instrumented-channel = { path = "instrumented-channel" } +aptos-moving-average = { path = "moving-average" } sdk-server-framework = { path = 'sdk-server-framework' } ahash = { version = "0.8.7", features = ["serde"] } anyhow = "1.0.86" -aptos-indexer-transaction-stream = { git = 'https://github.com/aptos-labs/aptos-indexer-processors.git', rev = '3c8896b489062ce0510c5e4162366db753b59499' } aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", tag = "aptos-node-v1.12.1" } aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "4541add3fd29826ec57f22658ca286d2d6134b93" } async-trait = "0.1.80" @@ -54,6 +61,7 @@ field_count = "0.1.1" futures = "0.3.30" futures-util = "0.3.21" hex = "0.4.3" +itertools = "0.13.0" jemallocator = { version = "0.5.0", features = [ "profiling", "unprefixed_malloc_on_supported_platforms", @@ -66,6 +74,7 @@ once_cell = { version = "1.19.0" } petgraph = "0.6.5" prometheus = "0.13.4" prometheus-client = "0.22.2" +prost = { version = "0.12.3", features = ["no-recursion-limit"] } rayon = "1.10.0" serde = { version = "1.0.193", features = ["derive", "rc"] } serde_json = { version = "1.0.81", features = ["preserve_order"] } @@ -78,6 +87,15 @@ tiny-keccak = { version = "2.0.2", features = ["keccak", "sha3"] } tracing = "0.1.34" tokio = { version = "1.37.0", features = ["full"] } toml = "0.7.4" +tonic = { version = "0.11.0", features = [ + "tls", + "tls-roots", + "transport", + "prost", + "gzip", + 
"codegen", + "zstd", +] } tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] } url = { version = "2.5.1", features = ["serde"] } warp = { version = "0.3.5", features = ["tls"] } diff --git a/rust/moving-average/Cargo.toml b/rust/moving-average/Cargo.toml new file mode 100644 index 0000000..657a335 --- /dev/null +++ b/rust/moving-average/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "aptos-moving-average" +description = "Utility to calculate moving average such as TPS" +version = "0.1.0" + +# Workspace inherited keys +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +chrono = { workspace = true } diff --git a/rust/moving-average/src/lib.rs b/rust/moving-average/src/lib.rs new file mode 100644 index 0000000..826949d --- /dev/null +++ b/rust/moving-average/src/lib.rs @@ -0,0 +1,84 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +#![forbid(unsafe_code)] + +use std::collections::VecDeque; + +// TPS data +pub struct MovingAverage { + window_millis: u64, + // (timestamp_millis, value) + values: VecDeque<(u64, u64)>, + sum: u64, +} + +impl MovingAverage { + pub fn new(window_millis: u64) -> Self { + let now = chrono::Utc::now().naive_utc().and_utc().timestamp_millis() as u64; + let mut queue = VecDeque::new(); + queue.push_back((now, 0)); + Self { + window_millis, + values: queue, + sum: 0, + } + } + + pub fn tick_now(&mut self, value: u64) { + let now = chrono::Utc::now().naive_utc().and_utc().timestamp_millis() as u64; + self.tick(now, value); + } + + pub fn tick(&mut self, timestamp_millis: u64, value: u64) -> f64 { + self.values.push_back((timestamp_millis, value)); + self.sum += value; + while self.values.len() > 2 { + match self.values.front() { + None => break, + Some((ts, val)) => { + if timestamp_millis - ts > 
self.window_millis { + self.sum -= val; + self.values.pop_front(); + } else { + break; + } + }, + } + } + self.avg() + } + + // Only be called after tick_now/tick is called. + pub fn avg(&self) -> f64 { + if self.values.len() < 2 { + 0.0 + } else { + let elapsed = self.values.back().unwrap().0 - self.values.front().unwrap().0; + (self.sum * 1000) as f64 / elapsed as f64 + } + } + + pub fn sum(&self) -> u64 { + self.sum + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_moving_average() { + // 10 Second window. + let mut ma = MovingAverage::new(10_000); + // 9 seconds spent at 100 TPS. + for _ in 0..9 { + ma.tick_now(100); + std::thread::sleep(std::time::Duration::from_secs(1)); + } + // No matter what algorithm we use, the average should be 99 at least. + let avg = ma.avg(); + assert!(avg >= 99.0, "Average is too low: {}", avg); + } +} diff --git a/rust/sdk-examples/src/common_steps/latest_processed_version_tracker.rs b/rust/sdk-examples/src/common_steps/latest_processed_version_tracker.rs index 3b1fb6c..1a56da7 100644 --- a/rust/sdk-examples/src/common_steps/latest_processed_version_tracker.rs +++ b/rust/sdk-examples/src/common_steps/latest_processed_version_tracker.rs @@ -6,11 +6,11 @@ use crate::{ }; use ahash::AHashMap; use anyhow::{Context, Result}; -use aptos_indexer_processor_sdk::utils::time::parse_timestamp; use aptos_indexer_processor_sdk::{ steps::{pollable_async_step::PollableAsyncRunType, PollableAsyncStep}, traits::{NamedStep, Processable}, types::transaction_context::TransactionContext, + utils::time::parse_timestamp, }; use async_trait::async_trait; use diesel::{upsert::excluded, ExpressionMethods}; @@ -92,17 +92,15 @@ where "Gap detected starting from version: {}", current_batch.start_version ); - self.seen_versions.insert( - current_batch.start_version, - TransactionContext { + self.seen_versions + .insert(current_batch.start_version, TransactionContext { data: vec![], // No data is needed for tracking. 
This is to avoid clone. start_version: current_batch.start_version, end_version: current_batch.end_version, start_transaction_timestamp: current_batch.start_transaction_timestamp.clone(), end_transaction_timestamp: current_batch.end_transaction_timestamp.clone(), total_size_in_bytes: current_batch.total_size_in_bytes, - }, - ); + }); } else { tracing::debug!("No gap detected"); // If the current_batch is the next expected version, update the last success batch diff --git a/rust/sdk-examples/src/processors/events/events_processor.rs b/rust/sdk-examples/src/processors/events/events_processor.rs index ea405b9..c89c657 100644 --- a/rust/sdk-examples/src/processors/events/events_processor.rs +++ b/rust/sdk-examples/src/processors/events/events_processor.rs @@ -9,7 +9,7 @@ use aptos_indexer_processor_sdk::{ steps::{TimedBuffer, TransactionStreamStep}, traits::{IntoRunnableStep, RunnableStepWithInputReceiver}, }; -use aptos_indexer_transaction_stream::config::TransactionStreamConfig; +use aptos_indexer_transaction_stream::TransactionStreamConfig; use instrumented_channel::instrumented_bounded_channel; use std::time::Duration; @@ -40,7 +40,7 @@ impl EventsProcessor { let timed_buffer = TimedBuffer::new(Duration::from_secs(1)); let version_tracker = LatestVersionProcessedTracker::new( self.db_config, - self.transaction_stream_config.starting_version, + self.transaction_stream_config.starting_version.unwrap(), "events_processor".to_string(), ) .await?; diff --git a/rust/sdk-examples/src/processors/events/events_storer.rs b/rust/sdk-examples/src/processors/events/events_storer.rs index 8cc3fb2..f53b407 100644 --- a/rust/sdk-examples/src/processors/events/events_storer.rs +++ b/rust/sdk-examples/src/processors/events/events_storer.rs @@ -1,6 +1,6 @@ -use crate::db::models::events_models::EventModel; use crate::{ config::indexer_processor_config::DbConfig, + db::models::events_models::EventModel, schema, utils::database::{execute_in_chunks, get_config_table_chunk_size, 
new_db_pool, ArcDbPool}, }; diff --git a/rust/sdk/src/main.rs b/rust/sdk/src/main.rs index 2327cea..ed812a1 100644 --- a/rust/sdk/src/main.rs +++ b/rust/sdk/src/main.rs @@ -4,7 +4,7 @@ use aptos_indexer_processor_sdk::{ steps::{TimedBuffer, TransactionStreamStep}, traits::IntoRunnableStep, }; -use aptos_indexer_transaction_stream::config::TransactionStreamConfig; +use aptos_indexer_transaction_stream::TransactionStreamConfig; use std::time::Duration; use url::Url; @@ -31,7 +31,7 @@ async fn run_processor() -> Result<()> { // let (input_sender, input_receiver) = kanal::bounded_async(1); let transaction_stream_config = TransactionStreamConfig { indexer_grpc_data_service_address: Url::parse("https://grpc.devnet.aptoslabs.com:443")?, - starting_version: 0, + starting_version: Some(0), request_ending_version: None, auth_token: String::from("aptoslabs_TJs4NQU8Xf5_EJMNnZFPXRH6YNpWM7bCcurMBEUtZtRb6"), request_name_header: String::from("sdk_processor"), diff --git a/rust/sdk/src/steps/transaction_stream_step.rs b/rust/sdk/src/steps/transaction_stream_step.rs index ef511b4..646cef0 100644 --- a/rust/sdk/src/steps/transaction_stream_step.rs +++ b/rust/sdk/src/steps/transaction_stream_step.rs @@ -5,8 +5,7 @@ use crate::{ }; use anyhow::Result; use aptos_indexer_transaction_stream::{ - config::TransactionStreamConfig, - transaction_stream::TransactionStream as TransactionStreamInternal, + TransactionStream as TransactionStreamInternal, TransactionStreamConfig, }; use aptos_protos::transaction::v1::Transaction; use async_trait::async_trait; diff --git a/rust/sdk/src/types/transaction_context.rs b/rust/sdk/src/types/transaction_context.rs index 49e6a16..6b92ca1 100644 --- a/rust/sdk/src/types/transaction_context.rs +++ b/rust/sdk/src/types/transaction_context.rs @@ -1,4 +1,4 @@ -use aptos_indexer_transaction_stream::utils::util::timestamp_to_unixtime; +use aptos_indexer_transaction_stream::utils::timestamp_to_unixtime; /// TransactionContext is a struct that holds data processed 
from a set of transactions /// and includes metadata about the transactions that the data is associated with. diff --git a/rust/transaction-stream/Cargo.toml b/rust/transaction-stream/Cargo.toml new file mode 100644 index 0000000..0c3c64b --- /dev/null +++ b/rust/transaction-stream/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "aptos-indexer-transaction-stream" +version = "0.1.0" + +# Workspace inherited keys +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = { workspace = true } +aptos-moving-average = { workspace = true } +aptos-protos = { workspace = true } +bigdecimal = { workspace = true } +chrono = { workspace = true } +futures-util = { workspace = true } +itertools = { workspace = true } +kanal = { workspace = true } +once_cell = { workspace = true } +prometheus = { workspace = true } +prost = { workspace = true } +serde = { workspace = true } +tokio = { workspace = true } +tonic = { workspace = true } +tracing = { workspace = true } +url = { workspace = true } diff --git a/rust/transaction-stream/src/config.rs b/rust/transaction-stream/src/config.rs new file mode 100644 index 0000000..1234e89 --- /dev/null +++ b/rust/transaction-stream/src/config.rs @@ -0,0 +1,60 @@ +use serde::{Deserialize, Serialize}; +use std::time::Duration; +use url::Url; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct TransactionStreamConfig { + pub indexer_grpc_data_service_address: Url, + pub starting_version: Option, + pub request_ending_version: Option, + pub auth_token: String, + pub request_name_header: String, + #[serde(default = "TransactionStreamConfig::default_indexer_grpc_http2_ping_interval")] + pub 
indexer_grpc_http2_ping_interval_secs: u64, + #[serde(default = "TransactionStreamConfig::default_indexer_grpc_http2_ping_timeout")] + pub indexer_grpc_http2_ping_timeout_secs: u64, + #[serde(default = "TransactionStreamConfig::default_indexer_grpc_reconnection_timeout")] + pub indexer_grpc_reconnection_timeout_secs: u64, + #[serde(default = "TransactionStreamConfig::default_indexer_grpc_response_item_timeout")] + pub indexer_grpc_response_item_timeout_secs: u64, +} + +impl TransactionStreamConfig { + pub const fn indexer_grpc_http2_ping_interval(&self) -> Duration { + Duration::from_secs(self.indexer_grpc_http2_ping_interval_secs) + } + + pub const fn indexer_grpc_http2_ping_timeout(&self) -> Duration { + Duration::from_secs(self.indexer_grpc_http2_ping_timeout_secs) + } + + pub const fn indexer_grpc_reconnection_timeout(&self) -> Duration { + Duration::from_secs(self.indexer_grpc_reconnection_timeout_secs) + } + + pub const fn indexer_grpc_response_item_timeout(&self) -> Duration { + Duration::from_secs(self.indexer_grpc_response_item_timeout_secs) + } + + /// Indexer GRPC http2 ping interval in seconds. Defaults to 30. + /// Tonic ref: https://docs.rs/tonic/latest/tonic/transport/channel/struct.Endpoint.html#method.http2_keep_alive_interval + pub const fn default_indexer_grpc_http2_ping_interval() -> u64 { + 30 + } + + /// Indexer GRPC http2 ping timeout in seconds. Defaults to 10. + pub const fn default_indexer_grpc_http2_ping_timeout() -> u64 { + 10 + } + + /// Default timeout for establishing a grpc connection. Defaults to 5 seconds. + pub const fn default_indexer_grpc_reconnection_timeout() -> u64 { + 5 + } + + /// Default timeout for receiving an item from grpc stream. Defaults to 60 seconds. 
+ pub const fn default_indexer_grpc_response_item_timeout() -> u64 { + 60 + } +} diff --git a/rust/transaction-stream/src/lib.rs b/rust/transaction-stream/src/lib.rs new file mode 100644 index 0000000..a2ac14e --- /dev/null +++ b/rust/transaction-stream/src/lib.rs @@ -0,0 +1,6 @@ +pub mod config; +pub mod transaction_stream; +pub mod utils; + +pub use config::TransactionStreamConfig; +pub use transaction_stream::{TransactionStream, TransactionsPBResponse}; diff --git a/rust/transaction-stream/src/transaction_stream.rs b/rust/transaction-stream/src/transaction_stream.rs new file mode 100644 index 0000000..76fb146 --- /dev/null +++ b/rust/transaction-stream/src/transaction_stream.rs @@ -0,0 +1,581 @@ +use crate::{config::TransactionStreamConfig, utils::timestamp_to_iso}; +use anyhow::{anyhow, Result}; +use aptos_moving_average::MovingAverage; +use aptos_protos::{ + indexer::v1::{raw_data_client::RawDataClient, GetTransactionsRequest, TransactionsResponse}, + transaction::v1::Transaction, + util::timestamp::Timestamp, +}; +use futures_util::StreamExt; +use prost::Message; +use std::time::Duration; +use tokio::time::timeout; +use tonic::{Response, Streaming}; +use tracing::{error, info}; + +/// GRPC request metadata key for the token ID. +const GRPC_API_GATEWAY_API_KEY_HEADER: &str = "authorization"; +/// GRPC request metadata key for the request name. This is used to identify the +/// data destination. +const GRPC_REQUEST_NAME_HEADER: &str = "x-aptos-request-name"; +/// GRPC connection id +const GRPC_CONNECTION_ID: &str = "x-aptos-connection-id"; +/// We will try to reconnect to GRPC 5 times in case upstream connection is being updated +pub const RECONNECTION_MAX_RETRIES: u64 = 5; +/// 256MB +pub const MAX_RESPONSE_SIZE: usize = 1024 * 1024 * 256; + +/// TransactionsPBResponse is a struct that holds the transactions fetched from the stream. +/// It also includes some contextual information about the transactions. 
+#[derive(Clone)] +pub struct TransactionsPBResponse { + pub transactions: Vec, + pub chain_id: u64, + // We put start/end versions here as filtering means there are potential "gaps" here now + pub start_version: u64, + pub end_version: u64, + pub start_txn_timestamp: Option, + pub end_txn_timestamp: Option, + pub size_in_bytes: u64, +} + +/// Helper function to build a GRPC request for fetching transactions. +pub fn grpc_request_builder( + starting_version: Option, + transactions_count: Option, + grpc_auth_token: String, + request_name_header: String, +) -> tonic::Request { + let mut request = tonic::Request::new(GetTransactionsRequest { + starting_version, + transactions_count, + ..GetTransactionsRequest::default() + }); + request.metadata_mut().insert( + GRPC_API_GATEWAY_API_KEY_HEADER, + format!("Bearer {}", grpc_auth_token.clone()) + .parse() + .unwrap(), + ); + request.metadata_mut().insert( + GRPC_REQUEST_NAME_HEADER, + request_name_header.parse().unwrap(), + ); + request +} + +/// Given a `TransactionStreamConfig`, this function will return a stream of transactions. +/// It also handles timeouts and retries. 
+pub async fn get_stream( + transaction_stream_config: TransactionStreamConfig, +) -> Result>> { + info!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + "[Transaction Stream] Setting up rpc channel" + ); + + let channel = tonic::transport::Channel::from_shared( + transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + ) + .expect( + "[Transaction Stream] Failed to build GRPC channel, perhaps because the data service URL is invalid", + ) + .http2_keep_alive_interval(transaction_stream_config.indexer_grpc_http2_ping_interval()) + .keep_alive_timeout(transaction_stream_config.indexer_grpc_http2_ping_timeout()); + + // If the scheme is https, add a TLS config. + let channel = if transaction_stream_config + .indexer_grpc_data_service_address + .scheme() + == "https" + { + let config = tonic::transport::channel::ClientTlsConfig::new(); + channel + .tls_config(config) + .expect("[Transaction Stream] Failed to create TLS config") + } else { + channel + }; + + info!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + "[Transaction Stream] Setting up GRPC client" + ); + + // TODO: move this to a config file + // Retry this connection a few times before giving up + let mut connect_retries = 0; + let res = loop { + let res = timeout( + transaction_stream_config.indexer_grpc_reconnection_timeout(), + RawDataClient::connect(channel.clone()), + ) + .await; + match res { + Ok(connect_res) => match connect_res { + Ok(client) => break Ok(client), + Err(e) => { + error!( + stream_address = transaction_stream_config.indexer_grpc_data_service_address.to_string(), + start_version = 
transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + error = ?e, + "[Transaction Stream] Error connecting to GRPC client" + ); + connect_retries += 1; + if connect_retries >= RECONNECTION_MAX_RETRIES { + break Err(anyhow!("Error connecting to GRPC client").context(e)); + } + }, + }, + Err(e) => { + error!( + stream_address = transaction_stream_config.indexer_grpc_data_service_address.to_string(), + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + retries = connect_retries, + error = ?e, + "[Transaction Stream] Timed out connecting to GRPC client" + ); + connect_retries += 1; + if connect_retries >= RECONNECTION_MAX_RETRIES { + break Err(anyhow!("Timed out connecting to GRPC client")); + } + }, + } + }; + + let raw_data_client = res?; + + let mut rpc_client = raw_data_client + .accept_compressed(tonic::codec::CompressionEncoding::Gzip) + .accept_compressed(tonic::codec::CompressionEncoding::Zstd) + .send_compressed(tonic::codec::CompressionEncoding::Zstd) + .max_decoding_message_size(MAX_RESPONSE_SIZE) + .max_encoding_message_size(MAX_RESPONSE_SIZE); + + let count = transaction_stream_config.request_ending_version.map(|v| { + (v as i64 + - transaction_stream_config + .starting_version + .unwrap_or_else(|| { + panic!("starting_version is required when using request_ending_version") + }) as i64 + + 1) as u64 + }); + + info!( + stream_address = transaction_stream_config.indexer_grpc_data_service_address.to_string(), + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + num_of_transactions = ?count, + "[Transaction Stream] Setting up GRPC stream", + ); + + // TODO: move this to a config file + // Retry this connection a few times before giving up + let mut connect_retries = 0; + loop { + let timeout_res = timeout( + 
transaction_stream_config.indexer_grpc_reconnection_timeout(), + async { + let request = grpc_request_builder( + transaction_stream_config.starting_version, + count, + transaction_stream_config.auth_token.clone(), + transaction_stream_config.request_name_header.clone(), + ); + rpc_client.get_transactions(request).await + }, + ) + .await; + match timeout_res { + Ok(response_res) => match response_res { + Ok(response) => break Ok(response), + Err(e) => { + error!( + stream_address = transaction_stream_config.indexer_grpc_data_service_address.to_string(), + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + error = ?e, + "[Transaction Stream] Error making grpc request. Retrying..." + ); + connect_retries += 1; + if connect_retries >= RECONNECTION_MAX_RETRIES { + break Err(anyhow!("Error making grpc request").context(e)); + } + }, + }, + Err(e) => { + error!( + stream_address = transaction_stream_config.indexer_grpc_data_service_address.to_string(), + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + retries = connect_retries, + error = ?e, + "[Transaction Stream] Timeout making grpc request. Retrying...", + ); + connect_retries += 1; + if connect_retries >= RECONNECTION_MAX_RETRIES { + break Err(anyhow!("Timeout making grpc request").context(e)); + } + }, + } + } +} + +/// Helper function to get the chain id from the stream. 
+pub async fn get_chain_id(transaction_stream_config: TransactionStreamConfig) -> Result { + info!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + "[Transaction Stream] Connecting to GRPC stream to get chain id", + ); + + let transaction_stream_config_for_chain_id = TransactionStreamConfig { + starting_version: Some(1), + request_ending_version: Some(2), + ..transaction_stream_config.clone() + }; + let response = get_stream(transaction_stream_config_for_chain_id).await?; + let connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { + Some(connection_id) => connection_id.to_str().unwrap().to_string(), + None => "".to_string(), + }; + let mut resp_stream = response.into_inner(); + info!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + connection_id, "[Transaction Stream] Successfully connected to GRPC stream to get chain id", + ); + + match resp_stream.next().await { + Some(Ok(r)) => match r.chain_id { + Some(chain_id) => Ok(chain_id), + None => { + error!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + connection_id, "[Transaction Stream] Chain Id doesn't exist." 
+ ); + Err(anyhow!("Chain Id doesn't exist")) + }, + }, + Some(Err(rpc_error)) => { + error!( + stream_address = transaction_stream_config.indexer_grpc_data_service_address.to_string(), + connection_id, + error = ?rpc_error, + "[Transaction Stream] Error receiving datastream response for chain id" + ); + Err(anyhow!("Error receiving datastream response for chain id").context(rpc_error)) + }, + None => { + error!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + connection_id, + "[Transaction Stream] Stream ended before getting response fo for chain id" + ); + Err(anyhow!("Stream ended before getting response for chain id")) + }, + } +} + +/// TransactionStream is a struct that holds the state of the stream and provides methods to fetch transactions +/// from the stream. +/// - init_stream: Initializes the stream and returns the stream and connection id +/// - get_next_transaction_batch: Fetches the next batch of transactions from the stream +/// - is_end_of_stream: Checks if we've reached the end of the stream. 
This is determined by the ending version set in `TransactionStreamConfig` +/// - reconnect_to_grpc: Reconnects to the GRPC stream +/// - get_chain_id: Fetches the chain id from the stream +pub struct TransactionStream { + transaction_stream_config: TransactionStreamConfig, + stream: Streaming, + connection_id: String, + next_version_to_fetch: Option, + reconnection_retries: u64, + last_fetched_version: Option, + fetch_ma: MovingAverage, +} + +impl TransactionStream { + pub async fn new(transaction_stream_config: TransactionStreamConfig) -> Result { + let (stream, connection_id) = Self::init_stream(transaction_stream_config.clone()).await?; + Ok(Self { + transaction_stream_config: transaction_stream_config.clone(), + stream, + connection_id, + next_version_to_fetch: transaction_stream_config.starting_version, + reconnection_retries: 0, + last_fetched_version: transaction_stream_config + .starting_version + .map(|v| v as i64 - 1), + fetch_ma: MovingAverage::new(3000), + }) + } + + async fn init_stream( + transaction_stream_config: TransactionStreamConfig, + ) -> Result<(Streaming, String)> { + info!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + "[Transaction Stream] Connecting to GRPC stream", + ); + let resp_stream = get_stream(transaction_stream_config.clone()).await?; + let connection_id = match resp_stream.metadata().get(GRPC_CONNECTION_ID) { + Some(connection_id) => connection_id.to_str().unwrap().to_string(), + None => "".to_string(), + }; + info!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + connection_id = connection_id, + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + "[Transaction Stream] Successfully connected to GRPC stream", + ); + 
// NOTE(review): this span was a whitespace-mangled git diff (` + ` = collapsed diff
// lines). Reconstructed below with diff markers stripped and conventional indentation;
// code tokens are unchanged except where the mangling visibly ate `<...>` generic
// parameters, which are restored with hedged NOTEs. Logic is untouched.

        // Tail of a connect helper whose start is above this view: hand back the
        // opened stream plus the server-assigned connection id.
        Ok((resp_stream.into_inner(), connection_id))
    }

    /// Gets a batch of transactions from the stream. Batch size is set in the grpc server.
    /// The number of batches depends on our config.
    /// There could be several special scenarios:
    ///   1. If we lose the connection, we will try reconnecting X times within Y seconds before crashing.
    ///   2. If we specified an end version and we hit that, we will stop fetching, but we will make sure that
    ///      all existing transactions are processed.
    ///
    /// Returns
    /// - `Ok(TransactionsPBResponse)` with the batch (and updates `next_version_to_fetch` /
    ///   `last_fetched_version`) on success
    /// - `Err(..)` on an RPC error, a closed stream, a receive timeout, or a version gap
    ///   between consecutive batches
    // NOTE(review): generic parameter on `Result` was lost in the mangled diff; restored
    // as `TransactionsPBResponse` based on `Ok(txn_pb)` below — confirm against the repo.
    pub async fn get_next_transaction_batch(&mut self) -> Result<TransactionsPBResponse> {
        // Wall-clock timer for the receive; feeds duration/TPS metrics below.
        let grpc_channel_recv_latency = std::time::Instant::now();

        // Bound the wait for the next stream item so a silent/hung connection
        // surfaces as a timeout error instead of blocking forever.
        let txn_pb_res = match tokio::time::timeout(
            self.transaction_stream_config
                .indexer_grpc_response_item_timeout(),
            self.stream.next(),
        )
        .await
        {
            // Received datastream response before the timeout fired
            Ok(response) => {
                match response {
                    Some(Ok(r)) => {
                        // A successful receive resets the reconnect backoff counter.
                        self.reconnection_retries = 0;
                        // NOTE(review): these unwraps panic if the server ever sends an
                        // empty `transactions` batch — confirm the grpc server guarantees
                        // non-empty batches.
                        let start_version = r.transactions.as_slice().first().unwrap().version;
                        let start_txn_timestamp =
                            r.transactions.as_slice().first().unwrap().timestamp.clone();
                        let end_version = r.transactions.as_slice().last().unwrap().version;
                        let end_txn_timestamp =
                            r.transactions.as_slice().last().unwrap().timestamp.clone();

                        // Next fetch resumes one past the last version in this batch.
                        self.next_version_to_fetch = Some(end_version + 1);

                        let size_in_bytes = r.encoded_len() as u64;
                        let chain_id: u64 = r
                            .chain_id
                            .expect("[Transaction Stream] Chain Id doesn't exist.");
                        let num_txns = r.transactions.len();
                        let duration_in_secs = grpc_channel_recv_latency.elapsed().as_secs_f64();
                        // Moving-average tracker for transactions/sec reporting.
                        self.fetch_ma.tick_now(num_txns as u64);

                        info!(
                            stream_address = self
                                .transaction_stream_config
                                .indexer_grpc_data_service_address
                                .to_string(),
                            connection_id = self.connection_id,
                            start_version,
                            end_version,
                            start_txn_timestamp_iso = start_txn_timestamp
                                .as_ref()
                                .map(timestamp_to_iso)
                                .unwrap_or_default(),
                            end_txn_timestamp_iso = end_txn_timestamp
                                .as_ref()
                                .map(timestamp_to_iso)
                                .unwrap_or_default(),
                            num_of_transactions = end_version - start_version + 1,
                            size_in_bytes,
                            duration_in_secs,
                            tps = self.fetch_ma.avg().ceil() as u64,
                            bytes_per_sec = size_in_bytes as f64 / duration_in_secs,
                            "[Transaction Stream] Received transactions from GRPC.",
                        );

                        // Gap detection: each batch must start exactly one version after
                        // the previous batch ended; anything else is a protocol error.
                        if let Some(last_fetched_version) = self.last_fetched_version {
                            if last_fetched_version + 1 != start_version as i64 {
                                error!(
                                    batch_start_version = self.last_fetched_version.map(|v| v + 1),
                                    self.last_fetched_version,
                                    current_fetched_version = start_version,
                                    "[Transaction Stream] Received batch with gap from GRPC stream"
                                );
                                return Err(anyhow!("Received batch with gap from GRPC stream"));
                            }
                        }
                        self.last_fetched_version = Some(end_version as i64);

                        let txn_pb = TransactionsPBResponse {
                            transactions: r.transactions,
                            chain_id,
                            start_version,
                            end_version,
                            start_txn_timestamp,
                            end_txn_timestamp,
                            size_in_bytes,
                        };

                        Ok(txn_pb)
                    },
                    // Error receiving datastream response
                    Some(Err(rpc_error)) => {
                        tracing::warn!(
                            stream_address = self.transaction_stream_config.indexer_grpc_data_service_address.to_string(),
                            self.connection_id,
                            start_version = self.transaction_stream_config.starting_version,
                            end_version = self.transaction_stream_config.request_ending_version,
                            error = ?rpc_error,
                            "[Transaction Stream] Error receiving datastream response."
                        );
                        Err(anyhow!("Error receiving datastream response"))
                    },
                    // Stream is finished (server closed it)
                    None => {
                        tracing::warn!(
                            stream_address = self
                                .transaction_stream_config
                                .indexer_grpc_data_service_address
                                .to_string(),
                            connection_id = self.connection_id,
                            start_version = self.transaction_stream_config.starting_version,
                            end_version = self.transaction_stream_config.request_ending_version,
                            "[Transaction Stream] Stream ended."
                        );
                        Err(anyhow!("Stream ended"))
                    },
                }
            },
            // Timeout receiving datastream response
            Err(e) => {
                tracing::warn!(
                    stream_address = self.transaction_stream_config.indexer_grpc_data_service_address.to_string(),
                    connection_id = self.connection_id,
                    start_version = self.transaction_stream_config.starting_version,
                    end_version = self.transaction_stream_config.request_ending_version,
                    error = ?e,
                    "[Transaction Stream] Timeout receiving datastream response."
                );
                Err(anyhow!("Timeout receiving datastream response"))
            },
        };
        txn_pb_res
    }

    /// Helper function to signal that we've fetched all the transactions up to the
    /// ending version that was requested. Always `false` when no ending version was
    /// configured or nothing has been fetched yet.
    pub fn is_end_of_stream(&self) -> bool {
        if let (Some(ending_version), Some(next_version_to_fetch)) = (
            self.transaction_stream_config.request_ending_version,
            self.next_version_to_fetch,
        ) {
            next_version_to_fetch > ending_version
        } else {
            false
        }
    }

    /// Repeatedly attempts [`Self::reconnect_to_grpc`] with a fixed 100ms pause between
    /// tries, giving up with an error after `RECONNECTION_MAX_RETRIES` attempts.
    pub async fn reconnect_to_grpc_with_retries(&mut self) -> Result<()> {
        let mut reconnection_retries = 0;

        loop {
            // Sleep for 100ms between reconnect tries
            // TODO: Turn this into exponential backoff
            tokio::time::sleep(Duration::from_millis(100)).await;

            reconnection_retries += 1;

            if reconnection_retries >= RECONNECTION_MAX_RETRIES {
                // NOTE(review): the message hard-codes "100 times" while the actual limit
                // is RECONNECTION_MAX_RETRIES — misleading if the constant ever changes.
                error!(
                    stream_address = self
                        .transaction_stream_config
                        .indexer_grpc_data_service_address
                        .to_string(),
                    "[Transaction Stream] Reconnected more than 100 times. Will not retry.",
                );
                break Err(anyhow!("Reconnected more than 100 times. Will not retry."));
            }

            match self.reconnect_to_grpc().await {
                Ok(_) => {
                    break Ok(());
                },
                Err(e) => {
                    error!(
                        stream_address = self.transaction_stream_config
                            .indexer_grpc_data_service_address
                            .to_string(),
                        error = ?e,
                        "[Transaction Stream] Error reconnecting to GRPC stream. Retrying..."
                    );
                    continue;
                },
            }
        }
    }

    /// Opens a fresh GRPC stream with the existing config and swaps it (plus the
    /// server-reported connection id) into `self`. Resumes from `next_version_to_fetch`.
    pub async fn reconnect_to_grpc(&mut self) -> Result<()> {
        info!(
            stream_address = self
                .transaction_stream_config
                .indexer_grpc_data_service_address
                .to_string(),
            starting_version = self.next_version_to_fetch,
            ending_version = self.transaction_stream_config.request_ending_version,
            reconnection_retries = self.reconnection_retries,
            "[Transaction Stream] Reconnecting to GRPC stream"
        );
        let response = get_stream(self.transaction_stream_config.clone()).await?;
        // Missing/omitted connection-id header degrades to an empty string rather than
        // failing the reconnect.
        // NOTE(review): `to_str().unwrap()` panics on a non-ASCII header value — confirm
        // the server only emits ASCII connection ids.
        let connection_id = match response.metadata().get(GRPC_CONNECTION_ID) {
            Some(connection_id) => connection_id.to_str().unwrap().to_string(),
            None => "".to_string(),
        };
        self.connection_id = connection_id;
        self.stream = response.into_inner();
        info!(
            stream_address = self
                .transaction_stream_config
                .indexer_grpc_data_service_address
                .to_string(),
            connection_id = self.connection_id,
            starting_version = self.next_version_to_fetch,
            ending_version = self.transaction_stream_config.request_ending_version,
            reconnection_retries = self.reconnection_retries,
            "[Transaction Stream] Successfully reconnected to GRPC stream"
        );
        Ok(())
    }

    /// Fetches the chain id from the service. Consumes `self` (takes it by value), so
    /// the stream cannot be used afterwards — intentional per the call below.
    // NOTE(review): generic parameter on `Result` was lost in the mangled diff; restored
    // as `u64` based on `chain_id: u64` above — confirm against `get_chain_id`'s signature.
    pub async fn get_chain_id(self) -> Result<u64> {
        get_chain_id(self.transaction_stream_config).await
    }
}

// ---- preserved diff metadata (new file added by this patch) ----
// diff --git a/rust/transaction-stream/src/utils/mod.rs b/rust/transaction-stream/src/utils/mod.rs
// new file mode 100644 index 0000000..5117c26 --- /dev/null
// +++ b/rust/transaction-stream/src/utils/mod.rs @@ -0,0 +1,28 @@

use aptos_protos::util::timestamp::Timestamp;

// 9999-12-31 23:59:59, this is the max supported by Google BigQuery
pub const MAX_TIMESTAMP_SECS: i64 = 253_402_300_799;

/// Converts a protobuf `Timestamp` into a `chrono::NaiveDateTime`, clamping seconds to
/// `MAX_TIMESTAMP_SECS` (BigQuery's maximum). `version` is only used in the panic
/// message to identify the offending transaction.
///
/// # Panics
/// Panics if chrono rejects the (seconds, nanos) pair.
// NOTE(review): `nanos as u32` wraps for negative `nanos`; the subsequent
// `from_timestamp_opt` would then panic via `unwrap_or_else` — confirm upstream
// timestamps always carry non-negative nanos.
pub fn parse_timestamp(ts: &Timestamp, version: i64) -> chrono::NaiveDateTime {
    let final_ts = if ts.seconds >= MAX_TIMESTAMP_SECS {
        Timestamp {
            seconds: MAX_TIMESTAMP_SECS,
            nanos: 0,
        }
    } else {
        ts.clone()
    };
    chrono::NaiveDateTime::from_timestamp_opt(final_ts.seconds, final_ts.nanos as u32)
        .unwrap_or_else(|| panic!("Could not parse timestamp {:?} for version {}", ts, version))
}

/// Convert the protobuf timestamp to ISO format (nanosecond precision, `Z` suffix).
pub fn timestamp_to_iso(timestamp: &Timestamp) -> String {
    let dt = parse_timestamp(timestamp, 0);
    dt.format("%Y-%m-%dT%H:%M:%S%.9fZ").to_string()
}

/// Convert the protobuf timestamp to unixtime (fractional seconds as `f64`).
pub fn timestamp_to_unixtime(timestamp: &Timestamp) -> f64 {
    timestamp.seconds as f64 + timestamp.nanos as f64 * 1e-9
}