Skip to content

Commit cf81b98

Browse files
committed
fix(coprocessor): host-listener, dependency chain, step 1
1 parent 88b6094 commit cf81b98

File tree

4 files changed

+384
-78
lines changed

4 files changed

+384
-78
lines changed
Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
use std::collections::hash_map::Entry;
2+
use std::collections::{HashMap, HashSet, VecDeque};
3+
4+
use tracing::{error, warn};
5+
6+
use crate::database::tfhe_event_propagate::{
7+
tfhe_inputs_handle, tfhe_result_handle,
8+
};
9+
use crate::database::tfhe_event_propagate::{
10+
ChainCache, ChainDependency, Chains, Handle, LogTfhe, TransactionHash,
11+
};
12+
13+
type HandleToTransaction = HashMap<Handle, (usize, TransactionHash)>;
14+
type Transactions = VecDeque<TransactionHash>;
15+
type TransactionToChain = HashMap<TransactionHash, ChainDependency>;
16+
17+
fn scan_transactions(logs: &[LogTfhe]) -> (HandleToTransaction, Transactions) {
18+
let nb_logs = logs.len();
19+
let mut prev_tx = None;
20+
let mut seen_txs = HashSet::with_capacity(nb_logs);
21+
let mut txs = VecDeque::with_capacity(nb_logs);
22+
let mut handle_to_tx = HashMap::with_capacity(nb_logs);
23+
for (age, log) in logs.iter().enumerate() {
24+
let tx = log.transaction_hash.unwrap_or_default();
25+
if prev_tx != Some(tx) && !seen_txs.contains(&tx) {
26+
prev_tx = Some(tx);
27+
seen_txs.insert(tx);
28+
txs.push_front(tx);
29+
}
30+
if let Some(handle) = tfhe_result_handle(&log.event) {
31+
handle_to_tx.insert(handle, (age, tx));
32+
}
33+
}
34+
(handle_to_tx, txs)
35+
}
36+
37+
async fn scan_transactions_dependencies(
38+
logs: &[LogTfhe],
39+
handle_to_tx: HandleToTransaction,
40+
nb_txs: usize,
41+
past_chains: &ChainCache,
42+
) -> (
43+
HashMap<TransactionHash, (usize, TransactionHash)>,
44+
TransactionToChain,
45+
) {
46+
// tx to its direct dependences or chain
47+
let mut tx_direct_tx_dependency: HashMap<
48+
TransactionHash,
49+
(usize, TransactionHash),
50+
> = HashMap::with_capacity(nb_txs);
51+
let mut tx_chain_dependency = HashMap::with_capacity(nb_txs);
52+
for log in logs {
53+
let tx = log.transaction_hash.unwrap_or_default();
54+
let log_inputs = tfhe_inputs_handle(&log.event);
55+
for input in log_inputs {
56+
let same_block_dep = tx_direct_tx_dependency.entry(tx);
57+
let already_one = matches!(same_block_dep, Entry::Occupied(_));
58+
if let Some((age, dep_tx)) = handle_to_tx.get(&input) {
59+
// block local, we keep the most recent tx only, assuming correct order
60+
let should_insert = match same_block_dep {
61+
Entry::Vacant(_) => true,
62+
Entry::Occupied(ref e) => *age > e.get().0 && tx != *dep_tx,
63+
};
64+
if should_insert {
65+
same_block_dep.insert_entry((*age, *dep_tx));
66+
}
67+
continue;
68+
}
69+
if already_one {
70+
// this tx already has a local dependency, skip checking past ones
71+
continue;
72+
}
73+
let pre_block_dep = tx_chain_dependency
74+
.entry(log.transaction_hash.unwrap_or_default());
75+
if let Entry::Occupied(_) = pre_block_dep {
76+
// this tx already has a chain dependency, skip checking past ones
77+
continue;
78+
}
79+
if let Some(&dep_tx) = past_chains.write().await.get(&input) {
80+
// from a previous block
81+
pre_block_dep.insert_entry(dep_tx);
82+
}
83+
// no dependency or only unknown ones
84+
}
85+
}
86+
(tx_direct_tx_dependency, tx_chain_dependency)
87+
}
88+
89+
fn assign_tx_a_chain_dependency(
90+
mut txs: Transactions,
91+
tx_direct_tx_dependency: &HashMap<
92+
TransactionHash,
93+
(usize, TransactionHash),
94+
>,
95+
tx_chain_dependency: &HashMap<TransactionHash, ChainDependency>,
96+
) -> (TransactionToChain, Chains) {
97+
let nb_txs = txs.len();
98+
let mut txn_to_chain_dep = HashMap::with_capacity(nb_txs);
99+
let mut chains = HashSet::with_capacity(nb_txs);
100+
// tx to its chain dependency
101+
while let Some(tx) = txs.pop_front() {
102+
let chain_dep = txn_to_chain_dep.entry(tx);
103+
if let Entry::Occupied(_) = chain_dep {
104+
// already done
105+
continue;
106+
}
107+
// in block dependency we propagate the chain
108+
let chain = if let Some(direct_deps) = tx_direct_tx_dependency.get(&tx)
109+
{
110+
let (_age, tx_dep) = direct_deps;
111+
let Some(chain) = txn_to_chain_dep.get(tx_dep) else {
112+
warn!(?tx, "Out of order transactions");
113+
// only happens if logs are out of order
114+
txs.push_back(tx);
115+
// let's do its dependency first
116+
txs.push_back(*tx_dep);
117+
continue;
118+
};
119+
*chain
120+
} else if let Some(chain) = tx_chain_dependency.get(&tx) {
121+
*chain
122+
} else {
123+
// no dependency or unknown ones
124+
// createa new chain
125+
tx
126+
};
127+
txn_to_chain_dep.insert(tx, chain);
128+
chains.insert(chain);
129+
}
130+
(txn_to_chain_dep, chains)
131+
}
132+
133+
pub async fn assign_logs_a_chain_dependency(
134+
logs: &mut [LogTfhe],
135+
txn_to_chain_dep: &TransactionToChain,
136+
past_chains: &ChainCache,
137+
) {
138+
let mut past_chains_write = past_chains.write().await;
139+
for log in logs {
140+
let tx = log.transaction_hash.unwrap_or_default();
141+
let chain_dep = txn_to_chain_dep.get(&tx);
142+
if chain_dep.is_none() {
143+
error!(?tx, "No chain dependency found for transaction");
144+
}
145+
let chain_dep = *chain_dep.unwrap_or(&tx);
146+
log.dependence_chain = chain_dep;
147+
if log.is_allowed == false {
148+
// cannot be reused in future blocks
149+
continue;
150+
}
151+
// update past chains cache for next block
152+
if let Some(handle) = tfhe_result_handle(&log.event) {
153+
past_chains_write.put(handle, chain_dep);
154+
}
155+
}
156+
}
157+
158+
pub async fn dependence_chains(
159+
logs: &mut [LogTfhe],
160+
past_chains: &ChainCache,
161+
) -> Chains {
162+
// handle to transaction
163+
let (handle_to_tx, txs) = scan_transactions(logs);
164+
165+
let nb_txs = txs.len();
166+
let (tx_direct_tx_dependency, tx_chain_dependency) =
167+
scan_transactions_dependencies(logs, handle_to_tx, nb_txs, past_chains)
168+
.await;
169+
170+
let (txn_to_chain_dep, chains) = assign_tx_a_chain_dependency(
171+
txs,
172+
&tx_direct_tx_dependency,
173+
&tx_chain_dependency,
174+
);
175+
176+
assign_logs_a_chain_dependency(logs, &txn_to_chain_dep, past_chains).await;
177+
chains
178+
}
179+
180+
#[cfg(test)]
181+
mod tests {
182+
// use super::*;
183+
use alloy::rpc::types::Log;
184+
185+
use crate::{
186+
contracts::TfheContract::TfheContractEvents as E,
187+
database::tfhe_event_propagate::ClearConst,
188+
};
189+
190+
fn push_event(
191+
e: E,
192+
logs: &mut Vec<Log>,
193+
tx: Option<TransactionHash>,
194+
) -> Handle {
195+
let Some(tx) = tx else {
196+
if let Some(log) = logs.last() {
197+
log.transaction_hash
198+
} else {
199+
TransactionHash::ZERO
200+
}
201+
};
202+
logs.push(Log {
203+
inner: e,
204+
block_hash: None,
205+
block_number: None,
206+
block_timestamp: None,
207+
transaction_hash: Some(tx),
208+
transaction_index: None,
209+
log_index: None,
210+
removed: true,
211+
})
212+
}
213+
214+
fn new_handle() -> Handle {
215+
static HANDLE_COUNTER: std::sync::atomic::AtomicU64 =
216+
std::sync::atomic::AtomicU64::new(1);
217+
let id =
218+
HANDLE_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
219+
Handle::from_u64(id)
220+
}
221+
222+
fn input_handle(
223+
logs: &mut Vec<Log>,
224+
tx: Option<TransactionHash>,
225+
) -> Handle {
226+
push_event(
227+
E::TrivialEncrypt(C::TrivialEncrypt {
228+
caller: alloy::types::Address::ZERO,
229+
pt: ClearConst::from_u64(0),
230+
toType: 0,
231+
resultHandle: new_handle(),
232+
}),
233+
logs,
234+
tx,
235+
)
236+
}
237+
238+
fn op1(
239+
handle: Handle,
240+
logs: &mut Vec<Log>,
241+
tx: Option<TransactionHash>,
242+
) -> E {
243+
push_event(
244+
E::FheAdd(C::FheAdd {
245+
lhs: handle,
246+
rhs: handle,
247+
scalarByte: 1,
248+
result: new_handle(),
249+
caller: alloy::types::Address::ZERO,
250+
}),
251+
logs,
252+
tx,
253+
)
254+
}
255+
256+
fn op2(
257+
handle1: Handle,
258+
handle2: Handle,
259+
logs: &mut Vec<Log>,
260+
tx: Option<TransactionHash>,
261+
) -> E {
262+
push_event(
263+
E::FheAdd(C::FheAdd {
264+
lhs: handle1,
265+
rhs: handle2,
266+
scalarByte: 1,
267+
result: new_handle(),
268+
caller: alloy::types::Address::ZERO,
269+
}),
270+
logs,
271+
tx,
272+
)
273+
}
274+
275+
#[test]
276+
fn test_scan_transactions_tx() {
277+
let mut logs = vec![];
278+
let v0 = input_handle(&mut logs, None);
279+
let v1 = op(v0, &mut logs, None);
280+
let (handle_to_tx, txs) = scan_transactions(&mut logs);
281+
assert_eq!(txs.len(), 1);
282+
assert_eq!(handle_to_tx.len(), 2);
283+
284+
let v2 = input_handle(&mut logs, TransactionHash::with_last_byte(1));
285+
let (handle_to_tx, txs) = scan_transactions(&mut logs);
286+
assert_eq!(txs.len(), 2);
287+
assert_eq!(handle_to_tx.len(), 3);
288+
}
289+
}

