Skip to content

Commit cecedb1

Browse files
authored
LatestProcessedVersion step (#18)
* Add a latest-processed-version tracker step
* Move files around (db models, processors modules)
* Wire the tracker into the events processor
1 parent 51a0865 commit cecedb1

File tree

14 files changed

+273
-34
lines changed

14 files changed

+273
-34
lines changed
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
use crate::{
2+
config::indexer_processor_config::DbConfig,
3+
db::models::processor_status::ProcessorStatus,
4+
schema::processor_status,
5+
utils::database::{execute_with_better_error, new_db_pool, ArcDbPool},
6+
};
7+
use ahash::AHashMap;
8+
use anyhow::{Context, Result};
9+
use aptos_indexer_processor_sdk::utils::time::parse_timestamp;
10+
use aptos_indexer_processor_sdk::{
11+
steps::{pollable_async_step::PollableAsyncRunType, PollableAsyncStep},
12+
traits::{NamedStep, Processable},
13+
types::transaction_context::TransactionContext,
14+
};
15+
use async_trait::async_trait;
16+
use diesel::{upsert::excluded, ExpressionMethods};
17+
18+
// How often (in seconds) the pollable step flushes the latest processed version to storage.
const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1;
19+
20+
pub struct LatestVersionProcessedTracker<T>
21+
where
22+
Self: Sized + Send + 'static,
23+
T: Send + 'static,
24+
{
25+
conn_pool: ArcDbPool,
26+
tracker_name: String,
27+
// Next version to process that we expect.
28+
next_version: u64,
29+
// Last successful batch of sequentially processed transactions. Includes metadata to write to storage.
30+
last_success_batch: Option<TransactionContext<T>>,
31+
// Tracks all the versions that have been processed out of order.
32+
seen_versions: AHashMap<u64, TransactionContext<T>>,
33+
}
34+
35+
impl<T> LatestVersionProcessedTracker<T>
36+
where
37+
Self: Sized + Send + 'static,
38+
T: Send + 'static,
39+
{
40+
pub async fn new(
41+
db_config: DbConfig,
42+
starting_version: u64,
43+
tracker_name: String,
44+
) -> Result<Self> {
45+
let conn_pool = new_db_pool(
46+
&db_config.postgres_connection_string,
47+
Some(db_config.db_pool_size),
48+
)
49+
.await
50+
.context("Failed to create connection pool")?;
51+
Ok(Self {
52+
conn_pool,
53+
tracker_name,
54+
next_version: starting_version,
55+
last_success_batch: None,
56+
seen_versions: AHashMap::new(),
57+
})
58+
}
59+
60+
fn update_last_success_batch(&mut self, current_batch: TransactionContext<T>) {
61+
let mut new_prev_batch = current_batch;
62+
// While there are batches in seen_versions that are in order, update the new_prev_batch to the next batch.
63+
while let Some(next_version) = self.seen_versions.remove(&(new_prev_batch.end_version + 1))
64+
{
65+
new_prev_batch = next_version;
66+
}
67+
self.next_version = new_prev_batch.end_version + 1;
68+
self.last_success_batch = Some(new_prev_batch);
69+
}
70+
}
71+
72+
#[async_trait]
73+
impl<T> Processable for LatestVersionProcessedTracker<T>
74+
where
75+
Self: Sized + Send + 'static,
76+
T: Send + 'static,
77+
{
78+
type Input = T;
79+
type Output = T;
80+
type RunType = PollableAsyncRunType;
81+
82+
async fn process(
83+
&mut self,
84+
current_batch: TransactionContext<T>,
85+
) -> Option<TransactionContext<T>> {
86+
// If there's a gap in the next_version and current_version, save the current_version to seen_versions for
87+
// later processing.
88+
if self.next_version != current_batch.start_version {
89+
tracing::debug!(
90+
next_version = self.next_version,
91+
step = self.name(),
92+
"Gap detected starting from version: {}",
93+
current_batch.start_version
94+
);
95+
self.seen_versions.insert(
96+
current_batch.start_version,
97+
TransactionContext {
98+
data: vec![], // No data is needed for tracking. This is to avoid clone.
99+
start_version: current_batch.start_version,
100+
end_version: current_batch.end_version,
101+
start_transaction_timestamp: current_batch.start_transaction_timestamp.clone(),
102+
end_transaction_timestamp: current_batch.end_transaction_timestamp.clone(),
103+
total_size_in_bytes: current_batch.total_size_in_bytes,
104+
},
105+
);
106+
} else {
107+
tracing::debug!("No gap detected");
108+
// If the current_batch is the next expected version, update the last success batch
109+
self.update_last_success_batch(TransactionContext {
110+
data: vec![], // No data is needed for tracking. This is to avoid clone.
111+
start_version: current_batch.start_version,
112+
end_version: current_batch.end_version,
113+
start_transaction_timestamp: current_batch.start_transaction_timestamp.clone(),
114+
end_transaction_timestamp: current_batch.end_transaction_timestamp.clone(),
115+
total_size_in_bytes: current_batch.total_size_in_bytes,
116+
});
117+
}
118+
// Pass through
119+
Some(current_batch)
120+
}
121+
}
122+
123+
#[async_trait]
124+
impl<T: Send + 'static> PollableAsyncStep for LatestVersionProcessedTracker<T>
125+
where
126+
Self: Sized + Send + 'static,
127+
T: Send + 'static,
128+
{
129+
fn poll_interval(&self) -> std::time::Duration {
130+
std::time::Duration::from_secs(UPDATE_PROCESSOR_STATUS_SECS)
131+
}
132+
133+
async fn poll(&mut self) -> Option<Vec<TransactionContext<T>>> {
134+
// TODO: Add metrics for gap count
135+
// Update the processor status
136+
if let Some(last_success_batch) = self.last_success_batch.as_ref() {
137+
let end_timestamp = last_success_batch
138+
.end_transaction_timestamp
139+
.as_ref()
140+
.map(|t| parse_timestamp(t, last_success_batch.end_version as i64))
141+
.map(|t| t.naive_utc());
142+
let status = ProcessorStatus {
143+
processor: self.tracker_name.clone(),
144+
last_success_version: last_success_batch.end_version as i64,
145+
last_transaction_timestamp: end_timestamp,
146+
};
147+
execute_with_better_error(
148+
self.conn_pool.clone(),
149+
diesel::insert_into(processor_status::table)
150+
.values(&status)
151+
.on_conflict(processor_status::processor)
152+
.do_update()
153+
.set((
154+
processor_status::last_success_version
155+
.eq(excluded(processor_status::last_success_version)),
156+
processor_status::last_updated.eq(excluded(processor_status::last_updated)),
157+
processor_status::last_transaction_timestamp
158+
.eq(excluded(processor_status::last_transaction_timestamp)),
159+
)),
160+
Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "),
161+
)
162+
.await
163+
.expect("Failed to update processor status");
164+
}
165+
// Nothing should be returned
166+
None
167+
}
168+
}
169+
170+
impl<T> NamedStep for LatestVersionProcessedTracker<T>
171+
where
172+
Self: Sized + Send + 'static,
173+
T: Send + 'static,
174+
{
175+
fn name(&self) -> String {
176+
format!(
177+
"LatestVersionProcessedTracker: {}",
178+
std::any::type_name::<T>()
179+
)
180+
}
181+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod latest_processed_version_tracker;

rust/sdk-examples/src/config/indexer_processor_config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use super::processor_config::ProcessorConfig;
5-
use crate::events_processor::processor::EventsProcessor;
5+
use crate::processors::events::events_processor::EventsProcessor;
66
use anyhow::Result;
77
use aptos_indexer_transaction_stream::TransactionStreamConfig;
88
use sdk_server_framework::RunnableConfig;

rust/sdk-examples/src/db/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod models;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod events_models;
2+
pub mod processor_status;
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright © Aptos Foundation
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#![allow(clippy::extra_unused_lifetimes)]
5+
6+
use crate::{schema::processor_status, utils::database::DbPoolConnection};
7+
use diesel::{AsChangeset, ExpressionMethods, Insertable, OptionalExtension, QueryDsl, Queryable};
8+
use diesel_async::RunQueryDsl;
9+
10+
#[derive(AsChangeset, Debug, Insertable)]
#[diesel(table_name = processor_status)]
/// Only tracking the latest version successfully processed
pub struct ProcessorStatus {
    // Tracker/processor name; used as the conflict target when upserting this row.
    pub processor: String,
    // Highest gap-free transaction version successfully processed.
    pub last_success_version: i64,
    // Timestamp of the last processed transaction, when the stream provided one.
    pub last_transaction_timestamp: Option<chrono::NaiveDateTime>,
}
18+
19+
#[derive(AsChangeset, Debug, Queryable)]
#[diesel(table_name = processor_status)]
/// Only tracking the latest version successfully processed
pub struct ProcessorStatusQuery {
    // Tracker/processor name identifying the row.
    pub processor: String,
    // Highest gap-free transaction version successfully processed.
    pub last_success_version: i64,
    // When the row was last written — column exists in the table but not in the
    // insert model, so presumably it is DB-maintained; TODO confirm against schema.
    pub last_updated: chrono::NaiveDateTime,
    // Timestamp of the last processed transaction, when one was recorded.
    pub last_transaction_timestamp: Option<chrono::NaiveDateTime>,
}
28+
29+
impl ProcessorStatusQuery {
30+
pub async fn get_by_processor(
31+
processor_name: &str,
32+
conn: &mut DbPoolConnection<'_>,
33+
) -> diesel::QueryResult<Option<Self>> {
34+
processor_status::table
35+
.filter(processor_status::processor.eq(processor_name))
36+
.first::<Self>(conn)
37+
.await
38+
.optional()
39+
}
40+
}

rust/sdk-examples/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
pub mod common_steps;
12
pub mod config;
2-
pub mod events_processor;
3+
pub mod db;
4+
pub mod processors;
35
pub mod schema;
46
pub mod utils;

rust/sdk-examples/src/events_processor/events_extractor.rs renamed to rust/sdk-examples/src/processors/events/events_extractor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::events_models::EventModel;
1+
use crate::db::models::events_models::EventModel;
22
use aptos_indexer_processor_sdk::{
33
steps::{async_step::AsyncRunType, AsyncStep},
44
traits::{NamedStep, Processable},

rust/sdk-examples/src/events_processor/processor.rs renamed to rust/sdk-examples/src/processors/events/events_processor.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use super::{events_extractor::EventsExtractor, events_storer::EventsStorer};
2-
use crate::config::indexer_processor_config::DbConfig;
2+
use crate::{
3+
common_steps::latest_processed_version_tracker::LatestVersionProcessedTracker,
4+
config::indexer_processor_config::DbConfig,
5+
};
36
use anyhow::Result;
47
use aptos_indexer_processor_sdk::{
58
builder::ProcessorBuilder,
@@ -26,21 +29,29 @@ impl EventsProcessor {
2629
pub async fn run_processor(self) -> Result<()> {
2730
let (_input_sender, input_receiver) = instrumented_bounded_channel("input", 1);
2831

29-
let transaction_stream = TransactionStreamStep::new(self.transaction_stream_config).await?;
32+
let transaction_stream =
33+
TransactionStreamStep::new(self.transaction_stream_config.clone()).await?;
3034
let transaction_stream_with_input = RunnableStepWithInputReceiver::new(
3135
input_receiver,
3236
transaction_stream.into_runnable_step(),
3337
);
3438
let events_extractor = EventsExtractor {};
3539
let events_storer = EventsStorer::new(self.db_config.clone()).await?;
3640
let timed_buffer = TimedBuffer::new(Duration::from_secs(1));
41+
let version_tracker = LatestVersionProcessedTracker::new(
42+
self.db_config,
43+
self.transaction_stream_config.starting_version,
44+
"events_processor".to_string(),
45+
)
46+
.await?;
3747

3848
let (_, buffer_receiver) = ProcessorBuilder::new_with_runnable_input_receiver_first_step(
3949
transaction_stream_with_input,
4050
)
4151
.connect_to(events_extractor.into_runnable_step(), 10)
4252
.connect_to(timed_buffer.into_runnable_step(), 10)
4353
.connect_to(events_storer.into_runnable_step(), 10)
54+
.connect_to(version_tracker.into_runnable_step(), 10)
4455
.end_and_return_output_receiver(10);
4556

4657
loop {

0 commit comments

Comments
 (0)