Skip to content

Commit 91a9826

Browse files
authored
Refactor Staking Models and Create Parquet Stake Processor (#647)
1 parent 3064a07 commit 91a9826

31 files changed

+2063
-920
lines changed

rust/processor/src/db/common/models/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ pub mod default_models;
44
pub mod event_models;
55
pub mod fungible_asset_models;
66
pub mod object_models;
7+
pub mod stake_models;
78
pub mod token_v2_models;

rust/processor/src/db/postgres/models/stake_models/current_delegated_voter.rs renamed to rust/processor/src/db/common/models/stake_models/current_delegated_voter.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44
// This is required because a diesel macro makes clippy sad
55
#![allow(clippy::extra_unused_lifetimes)]
66

7-
use super::{
8-
delegator_balances::{CurrentDelegatorBalance, ShareToStakingPoolMapping},
9-
stake_utils::VoteDelegationTableItem,
10-
};
7+
use super::delegator_balances::ShareToRawStakingPoolMapping;
118
use crate::{
9+
db::common::models::stake_models::{
10+
delegator_balances::RawCurrentDelegatorBalance, stake_utils::VoteDelegationTableItem,
11+
},
1212
schema::current_delegated_voter,
1313
utils::{database::DbPoolConnection, util::standardize_address},
1414
};
@@ -135,14 +135,14 @@ impl CurrentDelegatedVoter {
135135
write_table_item: &WriteTableItem,
136136
txn_version: i64,
137137
txn_timestamp: chrono::NaiveDateTime,
138-
active_pool_to_staking_pool: &ShareToStakingPoolMapping,
138+
active_pool_to_staking_pool: &ShareToRawStakingPoolMapping,
139139
previous_delegated_voters: &CurrentDelegatedVoterMap,
140140
conn: &mut DbPoolConnection<'_>,
141141
query_retries: u32,
142142
query_retry_delay_ms: u64,
143143
) -> anyhow::Result<Option<Self>> {
144144
if let Some((_, active_balance)) =
145-
CurrentDelegatorBalance::get_active_share_from_write_table_item(
145+
RawCurrentDelegatorBalance::get_active_share_from_write_table_item(
146146
write_table_item,
147147
txn_version,
148148
0, // placeholder
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright © Aptos Foundation
2+
3+
// This is required because a diesel macro makes clippy sad
4+
#![allow(clippy::extra_unused_lifetimes)]
5+
6+
use crate::{
7+
db::common::models::stake_models::stake_utils::StakeEvent,
8+
utils::{
9+
counters::PROCESSOR_UNKNOWN_TYPE_COUNT,
10+
util::{parse_timestamp, standardize_address, u64_to_bigdecimal},
11+
},
12+
};
13+
use aptos_protos::transaction::v1::{transaction::TxnData, Transaction};
14+
use bigdecimal::BigDecimal;
15+
use serde::{Deserialize, Serialize};
16+
17+
#[derive(Clone, Debug, Deserialize, Serialize)]
18+
pub struct RawDelegatedStakingActivity {
19+
pub transaction_version: i64,
20+
pub event_index: i64,
21+
pub delegator_address: String,
22+
pub pool_address: String,
23+
pub event_type: String,
24+
pub amount: BigDecimal,
25+
pub block_timestamp: chrono::NaiveDateTime,
26+
}
27+
pub trait RawDelegatedStakingActivityConvertible {
28+
fn from_raw(raw: RawDelegatedStakingActivity) -> Self;
29+
}
30+
31+
impl RawDelegatedStakingActivity {
32+
/// Pretty straightforward parsing from known delegated staking events
33+
pub fn from_transaction(transaction: &Transaction) -> anyhow::Result<Vec<Self>> {
34+
let mut delegator_activities = vec![];
35+
let txn_data = match transaction.txn_data.as_ref() {
36+
Some(data) => data,
37+
None => {
38+
PROCESSOR_UNKNOWN_TYPE_COUNT
39+
.with_label_values(&["DelegatedStakingActivity"])
40+
.inc();
41+
tracing::warn!(
42+
transaction_version = transaction.version,
43+
"Transaction data doesn't exist",
44+
);
45+
return Ok(delegator_activities);
46+
},
47+
};
48+
49+
let txn_version = transaction.version as i64;
50+
let events = match txn_data {
51+
TxnData::User(txn) => &txn.events,
52+
TxnData::BlockMetadata(txn) => &txn.events,
53+
TxnData::Validator(txn) => &txn.events,
54+
_ => return Ok(delegator_activities),
55+
};
56+
let block_timestamp = parse_timestamp(transaction.timestamp.as_ref().unwrap(), txn_version);
57+
for (index, event) in events.iter().enumerate() {
58+
let event_index = index as i64;
59+
if let Some(staking_event) =
60+
StakeEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
61+
{
62+
let activity = match staking_event {
63+
StakeEvent::AddStakeEvent(inner) => RawDelegatedStakingActivity {
64+
transaction_version: txn_version,
65+
event_index,
66+
delegator_address: standardize_address(&inner.delegator_address),
67+
pool_address: standardize_address(&inner.pool_address),
68+
event_type: event.type_str.clone(),
69+
amount: u64_to_bigdecimal(inner.amount_added),
70+
block_timestamp,
71+
},
72+
StakeEvent::UnlockStakeEvent(inner) => RawDelegatedStakingActivity {
73+
transaction_version: txn_version,
74+
event_index,
75+
delegator_address: standardize_address(&inner.delegator_address),
76+
pool_address: standardize_address(&inner.pool_address),
77+
event_type: event.type_str.clone(),
78+
amount: u64_to_bigdecimal(inner.amount_unlocked),
79+
block_timestamp,
80+
},
81+
StakeEvent::WithdrawStakeEvent(inner) => RawDelegatedStakingActivity {
82+
transaction_version: txn_version,
83+
event_index,
84+
delegator_address: standardize_address(&inner.delegator_address),
85+
pool_address: standardize_address(&inner.pool_address),
86+
event_type: event.type_str.clone(),
87+
amount: u64_to_bigdecimal(inner.amount_withdrawn),
88+
block_timestamp,
89+
},
90+
StakeEvent::ReactivateStakeEvent(inner) => RawDelegatedStakingActivity {
91+
transaction_version: txn_version,
92+
event_index,
93+
delegator_address: standardize_address(&inner.delegator_address),
94+
pool_address: standardize_address(&inner.pool_address),
95+
event_type: event.type_str.clone(),
96+
amount: u64_to_bigdecimal(inner.amount_reactivated),
97+
block_timestamp,
98+
},
99+
StakeEvent::DistributeRewardsEvent(inner) => RawDelegatedStakingActivity {
100+
transaction_version: txn_version,
101+
event_index,
102+
delegator_address: "".to_string(),
103+
pool_address: standardize_address(&inner.pool_address),
104+
event_type: event.type_str.clone(),
105+
amount: u64_to_bigdecimal(inner.rewards_amount),
106+
block_timestamp,
107+
},
108+
_ => continue,
109+
};
110+
delegator_activities.push(activity);
111+
}
112+
}
113+
Ok(delegator_activities)
114+
}
115+
}

0 commit comments

Comments
 (0)