Skip to content

Commit 5c4719a

Browse files
authored
Create txn stream crate in this repo (#21)
* create txn stream crate in this repo * Comments
1 parent cecedb1 commit 5c4719a

File tree

15 files changed

+848
-31
lines changed

15 files changed

+848
-31
lines changed

rust/Cargo.lock

Lines changed: 11 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/Cargo.toml

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
[workspace]
22
resolver = "2"
33

4-
members = ["instrumented-channel", "sdk", "sdk-examples"]
4+
members = [
5+
"instrumented-channel",
6+
"moving-average",
7+
"sdk",
8+
"sdk-examples",
9+
"transaction-stream",
10+
]
511

612
[workspace.package]
713
authors = ["Aptos Labs <[email protected]>"]
@@ -14,12 +20,13 @@ rust-version = "1.75"
1420

1521
[workspace.dependencies]
1622
aptos-indexer-processor-sdk = { path = "sdk" }
23+
aptos-indexer-transaction-stream = { path = "transaction-stream" }
1724
instrumented-channel = { path = "instrumented-channel" }
25+
aptos-moving-average = { path = "moving-average" }
1826
sdk-server-framework = { path = 'sdk-server-framework' }
1927

2028
ahash = { version = "0.8.7", features = ["serde"] }
2129
anyhow = "1.0.86"
22-
aptos-indexer-transaction-stream = { git = 'https://github.com/aptos-labs/aptos-indexer-processors.git', rev = '3c8896b489062ce0510c5e4162366db753b59499' }
2330
aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", tag = "aptos-node-v1.12.1" }
2431
aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "4541add3fd29826ec57f22658ca286d2d6134b93" }
2532
async-trait = "0.1.80"
@@ -54,6 +61,7 @@ field_count = "0.1.1"
5461
futures = "0.3.30"
5562
futures-util = "0.3.21"
5663
hex = "0.4.3"
64+
itertools = "0.13.0"
5765
jemallocator = { version = "0.5.0", features = [
5866
"profiling",
5967
"unprefixed_malloc_on_supported_platforms",
@@ -66,6 +74,7 @@ once_cell = { version = "1.19.0" }
6674
petgraph = "0.6.5"
6775
prometheus = "0.13.4"
6876
prometheus-client = "0.22.2"
77+
prost = { version = "0.12.3", features = ["no-recursion-limit"] }
6978
rayon = "1.10.0"
7079
serde = { version = "1.0.193", features = ["derive", "rc"] }
7180
serde_json = { version = "1.0.81", features = ["preserve_order"] }
@@ -78,6 +87,15 @@ tiny-keccak = { version = "2.0.2", features = ["keccak", "sha3"] }
7887
tracing = "0.1.34"
7988
tokio = { version = "1.37.0", features = ["full"] }
8089
toml = "0.7.4"
90+
tonic = { version = "0.11.0", features = [
91+
"tls",
92+
"tls-roots",
93+
"transport",
94+
"prost",
95+
"gzip",
96+
"codegen",
97+
"zstd",
98+
] }
8199
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] }
82100
url = { version = "2.5.1", features = ["serde"] }
83101
warp = { version = "0.3.5", features = ["tls"] }

rust/moving-average/Cargo.toml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
[package]
2+
name = "aptos-moving-average"
3+
description = "Utility to calculate moving average such as TPS"
4+
version = "0.1.0"
5+
6+
# Workspace inherited keys
7+
authors = { workspace = true }
8+
edition = { workspace = true }
9+
homepage = { workspace = true }
10+
license = { workspace = true }
11+
publish = { workspace = true }
12+
repository = { workspace = true }
13+
rust-version = { workspace = true }
14+
15+
[dependencies]
16+
chrono = { workspace = true }

rust/moving-average/src/lib.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright © Aptos Foundation
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#![forbid(unsafe_code)]
5+
6+
use std::collections::VecDeque;
7+
8+
// TPS data
9+
pub struct MovingAverage {
10+
window_millis: u64,
11+
// (timestamp_millis, value)
12+
values: VecDeque<(u64, u64)>,
13+
sum: u64,
14+
}
15+
16+
impl MovingAverage {
17+
pub fn new(window_millis: u64) -> Self {
18+
let now = chrono::Utc::now().naive_utc().and_utc().timestamp_millis() as u64;
19+
let mut queue = VecDeque::new();
20+
queue.push_back((now, 0));
21+
Self {
22+
window_millis,
23+
values: queue,
24+
sum: 0,
25+
}
26+
}
27+
28+
pub fn tick_now(&mut self, value: u64) {
29+
let now = chrono::Utc::now().naive_utc().and_utc().timestamp_millis() as u64;
30+
self.tick(now, value);
31+
}
32+
33+
pub fn tick(&mut self, timestamp_millis: u64, value: u64) -> f64 {
34+
self.values.push_back((timestamp_millis, value));
35+
self.sum += value;
36+
while self.values.len() > 2 {
37+
match self.values.front() {
38+
None => break,
39+
Some((ts, val)) => {
40+
if timestamp_millis - ts > self.window_millis {
41+
self.sum -= val;
42+
self.values.pop_front();
43+
} else {
44+
break;
45+
}
46+
},
47+
}
48+
}
49+
self.avg()
50+
}
51+
52+
// Only be called after tick_now/tick is called.
53+
pub fn avg(&self) -> f64 {
54+
if self.values.len() < 2 {
55+
0.0
56+
} else {
57+
let elapsed = self.values.back().unwrap().0 - self.values.front().unwrap().0;
58+
(self.sum * 1000) as f64 / elapsed as f64
59+
}
60+
}
61+
62+
pub fn sum(&self) -> u64 {
63+
self.sum
64+
}
65+
}
66+
67+
#[cfg(test)]
68+
mod test {
69+
use super::*;
70+
71+
#[test]
72+
fn test_moving_average() {
73+
// 10 Second window.
74+
let mut ma = MovingAverage::new(10_000);
75+
// 9 seconds spent at 100 TPS.
76+
for _ in 0..9 {
77+
ma.tick_now(100);
78+
std::thread::sleep(std::time::Duration::from_secs(1));
79+
}
80+
// No matter what algorithm we use, the average should be 99 at least.
81+
let avg = ma.avg();
82+
assert!(avg >= 99.0, "Average is too low: {}", avg);
83+
}
84+
}

