Skip to content

Commit 7e7d3b8

Browse files
committed
migrate fa processor to sdk
1 parent 5772cdd commit 7e7d3b8

File tree

12 files changed

+375
-7
lines changed

12 files changed

+375
-7
lines changed

rust/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ testing-transactions = { path = "testing-transactions" }
3030

3131
ahash = { version = "0.8.7", features = ["serde"] }
3232
anyhow = "1.0.86"
33-
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "9ecd252ccff53023664562001dd04c2886488c0d" }
34-
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "9ecd252ccff53023664562001dd04c2886488c0d" }
33+
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "4923bbb8bf82298bc85dc3d16971fef2f22fc0b0" }
34+
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "4923bbb8bf82298bc85dc3d16971fef2f22fc0b0" }
3535
aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb" }
3636
aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" }
3737
aptos-indexer-test-transactions = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" }

rust/processor/src/processors/fungible_asset_processor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ impl ProcessorTrait for FungibleAssetProcessor {
451451
}
452452

453453
/// V2 coin is called fungible assets and this flow includes all data from V1 in coin_processor
454-
async fn parse_v2_coin(
454+
pub async fn parse_v2_coin(
455455
transactions: &[Transaction],
456456
) -> (
457457
Vec<FungibleAssetActivity>,

rust/sdk-processor/src/config/indexer_processor_config.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use super::{db_config::DbConfig, processor_config::ProcessorConfig};
5-
use crate::processors::events_processor::EventsProcessor;
5+
use crate::processors::{
6+
events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor,
7+
};
8+
use ahash::HashSet;
69
use anyhow::Result;
710
use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig;
811
use aptos_indexer_processor_sdk_server_framework::RunnableConfig;
@@ -14,6 +17,10 @@ pub struct IndexerProcessorConfig {
1417
pub processor_config: ProcessorConfig,
1518
pub transaction_stream_config: TransactionStreamConfig,
1619
pub db_config: DbConfig,
20+
21+
// String vector for deprecated tables to skip db writes
22+
#[serde(default)]
23+
pub deprecated_tables: HashSet<String>,
1724
}
1825

1926
#[async_trait::async_trait]
@@ -24,6 +31,10 @@ impl RunnableConfig for IndexerProcessorConfig {
2431
let events_processor = EventsProcessor::new(self.clone()).await?;
2532
events_processor.run_processor().await
2633
},
34+
ProcessorConfig::FungibleAssetProcessor(_) => {
35+
let fungible_asset_processor = FungibleAssetProcessor::new(self.clone()).await?;
36+
fungible_asset_processor.run_processor().await
37+
},
2738
}
2839
}
2940

rust/sdk-processor/src/config/processor_config.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::processors::events_processor::EventsProcessorConfig;
1+
use crate::processors::{
2+
events_processor::EventsProcessorConfig, fungible_asset_processor::FungibleAssetProcessorConfig,
3+
};
24
use serde::{Deserialize, Serialize};
35

46
/// This enum captures the configs for all the different processors that are defined.
@@ -35,6 +37,7 @@ use serde::{Deserialize, Serialize};
3537
)]
3638
pub enum ProcessorConfig {
3739
EventsProcessor(EventsProcessorConfig),
40+
FungibleAssetProcessor(FungibleAssetProcessorConfig),
3841
}
3942

