Skip to content

Commit 9cf9d3d

Browse files
committed
create txn stream crate in this repo
1 parent 51a0865 commit 9cf9d3d

File tree

12 files changed

+841
-19
lines changed

12 files changed

+841
-19
lines changed

rust/Cargo.lock

Lines changed: 11 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/Cargo.toml

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
[workspace]
22
resolver = "2"
33

4-
members = ["instrumented-channel", "sdk", "sdk-examples"]
4+
members = [
5+
"instrumented-channel",
6+
"moving-average",
7+
"sdk",
8+
"sdk-examples",
9+
"transaction-stream",
10+
]
511

612
[workspace.package]
713
authors = ["Aptos Labs <[email protected]>"]
@@ -14,12 +20,13 @@ rust-version = "1.75"
1420

1521
[workspace.dependencies]
1622
aptos-indexer-processor-sdk = { path = "sdk" }
23+
aptos-indexer-transaction-stream = { path = "transaction-stream" }
1724
instrumented-channel = { path = "instrumented-channel" }
25+
aptos-moving-average = { path = "moving-average" }
1826
sdk-server-framework = { path = 'sdk-server-framework' }
1927

2028
ahash = { version = "0.8.7", features = ["serde"] }
2129
anyhow = "1.0.86"
22-
aptos-indexer-transaction-stream = { git = 'https://github.com/aptos-labs/aptos-indexer-processors.git', rev = '3c8896b489062ce0510c5e4162366db753b59499' }
2330
aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", tag = "aptos-node-v1.12.1" }
2431
aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "4541add3fd29826ec57f22658ca286d2d6134b93" }
2532
async-trait = "0.1.80"
@@ -54,6 +61,7 @@ field_count = "0.1.1"
5461
futures = "0.3.30"
5562
futures-util = "0.3.21"
5663
hex = "0.4.3"
64+
itertools = "0.13.0"
5765
jemallocator = { version = "0.5.0", features = [
5866
"profiling",
5967
"unprefixed_malloc_on_supported_platforms",
@@ -66,6 +74,7 @@ once_cell = { version = "1.19.0" }
6674
petgraph = "0.6.5"
6775
prometheus = "0.13.4"
6876
prometheus-client = "0.22.2"
77+
prost = { version = "0.12.3", features = ["no-recursion-limit"] }
6978
rayon = "1.10.0"
7079
serde = { version = "1.0.193", features = ["derive", "rc"] }
7180
serde_json = { version = "1.0.81", features = ["preserve_order"] }
@@ -78,6 +87,15 @@ tiny-keccak = { version = "2.0.2", features = ["keccak", "sha3"] }
7887
tracing = "0.1.34"
7988
tokio = { version = "1.37.0", features = ["full"] }
8089
toml = "0.7.4"
90+
tonic = { version = "0.11.0", features = [
91+
"tls",
92+
"tls-roots",
93+
"transport",
94+
"prost",
95+
"gzip",
96+
"codegen",
97+
"zstd",
98+
] }
8199
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] }
82100
url = { version = "2.5.1", features = ["serde"] }
83101
warp = { version = "0.3.5", features = ["tls"] }

rust/moving-average/Cargo.toml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
[package]
2+
name = "aptos-moving-average"
3+
description = "Utility to calculate moving average such as TPS"
4+
version = "0.1.0"
5+
6+
# Workspace inherited keys
7+
authors = { workspace = true }
8+
edition = { workspace = true }
9+
homepage = { workspace = true }
10+
license = { workspace = true }
11+
publish = { workspace = true }
12+
repository = { workspace = true }
13+
rust-version = { workspace = true }
14+
15+
[dependencies]
16+
chrono = { workspace = true }