rust/sdk-examples/src/common_steps/latest_processed_version_tracker.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ use crate::{
66
};
77
use ahash::AHashMap;
88
use anyhow::{Context, Result};
9-
use aptos_indexer_processor_sdk::utils::time::parse_timestamp;
109
use aptos_indexer_processor_sdk::{
1110
steps::{pollable_async_step::PollableAsyncRunType, PollableAsyncStep},
1211
traits::{NamedStep, Processable},
1312
types::transaction_context::TransactionContext,
13+
utils::time::parse_timestamp,
1414
};
1515
use async_trait::async_trait;
1616
use diesel::{upsert::excluded, ExpressionMethods};
@@ -92,17 +92,15 @@ where
9292
"Gap detected starting from version: {}",
9393
current_batch.start_version
9494
);
95-
self.seen_versions.insert(
96-
current_batch.start_version,
97-
TransactionContext {
95+
self.seen_versions
96+
.insert(current_batch.start_version, TransactionContext {
9897
data: vec![], // No data is needed for tracking. This is to avoid clone.
9998
start_version: current_batch.start_version,
10099
end_version: current_batch.end_version,
101100
start_transaction_timestamp: current_batch.start_transaction_timestamp.clone(),
102101
end_transaction_timestamp: current_batch.end_transaction_timestamp.clone(),
103102
total_size_in_bytes: current_batch.total_size_in_bytes,
104-
},
105-
);
103+
});
106104
} else {
107105
tracing::debug!("No gap detected");
108106
// If the current_batch is the next expected version, update the last success batch

rust/sdk-examples/src/processors/events/events_processor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use aptos_indexer_processor_sdk::{
99
steps::{TimedBuffer, TransactionStreamStep},
1010
traits::{IntoRunnableStep, RunnableStepWithInputReceiver},
1111
};
12-
use aptos_indexer_transaction_stream::config::TransactionStreamConfig;
12+
use aptos_indexer_transaction_stream::TransactionStreamConfig;
1313
use instrumented_channel::instrumented_bounded_channel;
1414
use std::time::Duration;
1515

@@ -40,7 +40,7 @@ impl EventsProcessor {
4040
let timed_buffer = TimedBuffer::new(Duration::from_secs(1));
4141
let version_tracker = LatestVersionProcessedTracker::new(
4242
self.db_config,
43-
self.transaction_stream_config.starting_version,
43+
self.transaction_stream_config.starting_version.unwrap(),
4444
"events_processor".to_string(),
4545
)
4646
.await?;

rust/sdk-examples/src/processors/events/events_storer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use crate::db::models::events_models::EventModel;
21
use crate::{
32
config::indexer_processor_config::DbConfig,
3+
db::models::events_models::EventModel,
44
schema,
55
utils::database::{execute_in_chunks, get_config_table_chunk_size, new_db_pool, ArcDbPool},
66
};

rust/sdk/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use aptos_indexer_processor_sdk::{
44
steps::{TimedBuffer, TransactionStreamStep},
55
traits::IntoRunnableStep,
66
};
7-
use aptos_indexer_transaction_stream::config::TransactionStreamConfig;
7+
use aptos_indexer_transaction_stream::TransactionStreamConfig;
88
use std::time::Duration;
99
use url::Url;
1010

@@ -31,7 +31,7 @@ async fn run_processor() -> Result<()> {
3131
// let (input_sender, input_receiver) = kanal::bounded_async(1);
3232
let transaction_stream_config = TransactionStreamConfig {
3333
indexer_grpc_data_service_address: Url::parse("https://grpc.devnet.aptoslabs.com:443")?,
34-
starting_version: 0,
34+
starting_version: Some(0),
3535
request_ending_version: None,
3636
auth_token: String::from("aptoslabs_TJs4NQU8Xf5_EJMNnZFPXRH6YNpWM7bCcurMBEUtZtRb6"),
3737
request_name_header: String::from("sdk_processor"),

rust/sdk/src/steps/transaction_stream_step.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ use crate::{
55
};
66
use anyhow::Result;
77
use aptos_indexer_transaction_stream::{
8-
config::TransactionStreamConfig,
9-
transaction_stream::TransactionStream as TransactionStreamInternal,
8+
TransactionStream as TransactionStreamInternal, TransactionStreamConfig,
109
};
1110
use aptos_protos::transaction::v1::Transaction;
1211
use async_trait::async_trait;

rust/sdk/src/types/transaction_context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use aptos_indexer_transaction_stream::utils::util::timestamp_to_unixtime;
1+
use aptos_indexer_transaction_stream::utils::timestamp_to_unixtime;
22

33
/// TransactionContext is a struct that holds data processed from a set of transactions
44
/// and includes metadata about the transactions that the data is associated with.

0 commit comments

Comments
 (0)