Skip to content

Commit c41bcd4

Browse files
committed
add dedup step
1 parent 55c3294 commit c41bcd4

File tree

5 files changed

+161
-10
lines changed

5 files changed

+161
-10
lines changed

rust/Cargo.lock

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ testing-transactions = { path = "testing-transactions" }
3030

3131
ahash = { version = "0.8.7", features = ["serde"] }
3232
anyhow = "1.0.86"
33-
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "4923bbb8bf82298bc85dc3d16971fef2f22fc0b0" }
34-
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "4923bbb8bf82298bc85dc3d16971fef2f22fc0b0" }
33+
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "028e5d3df748fbb846f2058159bd1f8304c96aba" }
34+
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "028e5d3df748fbb846f2058159bd1f8304c96aba" }
3535
aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb" }
3636
aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" }
3737
aptos-indexer-test-transactions = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" }

rust/processor/src/db/common/models/fungible_asset_models/v2_fungible_asset_balances.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ use bigdecimal::{BigDecimal, Zero};
2929
use field_count::FieldCount;
3030
use lazy_static::lazy_static;
3131
use serde::{Deserialize, Serialize};
32-
use std::borrow::Borrow;
32+
use std::{
33+
borrow::Borrow,
34+
hash::{Hash, Hasher},
35+
};
3336

3437
// Storage id
3538
pub type CurrentFungibleAssetBalancePK = String;
@@ -154,6 +157,12 @@ pub struct CurrentFungibleAssetBalance {
154157
pub token_standard: String,
155158
}
156159

160+
// impl Hash for CurrentFungibleAssetBalance {
161+
// fn hash<H: Hasher>(&self, state: &mut H) {
162+
// self.storage_id.hash(state);
163+
// }
164+
// }
165+
157166
/// Note that this used to be called current_unified_fungible_asset_balances_to_be_renamed
158167
/// and was renamed to current_fungible_asset_balances to facilitate migration
159168
#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize, Default)]