rust/moving-average/src/lib.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright © Aptos Foundation
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#![forbid(unsafe_code)]
5+
6+
use std::collections::VecDeque;
7+
8+
// TPS data
9+
pub struct MovingAverage {
10+
window_millis: u64,
11+
// (timestamp_millis, value)
12+
values: VecDeque<(u64, u64)>,
13+
sum: u64,
14+
}
15+
16+
impl MovingAverage {
17+
pub fn new(window_millis: u64) -> Self {
18+
let now = chrono::Utc::now().naive_utc().timestamp_millis() as u64;
19+
let mut queue = VecDeque::new();
20+
queue.push_back((now, 0));
21+
Self {
22+
window_millis,
23+
values: queue,
24+
sum: 0,
25+
}
26+
}
27+
28+
pub fn tick_now(&mut self, value: u64) {
29+
let now = chrono::Utc::now().naive_utc().timestamp_millis() as u64;
30+
self.tick(now, value);
31+
}
32+
33+
pub fn tick(&mut self, timestamp_millis: u64, value: u64) -> f64 {
34+
self.values.push_back((timestamp_millis, value));
35+
self.sum += value;
36+
while self.values.len() > 2 {
37+
match self.values.front() {
38+
None => break,
39+
Some((ts, val)) => {
40+
if timestamp_millis - ts > self.window_millis {
41+
self.sum -= val;
42+
self.values.pop_front();
43+
} else {
44+
break;
45+
}
46+
},
47+
}
48+
}
49+
self.avg()
50+
}
51+
52+
// Only be called after tick_now/tick is called.
53+
pub fn avg(&self) -> f64 {
54+
if self.values.len() < 2 {
55+
0.0
56+
} else {
57+
let elapsed = self.values.back().unwrap().0 - self.values.front().unwrap().0;
58+
(self.sum * 1000) as f64 / elapsed as f64
59+
}
60+
}
61+
62+
pub fn sum(&self) -> u64 {
63+
self.sum
64+
}
65+
}
66+
67+
#[cfg(test)]
68+
mod test {
69+
use super::*;
70+
71+
#[test]
72+
fn test_moving_average() {
73+
// 10 Second window.
74+
let mut ma = MovingAverage::new(10_000);
75+
// 9 seconds spent at 100 TPS.
76+
for _ in 0..9 {
77+
ma.tick_now(100);
78+
std::thread::sleep(std::time::Duration::from_secs(1));
79+
}
80+
// No matter what algorithm we use, the average should be 99 at least.
81+
let avg = ma.avg();
82+
assert!(avg >= 99.0, "Average is too low: {}", avg);
83+
}
84+
}