coprocessor/fhevm-engine/host-listener/src/database/ingest.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use tracing::{error, info};
99

1010
use crate::cmd::block_history::BlockSummary;
1111
use crate::contracts::{AclContract, TfheContract};
12+
use crate::database::dependence_chains::dependence_chains;
1213
use crate::database::tfhe_event_propagate::{
1314
acl_result_handles, tfhe_result_handle, Database, LogTfhe,
1415
};
@@ -100,9 +101,11 @@ pub async fn ingest_block_logs(
100101
let log = LogTfhe {
101102
event,
102103
transaction_hash: log.transaction_hash,
103-
is_allowed: false, // updated in the next loop
104104
block_number,
105105
block_timestamp,
106+
// updated in the next loop and dependence_chains
107+
is_allowed: false,
108+
dependence_chain: Default::default(),
106109
};
107110
tfhe_event_log.push(log);
108111
continue;
@@ -119,18 +122,19 @@ pub async fn ingest_block_logs(
119122
);
120123
}
121124
}
122-
123-
for tfhe_log in tfhe_event_log {
124-
let is_allowed =
125+
for tfhe_log in tfhe_event_log.iter_mut() {
126+
tfhe_log.is_allowed =
125127
if let Some(result_handle) = tfhe_result_handle(&tfhe_log.event) {
126128
is_allowed.contains(&result_handle.to_vec())
127129
} else {
128130
false
129131
};
130-
let tfhe_log = LogTfhe {
131-
is_allowed,
132-
..tfhe_log
133-
};
132+
}
133+
134+
let chains =
135+
dependence_chains(&mut tfhe_event_log, &db.dependence_chain).await;
136+
137+
for tfhe_log in tfhe_event_log {
134138
let inserted = db.insert_tfhe_event(&mut tx, &tfhe_log).await?;
135139
if block_logs.catchup && inserted {
136140
info!(tfhe_log = ?tfhe_log, "TFHE event missed before");
@@ -147,5 +151,6 @@ pub async fn ingest_block_logs(
147151
}
148152

149153
db.mark_block_as_valid(&mut tx, &block_logs.summary).await?;
154+
db.update_dependence_chain(&mut tx, chains).await?;
150155
tx.commit().await
151156
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1+
pub mod dependence_chains;
12
pub mod ingest;
23
pub mod tfhe_event_propagate;

0 commit comments

Comments
 (0)