diff --git a/rust/Cargo.lock b/rust/Cargo.lock index acb7e643c..5e8971a86 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -116,6 +116,29 @@ version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" +[[package]] +name = "aptos-indexer-transaction-stream" +version = "0.1.0" +dependencies = [ + "anyhow", + "aptos-moving-average", + "aptos-protos", + "bigdecimal", + "chrono", + "futures-util", + "itertools 0.12.1", + "kanal", + "once_cell", + "prometheus", + "prost 0.12.3", + "serde", + "tokio", + "tonic 0.11.0", + "tracing", + "transaction-filter", + "url", +] + [[package]] name = "aptos-moving-average" version = "0.1.0" @@ -2191,6 +2214,7 @@ version = "1.0.0" dependencies = [ "ahash", "anyhow", + "aptos-indexer-transaction-stream", "aptos-moving-average", "aptos-protos", "async-trait", @@ -2234,6 +2258,7 @@ dependencies = [ "tokio-postgres", "tonic 0.11.0", "tracing", + "transaction-filter", "unescape", "url", ] @@ -3614,6 +3639,15 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "transaction-filter" +version = "0.1.0" +dependencies = [ + "ahash", + "aptos-protos", + "serde", +] + [[package]] name = "try-lock" version = "0.2.4" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 2f984cd89..0642e0ca3 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -1,7 +1,14 @@ [workspace] resolver = "2" -members = ["indexer-metrics", "moving-average", "processor", "server-framework"] +members = [ + "aptos-indexer-transaction-stream", + "indexer-metrics", + "moving-average", + "processor", + "server-framework", + "transaction-filter", +] [workspace.package] authors = ["Aptos Labs <opensource@aptoslabs.com>"] @@ -16,6 +23,8 @@ rust-version = "1.75" processor = { path = "processor" } server-framework = { path = "server-framework" } aptos-moving-average = { path = "moving-average" } +aptos-indexer-transaction-stream = { path = "aptos-indexer-transaction-stream" } +transaction-filter = { path = "transaction-filter" } ahash = { version = "0.8.7", features = ["serde"] } anyhow = "1.0.62" diff --git a/rust/aptos-indexer-transaction-stream/Cargo.toml b/rust/aptos-indexer-transaction-stream/Cargo.toml new file mode 100644 index 000000000..cb31966d9 --- /dev/null +++ b/rust/aptos-indexer-transaction-stream/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "aptos-indexer-transaction-stream" +version = "0.1.0" + +# Workspace inherited keys +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = { workspace = true } +aptos-moving-average = { workspace = true } +aptos-protos = { workspace = true } +bigdecimal = { workspace = true } +chrono = { workspace = true } +futures-util = { workspace = true } +itertools = { workspace = true } +kanal = { workspace = true } +once_cell = { workspace = true } +prometheus = { workspace = true } +prost = { workspace = true } +serde = { workspace = true } +tokio = { workspace = true } +tonic = { workspace = true } +tracing = { workspace = true } +transaction-filter = { workspace = true } +url = { workspace = true } diff --git a/rust/aptos-indexer-transaction-stream/src/config.rs b/rust/aptos-indexer-transaction-stream/src/config.rs new file mode 100644 index 000000000..f3a173742 --- /dev/null +++ 
b/rust/aptos-indexer-transaction-stream/src/config.rs @@ -0,0 +1,60 @@ +use serde::{Deserialize, Serialize}; +use std::time::Duration; +use url::Url; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct TransactionStreamConfig { + pub indexer_grpc_data_service_address: Url, + pub starting_version: u64, + pub request_ending_version: Option<u64>, + pub auth_token: String, + pub request_name_header: String, + #[serde(default = "TransactionStreamConfig::default_indexer_grpc_http2_ping_interval")] + pub indexer_grpc_http2_ping_interval_secs: u64, + #[serde(default = "TransactionStreamConfig::default_indexer_grpc_http2_ping_timeout")] + pub indexer_grpc_http2_ping_timeout_secs: u64, + #[serde(default = "TransactionStreamConfig::default_indexer_grpc_reconnection_timeout")] + pub indexer_grpc_reconnection_timeout_secs: u64, + #[serde(default = "TransactionStreamConfig::default_indexer_grpc_response_item_timeout")] + pub indexer_grpc_response_item_timeout_secs: u64, +} + +impl TransactionStreamConfig { + pub const fn indexer_grpc_http2_ping_interval(&self) -> Duration { + Duration::from_secs(self.indexer_grpc_http2_ping_interval_secs) + } + + pub const fn indexer_grpc_http2_ping_timeout(&self) -> Duration { + Duration::from_secs(self.indexer_grpc_http2_ping_timeout_secs) + } + + pub const fn indexer_grpc_reconnection_timeout(&self) -> Duration { + Duration::from_secs(self.indexer_grpc_reconnection_timeout_secs) + } + + pub const fn indexer_grpc_response_item_timeout(&self) -> Duration { + Duration::from_secs(self.indexer_grpc_response_item_timeout_secs) + } + + /// Indexer GRPC http2 ping interval in seconds. Defaults to 30. + /// Tonic ref: https://docs.rs/tonic/latest/tonic/transport/channel/struct.Endpoint.html#method.http2_keep_alive_interval + pub const fn default_indexer_grpc_http2_ping_interval() -> u64 { + 30 + } + + /// Indexer GRPC http2 ping timeout in seconds. Defaults to 10. + pub const fn default_indexer_grpc_http2_ping_timeout() -> u64 { + 10 + } + + /// Default timeout for establishing a grpc connection. Defaults to 5 seconds. + pub const fn default_indexer_grpc_reconnection_timeout() -> u64 { + 5 + } + + /// Default timeout for receiving an item from grpc stream. Defaults to 60 seconds.
+ pub const fn default_indexer_grpc_response_item_timeout() -> u64 { + 60 + } +} diff --git a/rust/aptos-indexer-transaction-stream/src/lib.rs b/rust/aptos-indexer-transaction-stream/src/lib.rs new file mode 100644 index 000000000..a2ac14edb --- /dev/null +++ b/rust/aptos-indexer-transaction-stream/src/lib.rs @@ -0,0 +1,6 @@ +pub mod config; +pub mod transaction_stream; +pub mod utils; + +pub use config::TransactionStreamConfig; +pub use transaction_stream::{TransactionStream, TransactionsPBResponse}; diff --git a/rust/aptos-indexer-transaction-stream/src/transaction_stream.rs b/rust/aptos-indexer-transaction-stream/src/transaction_stream.rs new file mode 100644 index 000000000..e0017fddb --- /dev/null +++ b/rust/aptos-indexer-transaction-stream/src/transaction_stream.rs @@ -0,0 +1,569 @@ +use crate::{config::TransactionStreamConfig, utils::util::timestamp_to_iso}; +use anyhow::{anyhow, Result}; +use aptos_moving_average::MovingAverage; +use aptos_protos::{ + indexer::v1::{raw_data_client::RawDataClient, GetTransactionsRequest, TransactionsResponse}, + transaction::v1::Transaction, + util::timestamp::Timestamp, +}; +use futures_util::StreamExt; +use prost::Message; +use std::time::Duration; +use tokio::time::timeout; +use tonic::{Response, Streaming}; +use tracing::{error, info}; + +/// GRPC request metadata key for the token ID. +const GRPC_API_GATEWAY_API_KEY_HEADER: &str = "authorization"; +/// GRPC request metadata key for the request name. This is used to identify the +/// data destination. +const GRPC_REQUEST_NAME_HEADER: &str = "x-aptos-request-name"; +/// GRPC connection id +const GRPC_CONNECTION_ID: &str = "x-aptos-connection-id"; +/// We will try to reconnect to GRPC 5 times in case upstream connection is being updated +pub const RECONNECTION_MAX_RETRIES: u64 = 5; +/// 256MB +pub const MAX_RESPONSE_SIZE: usize = 1024 * 1024 * 256; + +/// TransactionsPBResponse is a struct that holds the transactions fetched from the stream. +/// It also includes some contextual information about the transactions. +#[derive(Clone)] +pub struct TransactionsPBResponse { + pub transactions: Vec<Transaction>, + pub chain_id: u64, + // We put start/end versions here as filtering means there are potential "gaps" here now + pub start_version: u64, + pub end_version: u64, + pub start_txn_timestamp: Option<Timestamp>, + pub end_txn_timestamp: Option<Timestamp>, + pub size_in_bytes: u64, +} + +/// Helper function to build a GRPC request for fetching transactions. +pub fn grpc_request_builder( + starting_version: u64, + transactions_count: Option<u64>, + grpc_auth_token: String, + request_name_header: String, +) -> tonic::Request<GetTransactionsRequest> { + let mut request = tonic::Request::new(GetTransactionsRequest { + starting_version: Some(starting_version), + transactions_count, + ..GetTransactionsRequest::default() + }); + request.metadata_mut().insert( + GRPC_API_GATEWAY_API_KEY_HEADER, + format!("Bearer {}", grpc_auth_token.clone()) + .parse() + .unwrap(), + ); + request.metadata_mut().insert( + GRPC_REQUEST_NAME_HEADER, + request_name_header.parse().unwrap(), + ); + request +}
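For orientation, a minimal sketch of building one of these configs by hand. The endpoint, token, and version numbers are placeholders; the `default_*` constants are the ones defined on `TransactionStreamConfig` above, and in a YAML config those four fields would simply fall back to their serde defaults:

```rust
use aptos_indexer_transaction_stream::TransactionStreamConfig;
use url::Url;

fn example_config() -> TransactionStreamConfig {
    TransactionStreamConfig {
        // Placeholder endpoint and credentials.
        indexer_grpc_data_service_address: Url::parse("https://grpc.example.com:443")
            .expect("valid url"),
        starting_version: 0,
        request_ending_version: Some(10_000),
        auth_token: "my-token".to_string(),
        request_name_header: "my_processor".to_string(),
        // These four would be omitted in YAML and filled in by serde.
        indexer_grpc_http2_ping_interval_secs:
            TransactionStreamConfig::default_indexer_grpc_http2_ping_interval(),
        indexer_grpc_http2_ping_timeout_secs:
            TransactionStreamConfig::default_indexer_grpc_http2_ping_timeout(),
        indexer_grpc_reconnection_timeout_secs:
            TransactionStreamConfig::default_indexer_grpc_reconnection_timeout(),
        indexer_grpc_response_item_timeout_secs:
            TransactionStreamConfig::default_indexer_grpc_response_item_timeout(),
    }
}
```

From there, `grpc_request_builder` stamps the token onto the request as an `authorization: Bearer …` header and the request name as `x-aptos-request-name`, as shown in the function above.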
+ +/// Given a `TransactionStreamConfig`, this function will return a stream of transactions. +/// It also handles timeouts and retries. +pub async fn get_stream( + transaction_stream_config: TransactionStreamConfig, +) -> Result<Response<Streaming<TransactionsResponse>>> { + info!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + "[Transaction Stream] Setting up rpc channel" + ); + + let channel = tonic::transport::Channel::from_shared( + transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + ) + .expect( + "[Transaction Stream] Failed to build GRPC channel, perhaps because the data service URL is invalid", + ) + .http2_keep_alive_interval(transaction_stream_config.indexer_grpc_http2_ping_interval()) + .keep_alive_timeout(transaction_stream_config.indexer_grpc_http2_ping_timeout()); + + // If the scheme is https, add a TLS config. + let channel = if transaction_stream_config + .indexer_grpc_data_service_address + .scheme() + == "https" + { + let config = tonic::transport::channel::ClientTlsConfig::new(); + channel + .tls_config(config) + .expect("[Transaction Stream] Failed to create TLS config") + } else { + channel + }; + + info!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + "[Transaction Stream] Setting up GRPC client" + ); + + // TODO: move this to a config file + // Retry this connection a few times before giving up + let mut connect_retries = 0; + let res = loop { + let res = timeout( + transaction_stream_config.indexer_grpc_reconnection_timeout(), + RawDataClient::connect(channel.clone()), + ) + .await; + match res { + Ok(connect_res) => match connect_res { + Ok(client) => break Ok(client), + Err(e) => { + error!( + stream_address = transaction_stream_config.indexer_grpc_data_service_address.to_string(), + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + error = ?e, + "[Transaction Stream] Error connecting to GRPC client" + ); + connect_retries += 1; + if connect_retries >= RECONNECTION_MAX_RETRIES { + break Err(anyhow!("Error connecting to GRPC client").context(e)); + } + }, + }, + Err(e) => { + error!( + stream_address = transaction_stream_config.indexer_grpc_data_service_address.to_string(), + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + retries = connect_retries, + error = ?e, + "[Transaction Stream] Timed out connecting to GRPC client" + ); + connect_retries += 1; + if connect_retries >= RECONNECTION_MAX_RETRIES { + break Err(anyhow!("Timed out connecting to GRPC client")); + } + }, + } + }; + + let raw_data_client = res?; + + let mut rpc_client = raw_data_client + .accept_compressed(tonic::codec::CompressionEncoding::Gzip) + .accept_compressed(tonic::codec::CompressionEncoding::Zstd) + .send_compressed(tonic::codec::CompressionEncoding::Zstd) + .max_decoding_message_size(MAX_RESPONSE_SIZE) + .max_encoding_message_size(MAX_RESPONSE_SIZE); + + let count = transaction_stream_config + .request_ending_version + .map(|v| (v as i64 - transaction_stream_config.starting_version as i64 + 1) as u64); + + info!( + stream_address = transaction_stream_config.indexer_grpc_data_service_address.to_string(), + start_version = transaction_stream_config.starting_version, + end_version = 
transaction_stream_config.request_ending_version, + num_of_transactions = ?count, + "[Transaction Stream] Setting up GRPC stream", + ); + + // TODO: move this to a config file + // Retry this connection a few times before giving up + let mut connect_retries = 0; + loop { + let timeout_res = timeout( + transaction_stream_config.indexer_grpc_reconnection_timeout(), + async { + let request = grpc_request_builder( + transaction_stream_config.starting_version, + count, + transaction_stream_config.auth_token.clone(), + transaction_stream_config.request_name_header.clone(), + ); + rpc_client.get_transactions(request).await + }, + ) + .await; + match timeout_res { + Ok(response_res) => match response_res { + Ok(response) => break Ok(response), + Err(e) => { + error!( + stream_address = transaction_stream_config.indexer_grpc_data_service_address.to_string(), + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + error = ?e, + "[Transaction Stream] Error making grpc request. Retrying..." + ); + connect_retries += 1; + if connect_retries >= RECONNECTION_MAX_RETRIES { + break Err(anyhow!("Error making grpc request").context(e)); + } + }, + }, + Err(e) => { + error!( + stream_address = transaction_stream_config.indexer_grpc_data_service_address.to_string(), + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + retries = connect_retries, + error = ?e, + "[Transaction Stream] Timeout making grpc request. Retrying...", + ); + connect_retries += 1; + if connect_retries >= RECONNECTION_MAX_RETRIES { + break Err(anyhow!("Timeout making grpc request").context(e)); + } + }, + } + } +}
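The two connection loops above share one shape: bound each attempt with `timeout`, log, count failures, give up after `RECONNECTION_MAX_RETRIES`. A generic sketch of that pattern, purely illustrative and not an API of this crate:

```rust
use std::{fmt::Debug, future::Future, time::Duration};
use anyhow::{anyhow, Result};
use tokio::time::timeout;

/// Retry an async operation, bounding each attempt with a timeout.
async fn retry_with_timeout<T, E, F, Fut>(
    mut op: F,
    per_attempt: Duration,
    max_retries: u64,
) -> Result<T>
where
    E: Debug,
    F: FnMut() -> Fut,
    Fut: Future<Output = std::result::Result<T, E>>,
{
    let mut attempts = 0;
    loop {
        match timeout(per_attempt, op()).await {
            Ok(Ok(value)) => break Ok(value),                             // attempt succeeded
            Ok(Err(e)) => tracing::error!(error = ?e, "attempt failed"),  // inner error: retry
            Err(_elapsed) => tracing::error!("attempt timed out"),        // timeout: retry
        }
        attempts += 1;
        if attempts >= max_retries {
            break Err(anyhow!("giving up after {attempts} attempts"));
        }
    }
}
```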
+ +/// Helper function to get the chain id from the stream. +pub async fn get_chain_id(transaction_stream_config: TransactionStreamConfig) -> Result<u64> { + info!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + "[Transaction Stream] Connecting to GRPC stream to get chain id", + ); + + let transaction_stream_config_for_chain_id = TransactionStreamConfig { + starting_version: 1, + request_ending_version: Some(2), + ..transaction_stream_config.clone() + }; + let response = get_stream(transaction_stream_config_for_chain_id).await?; + let connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { + Some(connection_id) => connection_id.to_str().unwrap().to_string(), + None => "".to_string(), + }; + let mut resp_stream = response.into_inner(); + info!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + connection_id, "[Transaction Stream] Successfully connected to GRPC stream to get chain id", + ); + + match resp_stream.next().await { + Some(Ok(r)) => match r.chain_id { + Some(chain_id) => Ok(chain_id), + None => { + error!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + connection_id, "[Transaction Stream] Chain Id doesn't exist." + ); + Err(anyhow!("Chain Id doesn't exist")) + }, + }, + Some(Err(rpc_error)) => { + error!( + stream_address = transaction_stream_config.indexer_grpc_data_service_address.to_string(), + connection_id, + error = ?rpc_error, + "[Transaction Stream] Error receiving datastream response for chain id" + ); + Err(anyhow!("Error receiving datastream response for chain id").context(rpc_error)) + }, + None => { + error!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + connection_id, + "[Transaction Stream] Stream ended before getting response for chain id" + ); + Err(anyhow!("Stream ended before getting response for chain id")) + }, + } +} + +/// TransactionStream is a struct that holds the state of the stream and provides methods to fetch transactions +/// from the stream. +/// - init_stream: Initializes the stream and returns the stream and connection id +/// - get_next_transaction_batch: Fetches the next batch of transactions from the stream +/// - is_end_of_stream: Checks if we've reached the end of the stream. This is determined by the ending version set in `TransactionStreamConfig` +/// - reconnect_to_grpc: Reconnects to the GRPC stream +/// - get_chain_id: Fetches the chain id from the stream +pub struct TransactionStream { + transaction_stream_config: TransactionStreamConfig, + stream: Streaming<TransactionsResponse>, + connection_id: String, + next_version_to_fetch: u64, + reconnection_retries: u64, + last_fetched_version: i64, + fetch_ma: MovingAverage, +} + +impl TransactionStream { + pub async fn new(transaction_stream_config: TransactionStreamConfig) -> Result<Self> { + let (stream, connection_id) = Self::init_stream(transaction_stream_config.clone()).await?; + Ok(Self { + transaction_stream_config: transaction_stream_config.clone(), + stream, + connection_id, + next_version_to_fetch: transaction_stream_config.starting_version, + reconnection_retries: 0, + last_fetched_version: transaction_stream_config.starting_version as i64 - 1, + fetch_ma: MovingAverage::new(3000), + }) + } + + async fn init_stream( + transaction_stream_config: TransactionStreamConfig, + ) -> Result<(Streaming<TransactionsResponse>, String)> { + info!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + "[Transaction Stream] Connecting to GRPC stream", + ); + let resp_stream = get_stream(transaction_stream_config.clone()).await?; + let connection_id = match resp_stream.metadata().get(GRPC_CONNECTION_ID) { + Some(connection_id) => connection_id.to_str().unwrap().to_string(), + None => "".to_string(), + }; + info!( + stream_address = transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + connection_id = connection_id, + start_version = transaction_stream_config.starting_version, + end_version = transaction_stream_config.request_ending_version, + "[Transaction Stream] Successfully connected to GRPC stream", + ); + Ok((resp_stream.into_inner(), connection_id)) + }
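A hedged sketch of how a consumer might drive this API, following the method contract in the doc comment above (error handling is simplified; a real caller would distinguish fatal from retryable errors):

```rust
use aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig};

async fn drive(config: TransactionStreamConfig) -> anyhow::Result<()> {
    let mut stream = TransactionStream::new(config).await?;
    loop {
        // Stop once we have passed the configured `request_ending_version`.
        if stream.is_end_of_stream() {
            break Ok(());
        }
        match stream.get_next_transaction_batch().await {
            Ok(batch) => {
                // Hand the batch off for processing here.
                tracing::info!(
                    start = batch.start_version,
                    end = batch.end_version,
                    "received batch"
                );
            },
            // On any stream error, try to re-establish the connection before giving up.
            Err(_) => stream.reconnect_to_grpc_with_retries().await?,
        }
    }
}
```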
+ + /// Gets a batch of transactions from the stream. Batch size is set in the grpc server. + /// The number of batches depends on our config. + /// There could be several special scenarios: + /// 1. If we lose the connection, we will try reconnecting X times within Y seconds before crashing. + /// 2. If we specified an end version and we hit that, we will stop fetching, but we will make sure that + /// all existing transactions are processed + /// + /// Returns the next batch as a `TransactionsPBResponse` on success, or an error if the stream + /// ended, timed out, errored, or returned a batch with a version gap. + pub async fn get_next_transaction_batch(&mut self) -> Result<TransactionsPBResponse> { + let grpc_channel_recv_latency = std::time::Instant::now(); + + let txn_pb_res = match tokio::time::timeout( + self.transaction_stream_config + .indexer_grpc_response_item_timeout(), + self.stream.next(), + ) + .await + { + // Received datastream response + Ok(response) => { + match response { + Some(Ok(r)) => { + self.reconnection_retries = 0; + let start_version = r.transactions.as_slice().first().unwrap().version; + let start_txn_timestamp = + r.transactions.as_slice().first().unwrap().timestamp.clone(); + let end_version = r.transactions.as_slice().last().unwrap().version; + let end_txn_timestamp = + r.transactions.as_slice().last().unwrap().timestamp.clone(); + + self.next_version_to_fetch = end_version + 1; + + let size_in_bytes = r.encoded_len() as u64; + let chain_id: u64 = r + .chain_id + .expect("[Transaction Stream] Chain Id doesn't exist."); + let num_txns = r.transactions.len(); + let duration_in_secs = grpc_channel_recv_latency.elapsed().as_secs_f64(); + self.fetch_ma.tick_now(num_txns as u64); + + info!( + stream_address = self + .transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + connection_id = self.connection_id, + start_version, + end_version, + start_txn_timestamp_iso = start_txn_timestamp + .as_ref() + .map(timestamp_to_iso) + .unwrap_or_default(), + end_txn_timestamp_iso = end_txn_timestamp + .as_ref() + .map(timestamp_to_iso) + .unwrap_or_default(), + num_of_transactions = end_version - start_version + 1, + // num_filtered_txns, + size_in_bytes, + duration_in_secs, + tps = self.fetch_ma.avg().ceil() as u64, + bytes_per_sec = size_in_bytes as f64 / duration_in_secs, + "[Transaction Stream] Received transactions from GRPC.", + ); + + if self.last_fetched_version + 1 != start_version as i64 { + error!( + batch_start_version = self.last_fetched_version + 1, + self.last_fetched_version, + current_fetched_version = start_version, + "[Transaction Stream] Received batch with gap from GRPC stream" + ); + return Err(anyhow!("Received batch with gap from GRPC stream")); + } + self.last_fetched_version = end_version as i64; + + let txn_pb = TransactionsPBResponse { + transactions: r.transactions, + chain_id, + start_version, + end_version, + start_txn_timestamp, + end_txn_timestamp, + size_in_bytes, + }; + + Ok(txn_pb) + }, + // Error receiving datastream response + Some(Err(rpc_error)) => { + tracing::warn!( + stream_address = self.transaction_stream_config.indexer_grpc_data_service_address.to_string(), + self.connection_id, + start_version = self.transaction_stream_config.starting_version, + end_version = self.transaction_stream_config.request_ending_version, + error = ?rpc_error, + "[Transaction Stream] Error receiving datastream response." + ); + Err(anyhow!("Error receiving datastream response")) + }, + // Stream is finished + None => { + tracing::warn!( + stream_address = self + .transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + connection_id = self.connection_id, + start_version = self.transaction_stream_config.starting_version, + end_version = self.transaction_stream_config.request_ending_version, + "[Transaction Stream] Stream ended."
+ ); + Err(anyhow!("Stream ended")) + }, + } + }, + // Timeout receiving datastream response + Err(e) => { + tracing::warn!( + stream_address = self.transaction_stream_config.indexer_grpc_data_service_address.to_string(), + connection_id = self.connection_id, + start_version = self.transaction_stream_config.starting_version, + end_version = self.transaction_stream_config.request_ending_version, + error = ?e, + "[Transaction Stream] Timeout receiving datastream response." + ); + Err(anyhow!("Timeout receiving datastream response")) + }, + }; + txn_pb_res + } + + /// Helper function to signal that we've fetched all the transactions up to the ending version that was requested. + pub fn is_end_of_stream(&self) -> bool { + if let Some(ending_version) = self.transaction_stream_config.request_ending_version { + self.next_version_to_fetch > ending_version + } else { + false + } + }
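The reconnect helper below sleeps a flat 100ms between attempts and carries a TODO to make that exponential. A minimal sketch of such a delay schedule (the doubling base and the 5-second cap are assumptions, not values from this change); each retry would then call `tokio::time::sleep(backoff_delay(n)).await` instead of the fixed sleep:

```rust
use std::time::Duration;

/// Hypothetical backoff schedule: 100ms, 200ms, 400ms, ... capped at 5s.
fn backoff_delay(attempt: u32) -> Duration {
    let base = Duration::from_millis(100);
    // Clamp the exponent so the multiplication cannot overflow for large attempt counts.
    (base * 2u32.saturating_pow(attempt.min(6))).min(Duration::from_secs(5))
}
```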
+ + pub async fn reconnect_to_grpc_with_retries(&mut self) -> Result<()> { + let mut reconnection_retries = 0; + + loop { + // Sleep for 100ms between reconnect tries + // TODO: Turn this into exponential backoff + tokio::time::sleep(Duration::from_millis(100)).await; + + reconnection_retries += 1; + + if reconnection_retries >= RECONNECTION_MAX_RETRIES { + error!( + stream_address = self + .transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + "[Transaction Stream] Reconnected more than {RECONNECTION_MAX_RETRIES} times. Will not retry.", + ); + break Err(anyhow!( + "Reconnected more than {RECONNECTION_MAX_RETRIES} times. Will not retry." + )); + } + + match self.reconnect_to_grpc().await { + Ok(_) => { + break Ok(()); + }, + Err(e) => { + error!( + stream_address = self.transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + error = ?e, + "[Transaction Stream] Error reconnecting to GRPC stream. Retrying..." + ); + continue; + }, + } + } + } + + pub async fn reconnect_to_grpc(&mut self) -> Result<()> { + info!( + stream_address = self + .transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + starting_version = self.next_version_to_fetch, + ending_version = self.transaction_stream_config.request_ending_version, + reconnection_retries = self.reconnection_retries, + "[Transaction Stream] Reconnecting to GRPC stream" + ); + let response = get_stream(self.transaction_stream_config.clone()).await?; + let connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { + Some(connection_id) => connection_id.to_str().unwrap().to_string(), + None => "".to_string(), + }; + self.connection_id = connection_id; + self.stream = response.into_inner(); + info!( + stream_address = self + .transaction_stream_config + .indexer_grpc_data_service_address + .to_string(), + connection_id = self.connection_id, + starting_version = self.next_version_to_fetch, + ending_version = self.transaction_stream_config.request_ending_version, + reconnection_retries = self.reconnection_retries, + "[Transaction Stream] Successfully reconnected to GRPC stream" + ); + Ok(()) + } + + pub async fn get_chain_id(self) -> Result<u64> { + get_chain_id(self.transaction_stream_config).await + } +} diff --git a/rust/aptos-indexer-transaction-stream/src/utils/mod.rs b/rust/aptos-indexer-transaction-stream/src/utils/mod.rs new file mode 100644 index 000000000..812d1edf2 --- /dev/null +++ b/rust/aptos-indexer-transaction-stream/src/utils/mod.rs @@ -0,0 +1 @@ +pub mod util; diff --git a/rust/aptos-indexer-transaction-stream/src/utils/util.rs b/rust/aptos-indexer-transaction-stream/src/utils/util.rs new file mode 100644 index 000000000..5117c2615 --- /dev/null +++ b/rust/aptos-indexer-transaction-stream/src/utils/util.rs @@ -0,0 +1,28 @@ +use aptos_protos::util::timestamp::Timestamp; + +// 9999-12-31 23:59:59, this is the max supported by Google BigQuery +pub const MAX_TIMESTAMP_SECS: i64 = 253_402_300_799; + +pub fn parse_timestamp(ts: &Timestamp, version: i64) -> chrono::NaiveDateTime { + let final_ts = if ts.seconds >= MAX_TIMESTAMP_SECS { + Timestamp { + seconds: MAX_TIMESTAMP_SECS, + nanos: 0, + } + } else { + ts.clone() + }; + chrono::NaiveDateTime::from_timestamp_opt(final_ts.seconds, final_ts.nanos as u32) + .unwrap_or_else(|| panic!("Could not parse timestamp {:?} for version {}", ts, version)) +} + +/// Convert the protobuf timestamp to ISO format +pub fn timestamp_to_iso(timestamp: &Timestamp) -> String { + let dt = parse_timestamp(timestamp, 0); + dt.format("%Y-%m-%dT%H:%M:%S%.9fZ").to_string() +} + +/// Convert the protobuf timestamp to unixtime +pub fn timestamp_to_unixtime(timestamp: &Timestamp) -> f64 { + timestamp.seconds as f64 + timestamp.nanos as f64 * 1e-9 +} diff --git a/rust/processor/Cargo.toml b/rust/processor/Cargo.toml index 96628d6b3..86c649292 100644 --- a/rust/processor/Cargo.toml +++ b/rust/processor/Cargo.toml @@ -15,6 +15,7 @@ rust-version = { workspace = true } [dependencies] ahash = { workspace = true } anyhow = { workspace = true } +aptos-indexer-transaction-stream = { workspace = true } aptos-moving-average = { workspace = true } aptos-protos = { workspace = true } async-trait = { workspace = true } @@ -53,6 +54,7 @@ strum = { workspace = true } tokio = { workspace = true } tonic = { workspace = true } tracing = { workspace = true } +transaction-filter = { workspace = true } unescape = { workspace = true } url = { workspace = true }
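The timestamp helpers in the new `utils/util.rs` clamp out-of-range values rather than rejecting them. A quick illustration of that behavior (epoch values chosen for the example; `1_700_000_000` is 2023-11-14T22:13:20Z):

```rust
use aptos_indexer_transaction_stream::utils::util::{
    parse_timestamp, timestamp_to_iso, MAX_TIMESTAMP_SECS,
};
use aptos_protos::util::timestamp::Timestamp;

fn timestamp_examples() {
    // A value well inside the supported range renders with nine fractional digits.
    let ts = Timestamp { seconds: 1_700_000_000, nanos: 500_000_000 };
    assert_eq!(timestamp_to_iso(&ts), "2023-11-14T22:13:20.500000000Z");

    // Seconds past the BigQuery maximum are clamped to it rather than rejected.
    let far_future = Timestamp { seconds: MAX_TIMESTAMP_SECS + 1, nanos: 0 };
    let max = Timestamp { seconds: MAX_TIMESTAMP_SECS, nanos: 0 };
    assert_eq!(parse_timestamp(&far_future, 0), parse_timestamp(&max, 0));
}
```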
diff --git a/rust/processor/src/config.rs b/rust/processor/src/config.rs index 5a9626b2a..4d9aad452 100644 --- a/rust/processor/src/config.rs +++ b/rust/processor/src/config.rs @@ -2,14 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - gap_detector::DEFAULT_GAP_DETECTION_BATCH_SIZE, processors::ProcessorConfig, - transaction_filter::TransactionFilter, worker::Worker, + gap_detector::DEFAULT_GAP_DETECTION_BATCH_SIZE, processors::ProcessorConfig, worker::Worker, }; use ahash::AHashMap; use anyhow::{Context, Result}; +use aptos_indexer_transaction_stream::TransactionStreamConfig; use serde::{Deserialize, Serialize}; use server_framework::RunnableConfig; -use std::time::Duration; +use transaction_filter::transaction_filter::TransactionFilter; use url::Url; pub const QUERY_DEFAULT_RETRIES: u32 = 5; @@ -44,7 +44,7 @@ pub struct IndexerGrpcProcessorConfig { pub per_table_chunk_sizes: AHashMap<String, usize>, pub enable_verbose_logging: Option<bool>, - #[serde(default = "IndexerGrpcProcessorConfig::default_grpc_response_item_timeout_in_secs")] + #[serde(default = "TransactionStreamConfig::default_indexer_grpc_response_item_timeout")] pub grpc_response_item_timeout_in_secs: u64, #[serde(default)] @@ -69,11 +69,6 @@ impl IndexerGrpcProcessorConfig { pub const fn default_pb_channel_txn_chunk_size() -> usize { 100_000 } - - /// Default timeout for grpc response item in seconds. Defaults to 60 seconds. - pub const fn default_grpc_response_item_timeout_in_secs() -> u64 { - 60 - } } #[async_trait::async_trait] @@ -116,39 +111,17 @@ impl RunnableConfig for IndexerGrpcProcessorConfig { #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields)] -#[serde(default)] pub struct IndexerGrpcHttp2Config { /// Indexer GRPC http2 ping interval in seconds. Defaults to 30. /// Tonic ref: https://docs.rs/tonic/latest/tonic/transport/channel/struct.Endpoint.html#method.http2_keep_alive_interval - indexer_grpc_http2_ping_interval_in_secs: u64, + #[serde(default = "TransactionStreamConfig::default_indexer_grpc_http2_ping_interval")] + pub indexer_grpc_http2_ping_interval_in_secs: u64, /// Indexer GRPC http2 ping timeout in seconds. Defaults to 10. - indexer_grpc_http2_ping_timeout_in_secs: u64, + #[serde(default = "TransactionStreamConfig::default_indexer_grpc_http2_ping_timeout")] + pub indexer_grpc_http2_ping_timeout_in_secs: u64, /// Seconds before timeout for grpc connection.
- indexer_grpc_connection_timeout_secs: u64, -} - -impl IndexerGrpcHttp2Config { - pub fn grpc_http2_ping_interval_in_secs(&self) -> Duration { - Duration::from_secs(self.indexer_grpc_http2_ping_interval_in_secs) - } - - pub fn grpc_http2_ping_timeout_in_secs(&self) -> Duration { - Duration::from_secs(self.indexer_grpc_http2_ping_timeout_in_secs) - } - - pub fn grpc_connection_timeout_secs(&self) -> Duration { - Duration::from_secs(self.indexer_grpc_connection_timeout_secs) - } -} - -impl Default for IndexerGrpcHttp2Config { - fn default() -> Self { - Self { - indexer_grpc_http2_ping_interval_in_secs: 30, - indexer_grpc_http2_ping_timeout_in_secs: 10, - indexer_grpc_connection_timeout_secs: 5, - } - } + #[serde(default = "TransactionStreamConfig::default_indexer_grpc_reconnection_timeout")] + pub indexer_grpc_connection_timeout_secs: u64, } diff --git a/rust/processor/src/grpc_stream.rs b/rust/processor/src/grpc_stream.rs deleted file mode 100644 index e1674dcd0..000000000 --- a/rust/processor/src/grpc_stream.rs +++ /dev/null @@ -1,694 +0,0 @@ -use crate::utils::{ - counters::{ - ProcessorStep, FETCHER_THREAD_CHANNEL_SIZE, LATEST_PROCESSED_VERSION, - NUM_TRANSACTIONS_FILTERED_OUT_COUNT, NUM_TRANSACTIONS_PROCESSED_COUNT, - PROCESSED_BYTES_COUNT, TRANSACTION_UNIX_TIMESTAMP, - }, - util::{timestamp_to_iso, timestamp_to_unixtime}, -}; -use aptos_moving_average::MovingAverage; -use aptos_protos::{ - indexer::v1::{raw_data_client::RawDataClient, GetTransactionsRequest, TransactionsResponse}, - transaction::v1::Transaction, - util::timestamp::Timestamp, -}; -use bigdecimal::Zero; -use futures_util::StreamExt; -use itertools::Itertools; -use kanal::AsyncSender; -use prost::Message; -use std::time::Duration; -use tokio::time::timeout; -use tonic::{Response, Streaming}; -use tracing::{debug, error, info}; -use url::Url; - -/// GRPC request metadata key for the token ID. -const GRPC_API_GATEWAY_API_KEY_HEADER: &str = "authorization"; -/// GRPC request metadata key for the request name. This is used to identify the -/// data destination. 
-const GRPC_REQUEST_NAME_HEADER: &str = "x-aptos-request-name"; -/// GRPC connection id -const GRPC_CONNECTION_ID: &str = "x-aptos-connection-id"; -/// We will try to reconnect to GRPC 5 times in case upstream connection is being updated -pub const RECONNECTION_MAX_RETRIES: u64 = 5; -/// 256MB -pub const MAX_RESPONSE_SIZE: usize = 1024 * 1024 * 256; - -#[derive(Clone)] -pub struct TransactionsPBResponse { - pub transactions: Vec<Transaction>, - pub chain_id: u64, - // We put start/end versions here as filtering means there are potential "gaps" here now - pub start_version: u64, - pub end_version: u64, - pub start_txn_timestamp: Option<Timestamp>, - pub end_txn_timestamp: Option<Timestamp>, - pub size_in_bytes: u64, -} - -pub fn grpc_request_builder( - starting_version: u64, - transactions_count: Option<u64>, - grpc_auth_token: String, - processor_name: String, -) -> tonic::Request<GetTransactionsRequest> { - let mut request = tonic::Request::new(GetTransactionsRequest { - starting_version: Some(starting_version), - transactions_count, - ..GetTransactionsRequest::default() - }); - request.metadata_mut().insert( - GRPC_API_GATEWAY_API_KEY_HEADER, - format!("Bearer {}", grpc_auth_token.clone()) - .parse() - .unwrap(), - ); - request - .metadata_mut() - .insert(GRPC_REQUEST_NAME_HEADER, processor_name.parse().unwrap()); - request -} - -pub async fn get_stream( - indexer_grpc_data_service_address: Url, - indexer_grpc_http2_ping_interval: Duration, - indexer_grpc_http2_ping_timeout: Duration, - indexer_grpc_reconnection_timeout_secs: Duration, - starting_version: u64, - ending_version: Option<u64>, - auth_token: String, - processor_name: String, -) -> Response<Streaming<TransactionsResponse>> { - info!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - start_version = starting_version, - end_version = ending_version, - "[Parser] Setting up rpc channel" - ); - - let channel = tonic::transport::Channel::from_shared( - indexer_grpc_data_service_address.to_string(), - ) - .expect( - "[Parser] Failed to build GRPC channel, perhaps because the data service URL is invalid", - ) - .http2_keep_alive_interval(indexer_grpc_http2_ping_interval) - .keep_alive_timeout(indexer_grpc_http2_ping_timeout); - - // If the scheme is https, add a TLS config.
- let channel = if indexer_grpc_data_service_address.scheme() == "https" { - let config = tonic::transport::channel::ClientTlsConfig::new(); - channel - .tls_config(config) - .expect("[Parser] Failed to create TLS config") - } else { - channel - }; - - info!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - start_version = starting_version, - end_version = ending_version, - "[Parser] Setting up GRPC client" - ); - - // TODO: move this to a config file - // Retry this connection a few times before giving up - let mut connect_retries = 0; - let connect_res = loop { - let res = timeout( - indexer_grpc_reconnection_timeout_secs, - RawDataClient::connect(channel.clone()), - ) - .await; - match res { - Ok(client) => break Ok(client), - Err(e) => { - error!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - start_version = starting_version, - end_version = ending_version, - retries = connect_retries, - error = ?e, - "[Parser] Error connecting to GRPC client" - ); - connect_retries += 1; - if connect_retries >= RECONNECTION_MAX_RETRIES { - break Err(e); - } - }, - } - } - .expect("[Parser] Timeout connecting to GRPC server"); - - let mut rpc_client = match connect_res { - Ok(client) => client - .accept_compressed(tonic::codec::CompressionEncoding::Gzip) - .accept_compressed(tonic::codec::CompressionEncoding::Zstd) - .send_compressed(tonic::codec::CompressionEncoding::Zstd) - .max_decoding_message_size(MAX_RESPONSE_SIZE) - .max_encoding_message_size(MAX_RESPONSE_SIZE), - Err(e) => { - error!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - start_version = starting_version, - ending_version = ending_version, - error = ?e, - "[Parser] Error connecting to GRPC client" - ); - panic!("[Parser] Error connecting to GRPC client"); - }, - }; - let count = ending_version.map(|v| (v as i64 - starting_version as i64 + 1) as u64); - info!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - start_version = starting_version, - end_version = ending_version, - num_of_transactions = ?count, - "[Parser] Setting up GRPC stream", - ); - - // TODO: move this to a config file - // Retry this connection a few times before giving up - let mut connect_retries = 0; - let stream_res = loop { - let timeout_res = timeout(indexer_grpc_reconnection_timeout_secs, async { - let request = grpc_request_builder( - starting_version, - count, - auth_token.clone(), - processor_name.clone(), - ); - rpc_client.get_transactions(request).await - }) - .await; - match timeout_res { - Ok(client) => break Ok(client), - Err(e) => { - error!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - start_version = starting_version, - end_version = ending_version, - retries = connect_retries, - error = ?e, - "[Parser] Timeout making grpc request. 
Retrying...", - ); - connect_retries += 1; - if connect_retries >= RECONNECTION_MAX_RETRIES { - break Err(e); - } - }, - } - } - .expect("[Parser] Timed out making grpc request after max retries."); - - match stream_res { - Ok(stream) => stream, - Err(e) => { - error!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - start_version = starting_version, - ending_version = ending_version, - error = ?e, - "[Parser] Failed to get grpc response. Is the server running?" - ); - panic!("[Parser] Failed to get grpc response. Is the server running?"); - }, - } -} - -pub async fn get_chain_id( - indexer_grpc_data_service_address: Url, - indexer_grpc_http2_ping_interval: Duration, - indexer_grpc_http2_ping_timeout: Duration, - indexer_grpc_reconnection_timeout_secs: Duration, - auth_token: String, - processor_name: String, -) -> u64 { - info!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - "[Parser] Connecting to GRPC stream to get chain id", - ); - let response = get_stream( - indexer_grpc_data_service_address.clone(), - indexer_grpc_http2_ping_interval, - indexer_grpc_http2_ping_timeout, - indexer_grpc_reconnection_timeout_secs, - 1, - Some(2), - auth_token.clone(), - processor_name.to_string(), - ) - .await; - let connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { - Some(connection_id) => connection_id.to_str().unwrap().to_string(), - None => "".to_string(), - }; - let mut resp_stream = response.into_inner(); - info!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - "[Parser] Successfully connected to GRPC stream to get chain id", - ); - - match resp_stream.next().await { - Some(Ok(r)) => r.chain_id.expect("[Parser] Chain Id doesn't exist."), - Some(Err(rpc_error)) => { - error!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - error = ?rpc_error, - "[Parser] Error receiving datastream response for chain id" - ); - panic!("[Parser] Error receiving datastream response for chain id"); - }, - None => { - error!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - "[Parser] Stream ended before getting response fo for chain id" - ); - panic!("[Parser] Stream ended before getting response fo for chain id"); - }, - } -} - -/// Gets a batch of transactions from the stream. Batch size is set in the grpc server. -/// The number of batches depends on our config -/// There could be several special scenarios: -/// 1. If we lose the connection, we will try reconnecting X times within Y seconds before crashing. -/// 2. 
If we specified an end version and we hit that, we will stop fetching, but we will make sure that -/// all existing transactions are processed -pub async fn create_fetcher_loop( - txn_sender: AsyncSender<TransactionsPBResponse>, - indexer_grpc_data_service_address: Url, - indexer_grpc_http2_ping_interval: Duration, - indexer_grpc_http2_ping_timeout: Duration, - indexer_grpc_reconnection_timeout_secs: Duration, - indexer_grpc_response_item_timeout_secs: Duration, - starting_version: u64, - request_ending_version: Option<u64>, - auth_token: String, - processor_name: String, - transaction_filter: crate::transaction_filter::TransactionFilter, - // The number of transactions per protobuf batch - pb_channel_txn_chunk_size: usize, -) { - info!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - start_version = starting_version, - end_version = request_ending_version, - "[Parser] Connecting to GRPC stream", - ); - let mut response = get_stream( - indexer_grpc_data_service_address.clone(), - indexer_grpc_http2_ping_interval, - indexer_grpc_http2_ping_timeout, - indexer_grpc_reconnection_timeout_secs, - starting_version, - request_ending_version, - auth_token.clone(), - processor_name.to_string(), - ) - .await; - let mut connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { - Some(connection_id) => connection_id.to_str().unwrap().to_string(), - None => "".to_string(), - }; - let mut resp_stream = response.into_inner(); - info!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - start_version = starting_version, - end_version = request_ending_version, - "[Parser] Successfully connected to GRPC stream", - ); - - let mut grpc_channel_recv_latency = std::time::Instant::now(); - let mut next_version_to_fetch = starting_version; - let mut reconnection_retries = 0; - let mut last_fetched_version = starting_version as i64 - 1; - let mut fetch_ma = MovingAverage::new(3000); - let mut send_ma = MovingAverage::new(3000); - - loop { - let is_success = match tokio::time::timeout( - indexer_grpc_response_item_timeout_secs, - resp_stream.next(), - ) - .await - { - // Received datastream response - Ok(response) => { - match response { - Some(Ok(mut r)) => { - reconnection_retries = 0; - let start_version = r.transactions.as_slice().first().unwrap().version; - let start_txn_timestamp = - r.transactions.as_slice().first().unwrap().timestamp.clone(); - let end_version = r.transactions.as_slice().last().unwrap().version; - let end_txn_timestamp = - r.transactions.as_slice().last().unwrap().timestamp.clone(); - - next_version_to_fetch = end_version + 1; - - let size_in_bytes = r.encoded_len() as u64; - let chain_id: u64 = r.chain_id.expect("[Parser] Chain Id doesn't exist."); - let num_txns = r.transactions.len(); - let duration_in_secs = grpc_channel_recv_latency.elapsed().as_secs_f64(); - fetch_ma.tick_now(num_txns as u64); - - let num_txns = r.transactions.len(); - - // Filter out the txns we don't care about - r.transactions.retain(|txn| transaction_filter.include(txn)); - - let num_txn_post_filter = r.transactions.len(); - let num_filtered_txns = num_txns - num_txn_post_filter; - let step = ProcessorStep::ReceivedTxnsFromGrpc.get_step(); - let label = ProcessorStep::ReceivedTxnsFromGrpc.get_label(); - - info!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - 
stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - start_version, - end_version, - start_txn_timestamp_iso = start_txn_timestamp - .as_ref() - .map(timestamp_to_iso) - .unwrap_or_default(), - end_txn_timestamp_iso = end_txn_timestamp - .as_ref() - .map(timestamp_to_iso) - .unwrap_or_default(), - num_of_transactions = end_version - start_version + 1, - num_filtered_txns, - channel_size = txn_sender.len(), - size_in_bytes, - duration_in_secs, - tps = fetch_ma.avg().ceil() as u64, - bytes_per_sec = size_in_bytes as f64 / duration_in_secs, - step, - "{}", - label, - ); - - if last_fetched_version + 1 != start_version as i64 { - error!( - batch_start_version = last_fetched_version + 1, - last_fetched_version, - current_fetched_version = start_version, - "[Parser] Received batch with gap from GRPC stream" - ); - panic!("[Parser] Received batch with gap from GRPC stream"); - } - last_fetched_version = end_version as i64; - - LATEST_PROCESSED_VERSION - .with_label_values(&[&processor_name, step, label, "-"]) - .set(end_version as i64); - TRANSACTION_UNIX_TIMESTAMP - .with_label_values(&[&processor_name, step, label, "-"]) - .set( - start_txn_timestamp - .as_ref() - .map(timestamp_to_unixtime) - .unwrap_or_default(), - ); - PROCESSED_BYTES_COUNT - .with_label_values(&[&processor_name, step, label, "-"]) - .inc_by(size_in_bytes); - NUM_TRANSACTIONS_PROCESSED_COUNT - .with_label_values(&[&processor_name, step, label, "-"]) - .inc_by(end_version - start_version + 1); - - let txn_channel_send_latency = std::time::Instant::now(); - - //potentially break txn_pb into many `TransactionsPBResponse` that are each `pb_channel_txn_chunk_size` txns max in size - if num_txn_post_filter < pb_channel_txn_chunk_size { - // We only need to send one; avoid the chunk/clone - let txn_pb = TransactionsPBResponse { - transactions: r.transactions, - chain_id, - start_version, - end_version, - start_txn_timestamp, - end_txn_timestamp, - size_in_bytes, - }; - - match txn_sender.send(txn_pb).await { - Ok(()) => {}, - Err(e) => { - error!( - processor_name = processor_name, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - error = ?e, - "[Parser] Error sending GRPC response to channel." - ); - panic!("[Parser] Error sending GRPC response to channel.") - }, - } - } else { - // We are breaking down a big batch into small batches; this involves an iterator - let average_size_in_bytes = size_in_bytes / num_txns as u64; - - let pb_txn_chunks: Vec<Vec<Transaction>> = r - .transactions - .into_iter() - .chunks(pb_channel_txn_chunk_size) - .into_iter() - .map(|chunk| chunk.collect()) - .collect(); - for txns in pb_txn_chunks { - let size_in_bytes = average_size_in_bytes * txns.len() as u64; - let txn_pb = TransactionsPBResponse { - transactions: txns, - chain_id, - start_version, - end_version, - // TODO: this is only for gap checker + filtered txns, but this is wrong - start_txn_timestamp: start_txn_timestamp.clone(), - end_txn_timestamp: end_txn_timestamp.clone(), - size_in_bytes, - }; - - match txn_sender.send(txn_pb).await { - Ok(()) => {}, - Err(e) => { - error!( - processor_name = processor_name, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - error = ?e, - "[Parser] Error sending GRPC response to channel."
- ); - panic!("[Parser] Error sending GRPC response to channel.") - }, - } - } - } - - let duration_in_secs = txn_channel_send_latency.elapsed().as_secs_f64(); - send_ma.tick_now(num_txns as u64); - let tps = send_ma.avg().ceil() as u64; - let bytes_per_sec = size_in_bytes as f64 / duration_in_secs; - - let channel_size = txn_sender.len(); - debug!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - start_version, - end_version, - channel_size, - size_in_bytes, - duration_in_secs, - bytes_per_sec, - tps, - num_filtered_txns, - "[Parser] Successfully sent transactions to channel." - ); - FETCHER_THREAD_CHANNEL_SIZE - .with_label_values(&[&processor_name]) - .set(channel_size as i64); - grpc_channel_recv_latency = std::time::Instant::now(); - - NUM_TRANSACTIONS_FILTERED_OUT_COUNT - .with_label_values(&[&processor_name]) - .inc_by(num_filtered_txns as u64); - true - }, - // Error receiving datastream response - Some(Err(rpc_error)) => { - tracing::warn!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - start_version = starting_version, - end_version = request_ending_version, - error = ?rpc_error, - "[Parser] Error receiving datastream response." - ); - false - }, - // Stream is finished - None => { - tracing::warn!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - start_version = starting_version, - end_version = request_ending_version, - "[Parser] Stream ended." - ); - false - }, - } - }, - // Timeout receiving datastream response - Err(e) => { - tracing::warn!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - start_version = starting_version, - end_version = request_ending_version, - error = ?e, - "[Parser] Timeout receiving datastream response." - ); - false - }, - }; - // Check if we're at the end of the stream - let is_end = if let Some(ending_version) = request_ending_version { - next_version_to_fetch > ending_version - } else { - false - }; - if is_end { - info!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - ending_version = request_ending_version, - next_version_to_fetch = next_version_to_fetch, - "[Parser] Reached ending version.", - ); - // Wait for the fetched transactions to finish processing before closing the channel - loop { - let channel_size = txn_sender.len(); - info!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - channel_size, - "[Parser] Waiting for channel to be empty" - ); - if channel_size.is_zero() { - break; - } - tokio::time::sleep(Duration::from_millis(100)).await; - } - info!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - "[Parser] Transaction fetcher send channel is closed." 
- ); - break; - } else { - // The rest is to see if we need to reconnect - if is_success { - continue; - } - - // Sleep for 100ms between reconnect tries - // TODO: Turn this into exponential backoff - tokio::time::sleep(Duration::from_millis(100)).await; - - if reconnection_retries >= RECONNECTION_MAX_RETRIES { - error!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - "[Parser] Reconnected more than {RECONNECTION_MAX_RETRIES} times. Will not retry.", - ); - panic!("[Parser] Reconnected more than {RECONNECTION_MAX_RETRIES} times. Will not retry.") - } - reconnection_retries += 1; - info!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - starting_version = next_version_to_fetch, - ending_version = request_ending_version, - reconnection_retries = reconnection_retries, - "[Parser] Reconnecting to GRPC stream" - ); - response = get_stream( - indexer_grpc_data_service_address.clone(), - indexer_grpc_http2_ping_interval, - indexer_grpc_http2_ping_timeout, - indexer_grpc_reconnection_timeout_secs, - next_version_to_fetch, - request_ending_version, - auth_token.clone(), - processor_name.to_string(), - ) - .await; - connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { - Some(connection_id) => connection_id.to_str().unwrap().to_string(), - None => "".to_string(), - }; - resp_stream = response.into_inner(); - info!( - processor_name = processor_name, - service_type = crate::worker::PROCESSOR_SERVICE_TYPE, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - starting_version = next_version_to_fetch, - ending_version = request_ending_version, - reconnection_retries = reconnection_retries, - "[Parser] Successfully reconnected to GRPC stream" - ); - } - } -} diff --git a/rust/processor/src/lib.rs b/rust/processor/src/lib.rs index ecb81ff3b..6142bc060 100644 --- a/rust/processor/src/lib.rs +++ b/rust/processor/src/lib.rs @@ -15,10 +15,8 @@ pub use config::IndexerGrpcProcessorConfig; mod config; pub mod gap_detector; -pub mod grpc_stream; pub mod models; pub mod processors; pub mod schema; -pub mod transaction_filter; pub mod utils; pub mod worker; diff --git a/rust/processor/src/utils/counters.rs b/rust/processor/src/utils/counters.rs index ee9431a8d..165d5f2f4 100644 --- a/rust/processor/src/utils/counters.rs +++ b/rust/processor/src/utils/counters.rs @@ -167,26 +167,6 @@ pub static NUM_TRANSACTIONS_PROCESSED_COUNT: Lazy<IntCounterVec> = Lazy::new(|| .unwrap() }); -/// Count of transactions filtered out -pub static NUM_TRANSACTIONS_FILTERED_OUT_COUNT: Lazy<IntCounterVec> = Lazy::new(|| { - register_int_counter_vec!( - "indexer_processor_num_transactions_filtered_out_count", - "Number of transactions filtered out", - &["processor_name"] - ) - .unwrap() -}); - -/// Size of the channel containing transactions fetched from GRPC, waiting to be processed -pub static FETCHER_THREAD_CHANNEL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| { - register_int_gauge_vec!( - "indexer_processor_fetcher_thread_channel_size", - "Size of the fetcher thread channel", - &["processor_name"] - ) - .unwrap() -}); - /// Overall processing time for a single batch of transactions (per task) pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy<GaugeVec> = Lazy::new(|| { register_gauge_vec!( @@ -254,3 +234,23 @@ pub static PROCESSOR_UNKNOWN_TYPE_COUNT: Lazy<IntCounterVec> = Lazy::new(|| { ) .unwrap() }); + +/// Size of the channel containing
transactions fetched from GRPC, waiting to be processed +pub static FETCHER_THREAD_CHANNEL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| { + register_int_gauge_vec!( + "indexer_processor_fetcher_thread_channel_size", + "Size of the fetcher thread channel", + &["processor_name"] + ) + .unwrap() +}); + +/// Count of transactions filtered out +pub static NUM_TRANSACTIONS_FILTERED_OUT_COUNT: Lazy<IntCounterVec> = Lazy::new(|| { + register_int_counter_vec!( + "indexer_processor_num_transactions_filtered_out_count", + "Number of transactions filtered out", + &["processor_name"] + ) + .unwrap() +});
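These two metrics are re-registered at the bottom of the file; for reference, a hedged sketch of how the worker code below drives them at runtime (the label value is a placeholder, and the import path assumes this crate's module layout):

```rust
use processor::utils::counters::{
    FETCHER_THREAD_CHANNEL_SIZE, NUM_TRANSACTIONS_FILTERED_OUT_COUNT,
};

fn record_metrics_example() {
    // Record a filtered batch and the current channel depth for one processor.
    NUM_TRANSACTIONS_FILTERED_OUT_COUNT
        .with_label_values(&["example_processor"])
        .inc_by(42);
    FETCHER_THREAD_CHANNEL_SIZE
        .with_label_values(&["example_processor"])
        .set(7);
}
```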
diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs
index 06ec9b213..cd94084b3 100644
--- a/rust/processor/src/worker.rs
+++ b/rust/processor/src/worker.rs
@@ -3,7 +3,6 @@
 use crate::{
     config::IndexerGrpcHttp2Config,
-    grpc_stream::TransactionsPBResponse,
     models::{ledger_info::LedgerInfo, processor_status::ProcessorStatusQuery},
     processors::{
         account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor,
@@ -17,10 +16,10 @@
         ProcessorConfig, ProcessorTrait,
     },
     schema::ledger_infos,
-    transaction_filter::TransactionFilter,
     utils::{
         counters::{
-            ProcessorStep, GRPC_LATENCY_BY_PROCESSOR_IN_SECS, LATEST_PROCESSED_VERSION,
+            ProcessorStep, FETCHER_THREAD_CHANNEL_SIZE, GRPC_LATENCY_BY_PROCESSOR_IN_SECS,
+            LATEST_PROCESSED_VERSION, NUM_TRANSACTIONS_FILTERED_OUT_COUNT,
             NUM_TRANSACTIONS_PROCESSED_COUNT, PB_CHANNEL_FETCH_WAIT_TIME_SECS,
             PROCESSED_BYTES_COUNT, PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS,
             PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS, PROCESSOR_ERRORS_COUNT,
@@ -34,9 +33,18 @@
 };
 use ahash::AHashMap;
 use anyhow::{Context, Result};
+use aptos_indexer_transaction_stream::{
+    config::TransactionStreamConfig, TransactionStream, TransactionsPBResponse,
+};
 use aptos_moving_average::MovingAverage;
+use aptos_protos::transaction::v1::Transaction;
+use bigdecimal::Zero;
+use itertools::Itertools;
+use kanal::AsyncSender;
+use std::time::Duration;
 use tokio::task::JoinHandle;
 use tracing::{debug, error, info};
+use transaction_filter::transaction_filter::TransactionFilter;
 use url::Url;
 
 // this is how large the fetch queue should be. Each bucket should have a max of 80MB or so, so a batch
@@ -169,17 +177,30 @@ impl Worker {
         );
         let concurrent_tasks = self.number_concurrent_processing_tasks;
 
+        let transaction_stream_config = TransactionStreamConfig {
+            indexer_grpc_data_service_address: self.indexer_grpc_data_service_address.clone(),
+            starting_version,
+            request_ending_version: self.ending_version,
+            auth_token: self.auth_token.clone(),
+            request_name_header: processor_name.to_string(),
+            indexer_grpc_http2_ping_interval_secs: self
+                .grpc_http2_config
+                .indexer_grpc_http2_ping_interval_in_secs,
+            indexer_grpc_http2_ping_timeout_secs: self
+                .grpc_http2_config
+                .indexer_grpc_http2_ping_timeout_in_secs,
+            indexer_grpc_reconnection_timeout_secs: self
+                .grpc_http2_config
+                .indexer_grpc_connection_timeout_secs,
+            indexer_grpc_response_item_timeout_secs: self.grpc_response_item_timeout_in_secs,
+        };
+
         // get the chain id
-        let chain_id = crate::grpc_stream::get_chain_id(
-            self.indexer_grpc_data_service_address.clone(),
-            self.grpc_http2_config.grpc_http2_ping_interval_in_secs(),
-            self.grpc_http2_config.grpc_http2_ping_timeout_in_secs(),
-            self.grpc_http2_config.grpc_connection_timeout_secs(),
-            self.auth_token.clone(),
-            processor_name.to_string(),
+        let chain_id = aptos_indexer_transaction_stream::transaction_stream::get_chain_id(
+            transaction_stream_config.clone(),
         )
-        .await;
+        .await
+        .expect("[Parser] Error getting chain id");
         self.check_or_update_chain_id(chain_id as i64)
             .await
             .unwrap();
@@ -187,24 +208,13 @@ impl Worker {
         self.grpc_chain_id = Some(chain_id);
 
         let ending_version = self.ending_version;
-        let indexer_grpc_data_service_address = self.indexer_grpc_data_service_address.clone();
-        let indexer_grpc_http2_ping_interval =
-            self.grpc_http2_config.grpc_http2_ping_interval_in_secs();
-        let indexer_grpc_http2_ping_timeout =
-            self.grpc_http2_config.grpc_http2_ping_timeout_in_secs();
-        let indexer_grpc_reconnection_timeout_secs =
-            self.grpc_http2_config.grpc_connection_timeout_secs();
-        let pb_channel_txn_chunk_size = self.pb_channel_txn_chunk_size;
 
         // Create a transaction fetcher thread that will continuously fetch transactions from the GRPC stream
         // and write into a channel
         // TODO: change channel size based on number_concurrent_processing_tasks
         let (tx, receiver) = kanal::bounded_async::<TransactionsPBResponse>(BUFFER_SIZE);
-        let request_ending_version = self.ending_version;
-        let auth_token = self.auth_token.clone();
         let transaction_filter = self.transaction_filter.clone();
-        let grpc_response_item_timeout =
-            std::time::Duration::from_secs(self.grpc_response_item_timeout_in_secs);
+        let pb_channel_txn_chunk_size = self.pb_channel_txn_chunk_size;
 
         let fetcher_task = tokio::spawn(async move {
             info!(
                 processor_name = processor_name,
@@ -214,16 +224,9 @@
                 "[Parser] Starting fetcher thread"
             );
 
-            crate::grpc_stream::create_fetcher_loop(
+            fetch_transactions_from_transaction_stream(
                 tx.clone(),
-                indexer_grpc_data_service_address.clone(),
-                indexer_grpc_http2_ping_interval,
-                indexer_grpc_http2_ping_timeout,
-                indexer_grpc_reconnection_timeout_secs,
-                grpc_response_item_timeout,
-                starting_version,
-                request_ending_version,
-                auth_token.clone(),
+                transaction_stream_config.clone(),
                 processor_name.to_string(),
                 transaction_filter,
                 pb_channel_txn_chunk_size,
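For orientation before the new fetcher function below: the worker now drives the stream through four calls that appear in this patch, `TransactionStream::new`, `get_next_transaction_batch`, `is_end_of_stream`, and `reconnect_to_grpc_with_retries`. A minimal consumer could look like this sketch (illustration only, not part of the patch; signatures are inferred from their call sites here, and the crate's error types are assumed to convert into `anyhow::Error`):

    use aptos_indexer_transaction_stream::{
        config::TransactionStreamConfig, TransactionStream,
    };

    /// Sketch: drain a stream until it reports end-of-stream, reconnecting on
    /// transient errors. Mirrors the control flow of the fetcher below.
    async fn drain_stream(config: TransactionStreamConfig) -> anyhow::Result<()> {
        let mut stream = TransactionStream::new(config).await?;
        loop {
            match stream.get_next_transaction_batch().await {
                Ok(batch) => {
                    // Each batch carries a contiguous version range plus metadata.
                    println!("versions {}..={}", batch.start_version, batch.end_version);
                },
                // An error at end-of-stream means we are done, not broken.
                Err(_) if stream.is_end_of_stream() => break,
                Err(_) => {
                    // Let the stream re-establish the GRPC connection.
                    stream.reconnect_to_grpc_with_retries().await?;
                },
            }
        }
        Ok(())
    }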
@@ -616,6 +619,228 @@ impl Worker {
     }
 }
 
+/// Initializes a GRPC stream and runs fetching transactions from the stream in a loop.
+pub async fn fetch_transactions_from_transaction_stream(
+    txn_sender: AsyncSender<TransactionsPBResponse>,
+    transaction_stream_config: TransactionStreamConfig,
+    processor_name: String,
+    transaction_filter: TransactionFilter,
+    // The number of transactions per protobuf batch
+    pb_channel_txn_chunk_size: usize,
+) {
+    // Initialize the stream
+    let transaction_stream_res = TransactionStream::new(transaction_stream_config.clone()).await;
+
+    let mut transaction_stream = match transaction_stream_res {
+        Ok(stream) => stream,
+        Err(e) => {
+            error!(
+                processor_name = processor_name,
+                stream_address = transaction_stream_config
+                    .indexer_grpc_data_service_address
+                    .to_string(),
+                error = ?e,
+                "[Parser] Error creating transaction stream."
+            );
+            panic!("[Parser] Error creating transaction stream.")
+        },
+    };
+
+    let mut send_ma = MovingAverage::new(3000);
+
+    // Fetch transactions in a loop
+    // There could be several special scenarios:
+    // 1. If we lose the connection, we will try reconnecting X times within Y seconds before crashing.
+    // 2. If we specified an end version and we hit that, we will stop fetching, but we will make sure that
+    //    all existing transactions are processed.
+    loop {
+        let transactions_res = transaction_stream.get_next_transaction_batch().await;
+
+        match transactions_res {
+            Ok(mut txn_pb) => {
+                let start_version = txn_pb.transactions.first().unwrap().version;
+                let end_version = txn_pb.transactions.last().unwrap().version;
+                let size_in_bytes = txn_pb.size_in_bytes;
+                let num_txns = end_version - start_version + 1;
+                let start_txn_timestamp = txn_pb.start_txn_timestamp.clone();
+
+                // Filter out the txns we don't care about
+                txn_pb
+                    .transactions
+                    .retain(|txn| transaction_filter.include(txn));
+                let num_txn_post_filter = txn_pb.transactions.len() as u64;
+                let num_filtered_txns = num_txns - num_txn_post_filter;
+
+                // potentially break txn_pb into many `TransactionsPBResponse` that are each `pb_channel_txn_chunk_size` txns max in size
+                let txn_channel_send_latency = std::time::Instant::now();
+                if num_txn_post_filter < pb_channel_txn_chunk_size as u64 {
+                    // We only need to send one; avoid the chunk/clone
+                    match txn_sender.send(txn_pb).await {
+                        Ok(()) => {},
+                        Err(e) => {
+                            error!(
+                                processor_name = processor_name,
+                                stream_address = transaction_stream_config
+                                    .indexer_grpc_data_service_address
+                                    .to_string(),
+                                error = ?e,
+                                "[Parser] Error sending GRPC response to channel."
+                            );
+                            panic!("[Parser] Error sending GRPC response to channel.")
+                        },
+                    }
+                } else {
+                    // We are breaking down a big batch into small batches; this involves an iterator
+                    let average_size_in_bytes = size_in_bytes / num_txn_post_filter;
+
+                    let txn_pb_chunked: Vec<Vec<Transaction>> = txn_pb
+                        .transactions
+                        .into_iter()
+                        .chunks(pb_channel_txn_chunk_size)
+                        .into_iter()
+                        .map(|chunk| chunk.collect())
+                        .collect();
+                    for txn_pb_chunk in txn_pb_chunked {
+                        let size_in_bytes = average_size_in_bytes * txn_pb_chunk.len() as u64;
+                        let txn_pb = TransactionsPBResponse {
+                            transactions: txn_pb_chunk,
+                            chain_id: txn_pb.chain_id,
+                            start_version: txn_pb.start_version,
+                            end_version: txn_pb.end_version,
+                            // TODO: this is only for gap checker + filtered txns, but this is wrong
+                            start_txn_timestamp: txn_pb.start_txn_timestamp.clone(),
+                            end_txn_timestamp: txn_pb.end_txn_timestamp.clone(),
+                            size_in_bytes,
+                        };
+                        match txn_sender.send(txn_pb).await {
+                            Ok(()) => {},
+                            Err(e) => {
+                                error!(
+                                    processor_name = processor_name,
+                                    stream_address = transaction_stream_config
+                                        .indexer_grpc_data_service_address
+                                        .to_string(),
+                                    error = ?e,
+                                    "[Parser] Error sending GRPC response to channel."
+                                );
+                                panic!("[Parser] Error sending GRPC response to channel.")
+                            },
+                        }
+                    }
+                }
+
+                // Log channel send metrics
+                let duration_in_secs = txn_channel_send_latency.elapsed().as_secs_f64();
+
+                send_ma.tick_now(num_txns);
+                let tps = send_ma.avg().ceil() as u64;
+                let bytes_per_sec = size_in_bytes as f64 / duration_in_secs;
+                let channel_size = txn_sender.len();
+
+                let step = ProcessorStep::ReceivedTxnsFromGrpc.get_step();
+                let label = ProcessorStep::ReceivedTxnsFromGrpc.get_label();
+                debug!(
+                    processor_name,
+                    service_type = PROCESSOR_SERVICE_TYPE,
+                    stream_address = transaction_stream_config
+                        .indexer_grpc_data_service_address
+                        .to_string(),
+                    start_version,
+                    end_version,
+                    channel_size,
+                    size_in_bytes,
+                    duration_in_secs,
+                    bytes_per_sec,
+                    tps,
+                    "[Parser] Successfully sent transactions to channel."
+                );
+                LATEST_PROCESSED_VERSION
+                    .with_label_values(&[&processor_name, step, label, "-"])
+                    .set(end_version as i64);
+                TRANSACTION_UNIX_TIMESTAMP
+                    .with_label_values(&[&processor_name, step, label, "-"])
+                    .set(
+                        start_txn_timestamp
+                            .as_ref()
+                            .map(timestamp_to_unixtime)
+                            .unwrap_or_default(),
+                    );
+                PROCESSED_BYTES_COUNT
+                    .with_label_values(&[&processor_name, step, label, "-"])
+                    .inc_by(size_in_bytes);
+                NUM_TRANSACTIONS_PROCESSED_COUNT
+                    .with_label_values(&[&processor_name, step, label, "-"])
+                    .inc_by(end_version - start_version + 1);
+                FETCHER_THREAD_CHANNEL_SIZE
+                    .with_label_values(&[&processor_name])
+                    .set(channel_size as i64);
+                NUM_TRANSACTIONS_FILTERED_OUT_COUNT
+                    .with_label_values(&[&processor_name])
+                    .inc_by(num_filtered_txns);
+            },
+            Err(e) => {
+                if transaction_stream.is_end_of_stream() {
+                    // Wait for the fetched transactions to finish processing before closing the channel
+                    loop {
+                        let channel_size = txn_sender.len();
+                        info!(
+                            processor_name,
+                            service_type = PROCESSOR_SERVICE_TYPE,
+                            stream_address = transaction_stream_config
+                                .indexer_grpc_data_service_address
+                                .to_string(),
+                            channel_size,
+                            "[Parser] Waiting for channel to be empty"
+                        );
+                        if channel_size.is_zero() {
+                            break;
+                        }
+                        tokio::time::sleep(Duration::from_millis(100)).await;
+                    }
+                    info!(
+                        processor_name,
+                        service_type = PROCESSOR_SERVICE_TYPE,
+                        stream_address = transaction_stream_config
+                            .indexer_grpc_data_service_address
+                            .to_string(),
+                        "[Parser] Transaction fetcher send channel is closed."
+                    );
+                    break;
+                } else {
+                    error!(
+                        processor_name = processor_name,
+                        service_type = PROCESSOR_SERVICE_TYPE,
+                        stream_address = transaction_stream_config
+                            .indexer_grpc_data_service_address
+                            .to_string(),
+                        error = ?e,
+                        "[Parser] Error fetching transactions."
+                    );
+
+                    // If there was an error fetching transactions, try to reconnect
+                    match transaction_stream.reconnect_to_grpc_with_retries().await {
+                        Ok(_) => {
+                            info!(
+                                processor_name = processor_name,
+                                service_type = PROCESSOR_SERVICE_TYPE,
+                                stream_address = transaction_stream_config
+                                    .indexer_grpc_data_service_address
+                                    .to_string(),
+                                "[Parser] Successfully reconnected transaction stream."
+                            );
+                        },
+                        Err(e) => {
+                            error!(
+                                processor_name = processor_name,
+                                service_type = PROCESSOR_SERVICE_TYPE,
+                                stream_address = transaction_stream_config
+                                    .indexer_grpc_data_service_address
+                                    .to_string(),
+                                error = ?e,
+                                "[Parser] Error reconnecting transaction stream."
+                            );
+                        },
+                    }
+                }
+            },
+        }
+    }
+}
+
+/// Fetches transactions from the channel and processes them.
 async fn fetch_transactions(
     processor_name: &str,
     stream_address: &str,
diff --git a/rust/transaction-filter/Cargo.toml b/rust/transaction-filter/Cargo.toml
new file mode 100644
index 000000000..837128093
--- /dev/null
+++ b/rust/transaction-filter/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "transaction-filter"
+version = "0.1.0"
+
+# Workspace inherited keys
+authors = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+license = { workspace = true }
+publish = { workspace = true }
+repository = { workspace = true }
+rust-version = { workspace = true }
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+ahash = { workspace = true }
+aptos-protos = { workspace = true }
+serde = { workspace = true }
diff --git a/rust/transaction-filter/src/lib.rs b/rust/transaction-filter/src/lib.rs
new file mode 100644
index 000000000..c60bee1bc
--- /dev/null
+++ b/rust/transaction-filter/src/lib.rs
@@ -0,0 +1 @@
+pub mod transaction_filter;
diff --git a/rust/processor/src/transaction_filter.rs b/rust/transaction-filter/src/transaction_filter.rs
similarity index 100%
rename from rust/processor/src/transaction_filter.rs
rename to rust/transaction-filter/src/transaction_filter.rs
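A closing note on the chunking step in `fetch_transactions_from_transaction_stream` above: oversized batches are split with itertools so that no single channel slot carries more than `pb_channel_txn_chunk_size` transactions. The pattern in isolation (illustration only, not part of the patch; `chunk_batch` is a hypothetical helper):

    use itertools::Itertools;

    /// Splits a batch into chunks of at most `chunk_size` items, preserving order.
    fn chunk_batch<T>(items: Vec<T>, chunk_size: usize) -> Vec<Vec<T>> {
        items
            .into_iter()
            .chunks(chunk_size)
            .into_iter()
            .map(|chunk| chunk.collect())
            .collect()
    }

    // e.g. a 1000-item batch with chunk_size 300 yields chunks of 300/300/300/100,
    // so each kanal channel slot stays bounded in memory.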