rust/sdk/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ async fn run_processor() -> Result<()> {
3131
// let (input_sender, input_receiver) = kanal::bounded_async(1);
3232
let transaction_stream_config = TransactionStreamConfig {
3333
indexer_grpc_data_service_address: Url::parse("https://grpc.devnet.aptoslabs.com:443")?,
34-
starting_version: 0,
34+
starting_version: Some(0),
3535
request_ending_version: None,
3636
auth_token: String::from("aptoslabs_TJs4NQU8Xf5_EJMNnZFPXRH6YNpWM7bCcurMBEUtZtRb6"),
3737
request_name_header: String::from("sdk_processor"),

rust/sdk/src/types/transaction_context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use aptos_indexer_transaction_stream::utils::util::timestamp_to_unixtime;
1+
use aptos_indexer_transaction_stream::utils::timestamp::timestamp_to_unixtime;
22

33
/// TransactionContext is a struct that holds data processed from a set of transactions
44
/// and includes metadata about the transactions that the data is associated with.

rust/transaction-stream/Cargo.toml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
[package]
2+
name = "aptos-indexer-transaction-stream"
3+
version = "0.1.0"
4+
5+
# Workspace inherited keys
6+
authors = { workspace = true }
7+
edition = { workspace = true }
8+
homepage = { workspace = true }
9+
license = { workspace = true }
10+
publish = { workspace = true }
11+
repository = { workspace = true }
12+
rust-version = { workspace = true }
13+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
14+
15+
[dependencies]
16+
anyhow = { workspace = true }
17+
aptos-moving-average = { workspace = true }
18+
aptos-protos = { workspace = true }
19+
bigdecimal = { workspace = true }
20+
chrono = { workspace = true }
21+
futures-util = { workspace = true }
22+
itertools = { workspace = true }
23+
kanal = { workspace = true }
24+
once_cell = { workspace = true }
25+
prometheus = { workspace = true }
26+
prost = { workspace = true }
27+
serde = { workspace = true }
28+
tokio = { workspace = true }
29+
tonic = { workspace = true }
30+
tracing = { workspace = true }
31+
url = { workspace = true }
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
use serde::{Deserialize, Serialize};
2+
use std::time::Duration;
3+
use url::Url;
4+
5+
#[derive(Clone, Debug, Deserialize, Serialize)]
6+
#[serde(deny_unknown_fields)]
7+
pub struct TransactionStreamConfig {
8+
pub indexer_grpc_data_service_address: Url,
9+
pub starting_version: Option<u64>,
10+
pub request_ending_version: Option<u64>,
11+
pub auth_token: String,
12+
pub request_name_header: String,
13+
#[serde(default = "TransactionStreamConfig::default_indexer_grpc_http2_ping_interval")]
14+
pub indexer_grpc_http2_ping_interval_secs: u64,
15+
#[serde(default = "TransactionStreamConfig::default_indexer_grpc_http2_ping_timeout")]
16+
pub indexer_grpc_http2_ping_timeout_secs: u64,
17+
#[serde(default = "TransactionStreamConfig::default_indexer_grpc_reconnection_timeout")]
18+
pub indexer_grpc_reconnection_timeout_secs: u64,
19+
#[serde(default = "TransactionStreamConfig::default_indexer_grpc_response_item_timeout")]
20+
pub indexer_grpc_response_item_timeout_secs: u64,
21+
}
22+
23+
impl TransactionStreamConfig {
24+
pub const fn indexer_grpc_http2_ping_interval(&self) -> Duration {
25+
Duration::from_secs(self.indexer_grpc_http2_ping_interval_secs)
26+
}
27+
28+
pub const fn indexer_grpc_http2_ping_timeout(&self) -> Duration {
29+
Duration::from_secs(self.indexer_grpc_http2_ping_timeout_secs)
30+
}
31+
32+
pub const fn indexer_grpc_reconnection_timeout(&self) -> Duration {
33+
Duration::from_secs(self.indexer_grpc_reconnection_timeout_secs)
34+
}
35+
36+
pub const fn indexer_grpc_response_item_timeout(&self) -> Duration {
37+
Duration::from_secs(self.indexer_grpc_response_item_timeout_secs)
38+
}
39+
40+
/// Indexer GRPC http2 ping interval in seconds. Defaults to 30.
41+
/// Tonic ref: https://docs.rs/tonic/latest/tonic/transport/channel/struct.Endpoint.html#method.http2_keep_alive_interval
42+
pub const fn default_indexer_grpc_http2_ping_interval() -> u64 {
43+
30
44+
}
45+
46+
/// Indexer GRPC http2 ping timeout in seconds. Defaults to 10.
47+
pub const fn default_indexer_grpc_http2_ping_timeout() -> u64 {
48+
10
49+
}
50+
51+
/// Default timeout for establishing a grpc connection. Defaults to 5 seconds.
52+
pub const fn default_indexer_grpc_reconnection_timeout() -> u64 {
53+
5
54+
}
55+
56+
/// Default timeout for receiving an item from grpc stream. Defaults to 60 seconds.
57+
pub const fn default_indexer_grpc_response_item_timeout() -> u64 {
58+
60
59+
}
60+
}

rust/transaction-stream/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pub mod config;
2+
pub mod transaction_stream;
3+
pub mod utils;
4+
5+
pub use config::TransactionStreamConfig;
6+
pub use transaction_stream::{TransactionStream, TransactionsPBResponse};

0 commit comments

Comments
 (0)