Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 20 additions & 2 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
[workspace]
resolver = "2"

members = ["instrumented-channel", "sdk", "sdk-examples"]
members = [
"instrumented-channel",
"moving-average",
"sdk",
"sdk-examples",
"transaction-stream",
]

[workspace.package]
authors = ["Aptos Labs <[email protected]>"]
Expand All @@ -14,12 +20,13 @@ rust-version = "1.75"

[workspace.dependencies]
aptos-indexer-processor-sdk = { path = "sdk" }
aptos-indexer-transaction-stream = { path = "transaction-stream" }
instrumented-channel = { path = "instrumented-channel" }
aptos-moving-average = { path = "moving-average" }
sdk-server-framework = { path = 'sdk-server-framework' }

ahash = { version = "0.8.7", features = ["serde"] }
anyhow = "1.0.86"
aptos-indexer-transaction-stream = { git = 'https://github.com/aptos-labs/aptos-indexer-processors.git', rev = '3c8896b489062ce0510c5e4162366db753b59499' }
aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", tag = "aptos-node-v1.12.1" }
aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "4541add3fd29826ec57f22658ca286d2d6134b93" }
async-trait = "0.1.80"
Expand Down Expand Up @@ -54,6 +61,7 @@ field_count = "0.1.1"
futures = "0.3.30"
futures-util = "0.3.21"
hex = "0.4.3"
itertools = "0.13.0"
jemallocator = { version = "0.5.0", features = [
"profiling",
"unprefixed_malloc_on_supported_platforms",
Expand All @@ -66,6 +74,7 @@ once_cell = { version = "1.19.0" }
petgraph = "0.6.5"
prometheus = "0.13.4"
prometheus-client = "0.22.2"
prost = { version = "0.12.3", features = ["no-recursion-limit"] }
rayon = "1.10.0"
serde = { version = "1.0.193", features = ["derive", "rc"] }
serde_json = { version = "1.0.81", features = ["preserve_order"] }
Expand All @@ -78,6 +87,15 @@ tiny-keccak = { version = "2.0.2", features = ["keccak", "sha3"] }
tracing = "0.1.34"
tokio = { version = "1.37.0", features = ["full"] }
toml = "0.7.4"
tonic = { version = "0.11.0", features = [
"tls",
"tls-roots",
"transport",
"prost",
"gzip",
"codegen",
"zstd",
] }
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] }
url = { version = "2.5.1", features = ["serde"] }
warp = { version = "0.3.5", features = ["tls"] }
Expand Down
16 changes: 16 additions & 0 deletions rust/moving-average/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "aptos-moving-average"
description = "Utility to calculate moving average such as TPS"
version = "0.1.0"

# Workspace inherited keys
authors = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
publish = { workspace = true }
repository = { workspace = true }
rust-version = { workspace = true }

[dependencies]
chrono = { workspace = true }
84 changes: 84 additions & 0 deletions rust/moving-average/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

#![forbid(unsafe_code)]

use std::collections::VecDeque;

// TPS data
pub struct MovingAverage {
window_millis: u64,
// (timestamp_millis, value)
values: VecDeque<(u64, u64)>,
sum: u64,
}