rust/sdk-processor/src/processors/fungible_asset_processor.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::{
77
common::latest_processed_version_tracker::LatestVersionProcessedTracker,
88
events_processor::{EventsExtractor, EventsStorer},
99
fungible_asset_processor::{
10+
fungible_asset_deduper::FungibleAssetDeduper,
1011
fungible_asset_extractor::FungibleAssetExtractor,
1112
fungible_asset_storer::FungibleAssetStorer,
1213
},
@@ -26,6 +27,7 @@ use aptos_indexer_processor_sdk::{
2627
traits::IntoRunnableStep,
2728
};
2829
use serde::{Deserialize, Serialize};
30+
use std::time::Duration;
2931
use tracing::{debug, info};
3032

3133
#[derive(Clone, Debug, Deserialize, Serialize)]
@@ -111,6 +113,7 @@ impl FungibleAssetProcessor {
111113
})
112114
.await?;
113115
let fa_extractor = FungibleAssetExtractor {};
116+
let fa_deduper = FungibleAssetDeduper::new(Duration::from_secs(1));
114117
let fa_storer = FungibleAssetStorer::new(self.db_pool.clone(), fa_config);
115118
let version_tracker = LatestVersionProcessedTracker::new(
116119
self.db_pool.clone(),
@@ -123,6 +126,7 @@ impl FungibleAssetProcessor {
123126
transaction_stream.into_runnable_step(),
124127
)
125128
.connect_to(fa_extractor.into_runnable_step(), channel_size)
129+
.connect_to(fa_deduper.into_runnable_step(), channel_size)
126130
.connect_to(fa_storer.into_runnable_step(), channel_size)
127131
.connect_to(version_tracker.into_runnable_step(), channel_size)
128132
.end_and_return_output_receiver(channel_size);
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
use ahash::AHashMap;
2+
use aptos_indexer_processor_sdk::{
3+
aptos_protos::transaction::v1::Transaction,
4+
traits::{
5+
async_step::AsyncRunType, AsyncStep, NamedStep, PollableAsyncRunType, PollableAsyncStep,
6+
Processable,
7+
},
8+
types::transaction_context::{Context, TransactionContext},
9+
utils::errors::ProcessorError,
10+
};
11+
use async_trait::async_trait;
12+
use diesel::associations::Identifiable;
13+
use processor::db::common::models::{
14+
coin_models::coin_supply::CoinSupply,
15+
fungible_asset_models::{
16+
v2_fungible_asset_activities::FungibleAssetActivity,
17+
v2_fungible_asset_balances::{
18+
CurrentFungibleAssetBalance, CurrentFungibleAssetBalancePK,
19+
CurrentUnifiedFungibleAssetBalance, FungibleAssetBalance,
20+
},
21+
v2_fungible_metadata::FungibleAssetMetadataModel,
22+
},
23+
};
24+
use std::{collections::BTreeSet, time::Duration};
25+
26+
pub struct FungibleAssetDeduper
27+
where
28+
Self: Sized + Send + 'static,
29+
{
30+
// The duration to collect and dedup data before releasing it
31+
poll_interval: Duration,
32+
merged_cfab: AHashMap<CurrentFungibleAssetBalancePK, CurrentFungibleAssetBalance>,
33+
contexts: BTreeSet<Context>,
34+
}
35+
36+
impl FungibleAssetDeduper
37+
where
38+
Self: Sized + Send + 'static,
39+
{
40+
pub fn new(poll_interval: Duration) -> Self {
41+
Self {
42+
poll_interval,
43+
merged_cfab: AHashMap::new(),
44+
contexts: BTreeSet::new(),
45+
}
46+
}
47+
}
48+
49+
#[async_trait]
50+
impl Processable for FungibleAssetDeduper {
51+
type Input = (
52+
Vec<FungibleAssetActivity>,
53+
Vec<FungibleAssetMetadataModel>,
54+
Vec<FungibleAssetBalance>,
55+
Vec<CurrentFungibleAssetBalance>,
56+
Vec<CurrentUnifiedFungibleAssetBalance>,
57+
Vec<CoinSupply>,
58+
);
59+
// type Output = (
60+
// &'static [FungibleAssetActivity],
61+
// &'static [FungibleAssetMetadataModel],
62+
// &'static [FungibleAssetBalance],
63+
// &'static [CurrentFungibleAssetBalance],
64+
// &'static [CurrentUnifiedFungibleAssetBalance],
65+
// &'static [CoinSupply],
66+
// );
67+
type Output = (
68+
Vec<FungibleAssetActivity>,
69+
Vec<FungibleAssetMetadataModel>,
70+
Vec<FungibleAssetBalance>,
71+
Vec<CurrentFungibleAssetBalance>,
72+
Vec<CurrentUnifiedFungibleAssetBalance>,
73+
Vec<CoinSupply>,
74+
);
75+
type RunType = PollableAsyncRunType;
76+
77+
async fn process(
78+
&mut self,
79+
items: TransactionContext<Self::Input>,
80+
) -> Result<Option<TransactionContext<Self::Output>>, ProcessorError> {
81+
let (_, _, _, cfab, _, _) = &items.data[0];
82+
83+
// Update transaction contexts
84+
for context in items.context.iter() {
85+
self.contexts.insert(context.clone());
86+
}
87+
88+
// Dedup
89+
for balance in cfab.iter() {
90+
self.merged_cfab
91+
.insert(balance.id().to_string(), balance.clone());
92+
}
93+
94+
Ok(None)
95+
}
96+
}
97+
98+
#[async_trait]
99+
impl PollableAsyncStep for FungibleAssetDeduper {
100+
fn poll_interval(&self) -> Duration {
101+
self.poll_interval
102+
}
103+
104+
async fn poll(
105+
&mut self,
106+
) -> Result<Option<Vec<TransactionContext<Self::Output>>>, ProcessorError> {
107+
// let faa: &[FungibleAssetActivity] = &[];
108+
// let fam: &[FungibleAssetMetadataModel] = &[];
109+
// let fab: &[FungibleAssetBalance] = &[];
110+
// let cfab: &[CurrentFungibleAssetBalance] = &[];
111+
// let cufab: &[CurrentUnifiedFungibleAssetBalance] = &[];
112+
// let cs: &[CoinSupply] = &[];
113+
// let cfab: AHashMap<String, CurrentFungibleAssetBalance> =
114+
// std::mem::take(&mut self.merged_cfab);
115+
// let cfab = cfab.values().cloned().collect::<Vec<_>>();
116+
let transaction_context = TransactionContext::new_with_contexts(
117+
vec![(
118+
vec![],
119+
vec![],
120+
vec![],
121+
std::mem::take(&mut self.merged_cfab)
122+
.values()
123+
.cloned()
124+
.collect::<Vec<_>>(),
125+
vec![],
126+
vec![],
127+
)],
128+
self.contexts.clone(),
129+
);
130+
Ok(Some(vec![transaction_context]))
131+
}
132+
}
133+
134+
impl NamedStep for FungibleAssetDeduper {
135+
fn name(&self) -> String {
136+
"FungibleAssetDeduper".to_string()
137+
}
138+
}

0 commit comments

Comments
 (0)