4043
impl ProcessorConfig {

rust/sdk-processor/src/processors/events_processor.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,15 @@ impl EventsProcessor {
9494
.await?;
9595
check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;
9696

97-
let ProcessorConfig::EventsProcessor(events_processor_config) =
98-
self.config.processor_config;
97+
let events_processor_config = match self.config.processor_config {
98+
ProcessorConfig::EventsProcessor(events_processor_config) => events_processor_config,
99+
_ => {
100+
return Err(anyhow::anyhow!(
101+
"Invalid processor config for EventsProcessor: {:?}",
102+
self.config.processor_config
103+
))
104+
},
105+
};
99106
let channel_size = events_processor_config.channel_size;
100107

101108
// Define processor steps
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
use crate::{
2+
config::{
3+
db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig,
4+
processor_config::ProcessorConfig,
5+
},
6+
steps::{
7+
common::latest_processed_version_tracker::LatestVersionProcessedTracker,
8+
events_processor::{EventsExtractor, EventsStorer},
9+
},
10+
utils::{
11+
chain_id::check_or_update_chain_id,
12+
database::{new_db_pool, run_migrations, ArcDbPool},
13+
starting_version::get_starting_version,
14+
},
15+
};
16+
use ahash::AHashMap;
17+
use anyhow::Result;
18+
use aptos_indexer_processor_sdk::{
19+
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
20+
builder::ProcessorBuilder,
21+
common_steps::TransactionStreamStep,
22+
traits::IntoRunnableStep,
23+
};
24+
use serde::{Deserialize, Serialize};
25+
use tracing::{debug, info};
26+
27+
#[derive(Clone, Debug, Deserialize, Serialize)]
28+
#[serde(deny_unknown_fields)]
29+
pub struct FungibleAssetProcessorConfig {
30+
// Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2)
31+
#[serde(default = "AHashMap::new")]
32+
pub per_table_chunk_sizes: AHashMap<String, usize>,
33+
// Size of channel between steps
34+
#[serde(default = "FungibleAssetProcessorConfig::default_channel_size")]
35+
pub channel_size: usize,
36+
}
37+
38+
impl FungibleAssetProcessorConfig {
39+
pub const fn default_channel_size() -> usize {
40+
10
41+
}
42+
}
43+
44+
pub struct FungibleAssetProcessor {
45+
pub config: IndexerProcessorConfig,
46+
pub db_pool: ArcDbPool,
47+
}
48+
49+
impl FungibleAssetProcessor {
50+
pub async fn new(config: IndexerProcessorConfig) -> Result<Self> {
51+
match config.db_config {
52+
DbConfig::PostgresConfig(ref postgres_config) => {
53+
let conn_pool = new_db_pool(
54+
&postgres_config.connection_string,
55+
Some(postgres_config.db_pool_size),
56+
)
57+
.await
58+
.map_err(|e| {
59+
anyhow::anyhow!(
60+
"Failed to create connection pool for PostgresConfig: {:?}",
61+
e
62+
)
63+
})?;
64+
65+
Ok(Self {
66+
config,
67+
db_pool: conn_pool,
68+
})
69+
},
70+
}
71+
}
72+
73+
pub async fn run_processor(self) -> Result<()> {
74+
let processor_name = self.config.processor_config.name();
75+
76+
// (Optional) Run migrations
77+
match self.config.db_config {
78+
DbConfig::PostgresConfig(ref postgres_config) => {
79+
run_migrations(
80+
postgres_config.connection_string.clone(),
81+
self.db_pool.clone(),
82+
)
83+
.await;
84+
},
85+
}
86+
87+
// (Optional) Merge the starting version from config and the latest processed version from the DB
88+
let starting_version = get_starting_version(&self.config, self.db_pool.clone()).await?;
89+
90+
// (Optional) Check and update the ledger chain id to ensure we're indexing the correct chain
91+
let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone())
92+
.await?
93+
.get_chain_id()
94+
.await?;
95+
check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;
96+
97+
let fa_config = match self.config.processor_config {
98+
ProcessorConfig::FungibleAssetProcessor(fa_config) => fa_config,
99+
_ => return Err(anyhow::anyhow!("Processor config is wrong type")),
100+
};
101+
let channel_size = fa_config.channel_size;
102+
103+
// Define processor steps
104+
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
105+
starting_version: Some(starting_version),
106+
..self.config.transaction_stream_config
107+
})
108+
.await?;
109+
// let events_extractor = EventsExtractor {};
110+
// let events_storer = EventsStorer::new(self.db_pool.clone(), events_processor_config);
111+
let version_tracker = LatestVersionProcessedTracker::new(
112+
self.db_pool.clone(),
113+
starting_version,
114+
processor_name.to_string(),
115+
);
116+
117+
// Connect processor steps together
118+
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
119+
transaction_stream.into_runnable_step(),
120+
)
121+
// .connect_to(events_extractor.into_runnable_step(), channel_size)
122+
// .connect_to(events_storer.into_runnable_step(), channel_size)
123+
.connect_to(version_tracker.into_runnable_step(), channel_size)
124+
.end_and_return_output_receiver(channel_size);
125+
126+
// (Optional) Parse the results
127+
loop {
128+
match buffer_receiver.recv().await {
129+
Ok(txn_context) => {
130+
if txn_context.data.is_empty() {
131+
continue;
132+
}
133+
debug!(
134+
"Finished processing versions [{:?}, {:?}]",
135+
txn_context.start_version, txn_context.end_version,
136+
);
137+
},
138+
Err(e) => {
139+
info!("No more transactions in channel: {:?}", e);
140+
break Ok(());
141+
},
142+
}
143+
}
144+
}
145+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pub mod events_processor;
2+
pub mod fungible_asset_processor;

rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_deduper.rs

Whitespace-only changes.
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
use aptos_indexer_processor_sdk::{
2+
aptos_protos::transaction::v1::Transaction,
3+
traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
4+
types::transaction_context::TransactionContext,
5+
utils::errors::ProcessorError,
6+
};
7+
use async_trait::async_trait;
8+
use processor::{
9+
db::common::models::{
10+
coin_models::coin_supply::CoinSupply,
11+
fungible_asset_models::{
12+
v2_fungible_asset_activities::FungibleAssetActivity,
13+
v2_fungible_asset_balances::{
14+
CurrentFungibleAssetBalance, CurrentUnifiedFungibleAssetBalance,
15+
FungibleAssetBalance,
16+
},
17+
v2_fungible_metadata::FungibleAssetMetadataModel,
18+
},
19+
},
20+
processors::fungible_asset_processor::parse_v2_coin,
21+
};
22+
23+
pub struct FungibleAssetExtractor
24+
where
25+
Self: Sized + Send + 'static, {}
26+
27+
#[async_trait]
28+
impl Processable for FungibleAssetExtractor {
29+
type Input = Transaction;
30+
type Output = (
31+
Vec<FungibleAssetActivity>,
32+
Vec<FungibleAssetMetadataModel>,
33+
Vec<FungibleAssetBalance>,
34+
Vec<CurrentFungibleAssetBalance>,
35+
Vec<CurrentUnifiedFungibleAssetBalance>,
36+
Vec<CoinSupply>,
37+
);
38+
type RunType = AsyncRunType;
39+
40+
async fn process(
41+
&mut self,
42+
transactions: TransactionContext<Transaction>,
43+
) -> Result<
44+
Option<
45+
TransactionContext<(
46+
Vec<FungibleAssetActivity>,
47+
Vec<FungibleAssetMetadataModel>,
48+
Vec<FungibleAssetBalance>,
49+
Vec<CurrentFungibleAssetBalance>,
50+
Vec<CurrentUnifiedFungibleAssetBalance>,
51+
Vec<CoinSupply>,
52+
)>,
53+
>,
54+
ProcessorError,
55+
> {
56+
let (
57+
fungible_asset_activities,
58+
fungible_asset_metadata,
59+
fungible_asset_balances,
60+
current_fungible_asset_balances,
61+
current_unified_fungible_asset_balances,
62+
coin_supply,
63+
) = parse_v2_coin(&transactions.data).await;
64+
65+
Ok(Some(TransactionContext {
66+
data: vec![(
67+
fungible_asset_activities,
68+
fungible_asset_metadata,
69+
fungible_asset_balances,
70+
current_fungible_asset_balances,
71+
current_unified_fungible_asset_balances,
72+
coin_supply,
73+
)],
74+
start_version: transactions.start_version,
75+
end_version: transactions.end_version,
76+
start_transaction_timestamp: transactions.start_transaction_timestamp,
77+
end_transaction_timestamp: transactions.end_transaction_timestamp,
78+
total_size_in_bytes: transactions.total_size_in_bytes,
79+
}))
80+
}
81+
}
82+
83+
impl AsyncStep for FungibleAssetExtractor {}
84+
85+
impl NamedStep for FungibleAssetExtractor {
86+
fn name(&self) -> String {
87+
"FungibleAssetExtractor".to_string()
88+
}
89+
}

0 commit comments

Comments
 (0)