Skip to content

Commit 55c3294

Browse files
committed
upgrade dep
1 parent 7e7d3b8 commit 55c3294

File tree

9 files changed

+138
-94
lines changed

9 files changed

+138
-94
lines changed

rust/Cargo.lock

Lines changed: 10 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/processor/src/processors/fungible_asset_processor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ fn insert_fungible_asset_balances_query(
239239
)
240240
}
241241

242-
fn insert_current_fungible_asset_balances_query(
242+
pub fn insert_current_fungible_asset_balances_query(
243243
items_to_insert: Vec<CurrentFungibleAssetBalance>,
244244
) -> (
245245
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,

rust/sdk-processor/src/processors/events_processor.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,13 @@ impl EventsProcessor {
136136
continue;
137137
}
138138
debug!(
139-
"Finished processing events from versions [{:?}, {:?}]",
140-
txn_context.start_version, txn_context.end_version,
139+
"Finished processing events from versions {}",
140+
txn_context
141+
.get_versions()
142+
.iter()
143+
.map(|(x, y)| format!("[{}, {}]", x, y))
144+
.collect::<Vec<_>>()
145+
.join(", "),
141146
);
142147
},
143148
Err(e) => {

rust/sdk-processor/src/processors/fungible_asset_processor.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ use crate::{
66
steps::{
77
common::latest_processed_version_tracker::LatestVersionProcessedTracker,
88
events_processor::{EventsExtractor, EventsStorer},
9+
fungible_asset_processor::{
10+
fungible_asset_extractor::FungibleAssetExtractor,
11+
fungible_asset_storer::FungibleAssetStorer,
12+
},
913
},
1014
utils::{
1115
chain_id::check_or_update_chain_id,
@@ -106,8 +110,8 @@ impl FungibleAssetProcessor {
106110
..self.config.transaction_stream_config
107111
})
108112
.await?;
109-
// let events_extractor = EventsExtractor {};
110-
// let events_storer = EventsStorer::new(self.db_pool.clone(), events_processor_config);
113+
let fa_extractor = FungibleAssetExtractor {};
114+
let fa_storer = FungibleAssetStorer::new(self.db_pool.clone(), fa_config);
111115
let version_tracker = LatestVersionProcessedTracker::new(
112116
self.db_pool.clone(),
113117
starting_version,
@@ -118,8 +122,8 @@ impl FungibleAssetProcessor {
118122
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
119123
transaction_stream.into_runnable_step(),
120124
)
121-
// .connect_to(events_extractor.into_runnable_step(), channel_size)
122-
// .connect_to(events_storer.into_runnable_step(), channel_size)
125+
.connect_to(fa_extractor.into_runnable_step(), channel_size)
126+
.connect_to(fa_storer.into_runnable_step(), channel_size)
123127
.connect_to(version_tracker.into_runnable_step(), channel_size)
124128
.end_and_return_output_receiver(channel_size);
125129

@@ -131,8 +135,13 @@ impl FungibleAssetProcessor {
131135
continue;
132136
}
133137
debug!(
134-
"Finished processing versions [{:?}, {:?}]",
135-
txn_context.start_version, txn_context.end_version,
138+
"Finished processing versions {}",
139+
txn_context
140+
.get_versions()
141+
.iter()
142+
.map(|(x, y)| format!("[{}, {}]", x, y))
143+
.collect::<Vec<_>>()
144+
.join(", "),
136145
);
137146
},
138147
Err(e) => {

rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::utils::database::{execute_with_better_error, ArcDbPool};
22
use ahash::AHashMap;
33
use anyhow::Result;
44
use aptos_indexer_processor_sdk::{
5+
aptos_protos,
56
traits::{
67
pollable_async_step::PollableAsyncRunType, NamedStep, PollableAsyncStep, Processable,
78
},
@@ -11,10 +12,18 @@ use aptos_indexer_processor_sdk::{
1112
use async_trait::async_trait;
1213
use diesel::{upsert::excluded, ExpressionMethods};
1314
use processor::{db::common::models::processor_status::ProcessorStatus, schema::processor_status};
15+
use std::marker::PhantomData;
1416
use tracing::info;
1517

1618
const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1;
1719

20+
pub struct ProcessedBatch {
21+
pub start_version: u64,
22+
pub end_version: u64,
23+
pub start_transaction_timestamp: Option<aptos_protos::util::timestamp::Timestamp>,
24+
pub end_transaction_timestamp: Option<aptos_protos::util::timestamp::Timestamp>,
25+
}
26+
1827
pub struct LatestVersionProcessedTracker<T>
1928
where
2029
Self: Sized + Send + 'static,
@@ -25,9 +34,10 @@ where
2534
// Next version to process that we expect.
2635
next_version: u64,
2736
// Last successful batch of sequentially processed transactions. Includes metadata to write to storage.
28-
last_success_batch: Option<TransactionContext<T>>,
37+
last_success_batch: Option<ProcessedBatch>,
2938
// Tracks all the versions that have been processed out of order.
30-
seen_versions: AHashMap<u64, TransactionContext<T>>,
39+
seen_versions: AHashMap<u64, ProcessedBatch>,
40+
_marker: PhantomData<T>,
3141
}
3242

3343
impl<T> LatestVersionProcessedTracker<T>
@@ -42,10 +52,11 @@ where
4252
next_version: starting_version,
4353
last_success_batch: None,
4454
seen_versions: AHashMap::new(),
55+
_marker: PhantomData,
4556
}
4657
}
4758

48-
fn update_last_success_batch(&mut self, current_batch: TransactionContext<T>) {
59+
fn update_last_success_batch(&mut self, current_batch: ProcessedBatch) {
4960
let mut new_prev_batch = current_batch;
5061
// While there are batches in seen_versions that are in order, update the new_prev_batch to the next batch.
5162
while let Some(next_version) = self.seen_versions.remove(&(new_prev_batch.end_version + 1))
@@ -107,41 +118,33 @@ where
107118
&mut self,
108119
current_batch: TransactionContext<T>,
109120
) -> Result<Option<TransactionContext<T>>, ProcessorError> {
110-
// info!(
111-
// start_version = current_batch.start_version,
112-
// end_version = current_batch.end_version,
113-
// step_name = self.name(),
114-
// "Processing versions"
115-
// );
116-
// If there's a gap in the next_version and current_version, save the current_version to seen_versions for
117-
// later processing.
118-
if self.next_version != current_batch.start_version {
119-
info!(
120-
expected_next_version = self.next_version,
121-
step = self.name(),
122-
batch_version = current_batch.start_version,
123-
"Gap detected",
124-
);
125-
self.seen_versions
126-
.insert(current_batch.start_version, TransactionContext {
127-
data: vec![], // No data is needed for tracking. This is to avoid clone.
128-
start_version: current_batch.start_version,
129-
end_version: current_batch.end_version,
130-
start_transaction_timestamp: current_batch.start_transaction_timestamp.clone(),
131-
end_transaction_timestamp: current_batch.end_transaction_timestamp.clone(),
132-
total_size_in_bytes: current_batch.total_size_in_bytes,
121+
for context in current_batch.context.iter() {
122+
// If there's a gap in the next_version and current_version, save the current_version to seen_versions for
123+
// later processing.
124+
if self.next_version != context.start_version {
125+
tracing::debug!(
126+
next_version = self.next_version,
127+
step = self.name(),
128+
"Gap detected starting from version: {}",
129+
context.start_version
130+
);
131+
self.seen_versions
132+
.insert(context.start_version, ProcessedBatch {
133+
start_version: context.start_version,
134+
end_version: context.end_version,
135+
start_transaction_timestamp: context.start_transaction_timestamp.clone(),
136+
end_transaction_timestamp: context.end_transaction_timestamp.clone(),
137+
});
138+
} else {
139+
tracing::debug!("No gap detected");
140+
// If the current_batch is the next expected version, update the last success batch
141+
self.update_last_success_batch(ProcessedBatch {
142+
start_version: context.start_version,
143+
end_version: context.end_version,
144+
start_transaction_timestamp: context.start_transaction_timestamp.clone(),
145+
end_transaction_timestamp: context.end_transaction_timestamp.clone(),
133146
});
134-
} else {
135-
// info!("No gap detected");
136-
// If the current_batch is the next expected version, update the last success batch
137-
self.update_last_success_batch(TransactionContext {
138-
data: vec![], // No data is needed for tracking. This is to avoid clone.
139-
start_version: current_batch.start_version,
140-
end_version: current_batch.end_version,
141-
start_transaction_timestamp: current_batch.start_transaction_timestamp.clone(),
142-
end_transaction_timestamp: current_batch.end_transaction_timestamp.clone(),
143-
total_size_in_bytes: current_batch.total_size_in_bytes,
144-
});
147+
}
145148
}
146149
// Pass through
147150
Ok(Some(current_batch))

rust/sdk-processor/src/steps/events_processor/events_extractor.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,7 @@ impl Processable for EventsExtractor {
6868
.collect::<Vec<EventModel>>();
6969
Ok(Some(TransactionContext {
7070
data: events,
71-
start_version: item.start_version,
72-
end_version: item.end_version,
73-
start_transaction_timestamp: item.start_transaction_timestamp,
74-
end_transaction_timestamp: item.end_transaction_timestamp,
75-
total_size_in_bytes: item.total_size_in_bytes,
71+
context: item.context,
7672
}))
7773
}
7874
}

rust/sdk-processor/src/steps/events_processor/events_storer.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,26 @@ impl Processable for EventsStorer {
8484
match execute_res {
8585
Ok(_) => {
8686
debug!(
87-
"Events version [{}, {}] stored successfully",
88-
events.start_version, events.end_version
87+
"Events version {} stored successfully",
88+
events
89+
.get_versions()
90+
.iter()
91+
.map(|(x, y)| format!("[{}, {}]", x, y))
92+
.collect::<Vec<_>>()
93+
.join(", "),
8994
);
9095
Ok(Some(events))
9196
},
9297
Err(e) => Err(ProcessorError::DBStoreError {
9398
message: format!(
94-
"Failed to store events versions {} to {}: {:?}",
95-
events.start_version, events.end_version, e,
99+
"Failed to store events versions {:?}: {:?}",
100+
events
101+
.get_versions()
102+
.iter()
103+
.map(|(x, y)| format!("[{}, {}]", x, y))
104+
.collect::<Vec<_>>()
105+
.join(", "),
106+
e,
96107
),
97108
// TODO: fix it with a debug_query.
98109
query: None,

rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_extractor.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,7 @@ impl Processable for FungibleAssetExtractor {
7171
current_unified_fungible_asset_balances,
7272
coin_supply,
7373
)],
74-
start_version: transactions.start_version,
75-
end_version: transactions.end_version,
76-
start_transaction_timestamp: transactions.start_transaction_timestamp,
77-
end_transaction_timestamp: transactions.end_transaction_timestamp,
78-
total_size_in_bytes: transactions.total_size_in_bytes,
74+
context: transactions.context,
7975
}))
8076
}
8177
}

0 commit comments

Comments
 (0)