impl MovingAverage {
pub fn new(window_millis: u64) -> Self {
let now = chrono::Utc::now().naive_utc().and_utc().timestamp_millis() as u64;
let mut queue = VecDeque::new();
queue.push_back((now, 0));
Self {
window_millis,
values: queue,
sum: 0,
}
}

pub fn tick_now(&mut self, value: u64) {
let now = chrono::Utc::now().naive_utc().and_utc().timestamp_millis() as u64;
self.tick(now, value);
}

pub fn tick(&mut self, timestamp_millis: u64, value: u64) -> f64 {
self.values.push_back((timestamp_millis, value));
self.sum += value;
while self.values.len() > 2 {
match self.values.front() {
None => break,
Some((ts, val)) => {
if timestamp_millis - ts > self.window_millis {
self.sum -= val;
self.values.pop_front();
} else {
break;
}
},
}
}
self.avg()
}

// Only be called after tick_now/tick is called.
pub fn avg(&self) -> f64 {
if self.values.len() < 2 {
0.0
} else {
let elapsed = self.values.back().unwrap().0 - self.values.front().unwrap().0;
(self.sum * 1000) as f64 / elapsed as f64
}
}

pub fn sum(&self) -> u64 {
self.sum
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_moving_average() {
// 10 Second window.
let mut ma = MovingAverage::new(10_000);
// 9 seconds spent at 100 TPS.
for _ in 0..9 {
ma.tick_now(100);
std::thread::sleep(std::time::Duration::from_secs(1));
}
// No matter what algorithm we use, the average should be 99 at least.
let avg = ma.avg();
assert!(avg >= 99.0, "Average is too low: {}", avg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ use crate::{
};
use ahash::AHashMap;
use anyhow::{Context, Result};
use aptos_indexer_processor_sdk::utils::time::parse_timestamp;
use aptos_indexer_processor_sdk::{
steps::{pollable_async_step::PollableAsyncRunType, PollableAsyncStep},
traits::{NamedStep, Processable},
types::transaction_context::TransactionContext,
utils::time::parse_timestamp,
};
use async_trait::async_trait;
use diesel::{upsert::excluded, ExpressionMethods};
Expand Down Expand Up @@ -92,17 +92,15 @@ where
"Gap detected starting from version: {}",
current_batch.start_version
);
self.seen_versions.insert(
current_batch.start_version,
TransactionContext {
self.seen_versions
.insert(current_batch.start_version, TransactionContext {
data: vec![], // No data is needed for tracking. This is to avoid clone.
start_version: current_batch.start_version,
end_version: current_batch.end_version,
start_transaction_timestamp: current_batch.start_transaction_timestamp.clone(),
end_transaction_timestamp: current_batch.end_transaction_timestamp.clone(),
total_size_in_bytes: current_batch.total_size_in_bytes,
},
);
});
} else {
tracing::debug!("No gap detected");
// If the current_batch is the next expected version, update the last success batch
Expand Down
4 changes: 2 additions & 2 deletions rust/sdk-examples/src/processors/events/events_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use aptos_indexer_processor_sdk::{
steps::{TimedBuffer, TransactionStreamStep},
traits::{IntoRunnableStep, RunnableStepWithInputReceiver},
};
use aptos_indexer_transaction_stream::config::TransactionStreamConfig;
use aptos_indexer_transaction_stream::TransactionStreamConfig;
use instrumented_channel::instrumented_bounded_channel;
use std::time::Duration;

Expand Down Expand Up @@ -40,7 +40,7 @@ impl EventsProcessor {
let timed_buffer = TimedBuffer::new(Duration::from_secs(1));
let version_tracker = LatestVersionProcessedTracker::new(
self.db_config,
self.transaction_stream_config.starting_version,
self.transaction_stream_config.starting_version.unwrap(),
"events_processor".to_string(),
)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion rust/sdk-examples/src/processors/events/events_storer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::db::models::events_models::EventModel;
use crate::{
config::indexer_processor_config::DbConfig,
db::models::events_models::EventModel,
schema,
utils::database::{execute_in_chunks, get_config_table_chunk_size, new_db_pool, ArcDbPool},
};
Expand Down
4 changes: 2 additions & 2 deletions rust/sdk/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use aptos_indexer_processor_sdk::{
steps::{TimedBuffer, TransactionStreamStep},
traits::IntoRunnableStep,
};
use aptos_indexer_transaction_stream::config::TransactionStreamConfig;
use aptos_indexer_transaction_stream::TransactionStreamConfig;
use std::time::Duration;
use url::Url;

Expand All @@ -31,7 +31,7 @@ async fn run_processor() -> Result<()> {
// let (input_sender, input_receiver) = kanal::bounded_async(1);
let transaction_stream_config = TransactionStreamConfig {
indexer_grpc_data_service_address: Url::parse("https://grpc.devnet.aptoslabs.com:443")?,
starting_version: 0,
starting_version: Some(0),
request_ending_version: None,
auth_token: String::from("aptoslabs_TJs4NQU8Xf5_EJMNnZFPXRH6YNpWM7bCcurMBEUtZtRb6"),
request_name_header: String::from("sdk_processor"),
Expand Down
3 changes: 1 addition & 2 deletions rust/sdk/src/steps/transaction_stream_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use crate::{
};
use anyhow::Result;
use aptos_indexer_transaction_stream::{
config::TransactionStreamConfig,
transaction_stream::TransactionStream as TransactionStreamInternal,
TransactionStream as TransactionStreamInternal, TransactionStreamConfig,
};
use aptos_protos::transaction::v1::Transaction;
use async_trait::async_trait;
Expand Down
2 changes: 1 addition & 1 deletion rust/sdk/src/types/transaction_context.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use aptos_indexer_transaction_stream::utils::util::timestamp_to_unixtime;
use aptos_indexer_transaction_stream::utils::timestamp_to_unixtime;

/// TransactionContext is a struct that holds data processed from a set of transactions
/// and includes metadata about the transactions that the data is associated with.
Expand Down
31 changes: 31 additions & 0 deletions rust/transaction-stream/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "aptos-indexer-transaction-stream"
version = "0.1.0"

# Workspace inherited keys
authors = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
publish = { workspace = true }
repository = { workspace = true }
rust-version = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = { workspace = true }
aptos-moving-average = { workspace = true }
aptos-protos = { workspace = true }
bigdecimal = { workspace = true }
chrono = { workspace = true }
futures-util = { workspace = true }
itertools = { workspace = true }
kanal = { workspace = true }
once_cell = { workspace = true }
prometheus = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
Loading