diff --git a/agent/flow-trace/00_INDEX.md b/agent/flow-trace/00_INDEX.md
index d7299a280..b0aa8fddd 100644
--- a/agent/flow-trace/00_INDEX.md
+++ b/agent/flow-trace/00_INDEX.md
@@ -191,3 +191,4 @@ _Found during source-code cross-referencing of these trace documents._
 | 4 | **`activate` CLI command is misleading** | Low | Named "activate" but actually calls "register" — will fail for already-registered operators. There's no standalone way to trigger re-evaluation of active status; instead, `_updateOperatorStatus()` runs automatically inside `addTicketBalance()`, `bondLicense()`, etc. |
 | 5 | **Active-job load balancing bug fixed** | Info | The Rust `NodeStateStore.available_tickets()` subtracts `active_jobs` from total tickets, reducing the chance of busy nodes being selected for new E3s. Previously, the `Sortition` actor's `Handler` was missing match arms for `E3Failed` and `E3StageChanged`, causing these events to fall to the default `_ => ()` — the typed handlers for decrementing jobs were dead code. This has been fixed: E3Failed and E3StageChanged are now routed to their handlers, and `finalized_committees` is cleaned up in `decrement_jobs_for_e3` to prevent unbounded memory growth. |
 | 6 | **Committee member expulsion** | Info | `SlashingManager` can call `expelCommitteeMember()` mid-DKG. The `Sortition` actor enriches the raw `CommitteeMemberExpelled` event with the expelled member's `party_id` (resolved from its stored `Committee` list) and re-publishes it. `ThresholdKeyshare` then uses the enriched `party_id` to update its collectors, potentially completing DKG with fewer parties. `ThresholdKeyshare` itself does not hold committee state. |
+| 7 | **NodeProofAggregator stall bugs fixed** | Info | `NodeProofAggregator` no longer drops `DKGInnerProofReady` events that arrive before `ThresholdSharePending`; it prebuffers them until collection state exists. It also converts a `NodeDkgFold` `ComputeRequestError` into `DKGRecursiveAggregationComplete { aggregated_proof: None }` instead of silently discarding actor state, preventing DKG proof aggregation stalls when fold workers fail or events arrive slightly out of order. |
diff --git a/agent/flow-trace/04_DKG_AND_COMPUTATION.md b/agent/flow-trace/04_DKG_AND_COMPUTATION.md
index f4cc411a4..3aafc938c 100644
--- a/agent/flow-trace/04_DKG_AND_COMPUTATION.md
+++ b/agent/flow-trace/04_DKG_AND_COMPUTATION.md
@@ -277,9 +277,51 @@ The per-circuit `wrapper/` Noir step was removed; aggregator response structs no
 implements `ZkRequest::NodeDkgFold` (full per-node pipeline to a `NodeFold` proof),
 `ZkRequest::DkgAggregation` (`NodesFold` + C5 + `DkgAggregator`), and
 `ZkRequest::DecryptionAggregation` (per-ciphertext `C6Fold` + C7 + `DecryptionAggregator`).
-`NodeProofAggregator` buffers all `DKGInnerProofReady` proofs then issues one `NodeDkgFold` request;
-`PublicKeyAggregator` and `ThresholdPlaintextAggregator` dispatch the aggregator requests instead of
-pairwise folding.
+`NodeProofAggregator` prebuffers `DKGInnerProofReady` proofs that arrive before
+`ThresholdSharePending`, drains those buffered proofs into collection state once
+`ThresholdSharePending` arrives, and issues one `NodeDkgFold` request when the
+full ordered proof set is available. If that `NodeDkgFold` compute request fails,
+it publishes `DKGRecursiveAggregationComplete { aggregated_proof: None }` so the
+downstream DKG/public-key aggregation path can terminate deterministically instead
+of stalling on missing node-fold output.
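+
+A minimal sketch of this ordering-tolerant buffering, shown with hypothetical
+stand-in types rather than the actual actor code:
+
+```rust
+use std::collections::{BTreeMap, HashMap};
+
+// Stand-ins for the real `E3id` and proof types (illustration only).
+type E3 = u64;
+type Proof = Vec<u8>;
+
+#[derive(Default)]
+struct Sketch {
+    /// Proofs that arrived before any collection state existed, keyed by seq.
+    pending: HashMap<E3, BTreeMap<u64, Proof>>,
+    /// Live collection buffers, created once `ThresholdSharePending` arrives.
+    collecting: HashMap<E3, BTreeMap<u64, Proof>>,
+}
+
+impl Sketch {
+    /// `DKGInnerProofReady`: append to live state if it exists, else prebuffer.
+    fn on_inner_proof(&mut self, e3: E3, seq: u64, proof: Proof) {
+        match self.collecting.get_mut(&e3) {
+            Some(buf) => buf.insert(seq, proof),
+            None => self.pending.entry(e3).or_default().insert(seq, proof),
+        };
+    }
+
+    /// `ThresholdSharePending`: drain any prebuffered proofs into live state.
+    fn on_share_pending(&mut self, e3: E3) {
+        let drained = self.pending.remove(&e3).unwrap_or_default();
+        self.collecting.insert(e3, drained);
+    }
+}
+```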
+
+`PublicKeyAggregator` and `ThresholdPlaintextAggregator` dispatch the aggregator
+requests instead of pairwise folding.
 
 ### Step 6: Collect All Threshold Shares (with C2/C3 Verification)
 
diff --git a/crates/events/src/enclave_event/dkg_inner_proof_ready.rs b/crates/events/src/enclave_event/dkg_inner_proof_ready.rs
index 1a1e32480..867da5357 100644
--- a/crates/events/src/enclave_event/dkg_inner_proof_ready.rs
+++ b/crates/events/src/enclave_event/dkg_inner_proof_ready.rs
@@ -17,8 +17,9 @@ use serde::{Deserialize, Serialize};
 /// Emitted for every inner circuit (C0–C4) when available. `seq` gives the deterministic ordering.
 ///
 /// The total count of expected proofs is communicated separately via
-/// [`ThresholdSharePending`], which is always published before the first
-/// `DKGInnerProofReady` for a given E3.
+/// [`ThresholdSharePending`]. The normal flow publishes that event first, but
+/// [`NodeProofAggregator`] also tolerates earlier `DKGInnerProofReady` arrivals
+/// by prebuffering them until collection state exists.
 #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub struct DKGInnerProofReady {
     pub e3_id: E3id,
diff --git a/crates/zk-prover/src/actors/node_proof_aggregator.rs b/crates/zk-prover/src/actors/node_proof_aggregator.rs
index e4d38e444..8fed4f778 100644
--- a/crates/zk-prover/src/actors/node_proof_aggregator.rs
+++ b/crates/zk-prover/src/actors/node_proof_aggregator.rs
@@ -46,6 +46,7 @@ pub struct NodeProofAggregator {
     bus: BusHandle,
     states: HashMap<E3id, DkgProofCollectionState>,
     fold_correlation: HashMap<CorrelationId, E3id>,
+    pending_inner_proofs: HashMap<E3id, BTreeMap<u64, Proof>>,
 }
 
 impl NodeProofAggregator {
@@ -54,6 +55,7 @@
             bus: bus.clone(),
             states: HashMap::new(),
             fold_correlation: HashMap::new(),
+            pending_inner_proofs: HashMap::new(),
         }
     }
 
@@ -71,6 +73,7 @@
         let e3_id = msg.e3_id.clone();
 
         if !msg.proof_aggregation_enabled {
+            self.pending_inner_proofs.remove(&e3_id);
             info!(
                 "NodeProofAggregator: proof aggregation disabled for E3 {} — skipping",
                 e3_id
@@ -101,6 +104,7 @@
                 (n, threshold_params.moduli().len())
             }
             Err(e) => {
+                self.pending_inner_proofs.remove(&e3_id);
                 error!(
                     "NodeProofAggregator: build_pair_for_preset failed for E3 {}: {e}",
                     e3_id
@@ -126,15 +130,7 @@
             e3_id, meta.party_id, total_expected,
         );
 
-        self.states.insert(
-            e3_id.clone(),
-            DkgProofCollectionState {
-                meta,
-                buffer: BTreeMap::new(),
-                fold_correlation: None,
-                last_ec: ec,
-            },
-        );
+        self.initialize_collection_state(e3_id, meta, ec);
     }
 
     fn handle_inner_proof_ready(&mut self, msg: TypedEvent<DKGInnerProofReady>) {
@@ -142,9 +138,13 @@
         let e3_id = msg.e3_id.clone();
 
         let Some(state) = self.states.get_mut(&e3_id) else {
-            error!(
-                "NodeProofAggregator: received DKGInnerProofReady for E3 {} before ThresholdSharePending — proof dropped",
-                e3_id
+            let pending = self.pending_inner_proofs.entry(e3_id.clone()).or_default();
+            pending.insert(msg.seq, msg.proof);
+            warn!(
+                "NodeProofAggregator: received DKGInnerProofReady for E3 {} before ThresholdSharePending — prebuffered seq={} (have {})",
+                e3_id,
+                msg.seq,
+                pending.len()
             );
             return;
         };
@@ -171,6 +171,34 @@
         self.try_dispatch_node_dkg_fold(&e3_id);
     }
 
+    fn initialize_collection_state(
+        &mut self,
+        e3_id: E3id,
+        meta: NodeDkgFoldMeta,
+        ec: EventContext,
+    ) {
+        let mut buffer = self.pending_inner_proofs.remove(&e3_id).unwrap_or_default();
+        if !buffer.is_empty() {
+            info!(
+                "NodeProofAggregator: recovered {} prebuffered inner proofs for E3 {}",
+                buffer.len(),
+                e3_id
+            );
+        }
+
+        self.states.insert(
+            e3_id.clone(),
+            DkgProofCollectionState {
+                meta,
+                buffer: std::mem::take(&mut buffer),
+                fold_correlation: None,
+                last_ec: ec,
+            },
+        );
+
+        self.try_dispatch_node_dkg_fold(&e3_id);
+    }
+
     fn try_dispatch_node_dkg_fold(&mut self, e3_id: &E3id) {
         let state = match self.states.get_mut(e3_id) {
             Some(s) => s,
@@ -381,18 +409,243 @@
     }
 
     fn handle_compute_request_error(&mut self, msg: TypedEvent<ComputeRequestError>) {
-        let (msg, _ec) = msg.into_components();
+        let (msg, ec) = msg.into_components();
 
         if let Some(e3_id) = self.fold_correlation.remove(msg.correlation_id()) {
             error!(
                 "NodeProofAggregator: NodeDkgFold failed for E3 {}: {:?} — aggregation aborted",
                 e3_id, msg.get_err()
             );
-            self.states.remove(&e3_id);
+            let state = self.states.remove(&e3_id);
             warn!(
-                "NodeProofAggregator: E3 {} aggregation state discarded due to error",
+                "NodeProofAggregator: E3 {} NodeDkgFold failed — publishing DKGRecursiveAggregationComplete(None)",
                 e3_id
             );
+
+            if let Some(state) = state {
+                if let Err(err) = self.bus.publish(
+                    DKGRecursiveAggregationComplete {
+                        e3_id: e3_id.clone(),
+                        party_id: state.meta.party_id,
+                        aggregated_proof: None,
+                    },
+                    ec,
+                ) {
+                    error!(
+                        "NodeProofAggregator: failed to publish DKGRecursiveAggregationComplete(None) for E3 {}: {err}",
+                        e3_id
+                    );
+                }
+            }
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use anyhow::Result;
+    use e3_events::{
+        CircuitName, ComputeRequestErrorKind, ComputeRequestKind, Event, HistoryCollector,
+        TakeEvents, Unsequenced, ZkError,
+    };
+    use e3_test_helpers::get_common_setup;
+
+    fn test_ctx(data: impl Into<EnclaveEventData>) -> EventContext {
+        EventContext::<Unsequenced>::from(data.into()).sequence(0)
+    }
+
+    fn dummy_proof(seed: u8) -> Proof {
+        Proof::new(
+            CircuitName::PkAggregation,
+            e3_utils::ArcBytes::from_bytes(&[seed]),
+            e3_utils::ArcBytes::from_bytes(&[seed.wrapping_add(1)]),
+        )
+    }
+
+    async fn next_event(history: &Addr<HistoryCollector<Event>>) -> Result<Event> {
+        let mut result = history.send(TakeEvents::<Event>::new(1)).await?;
+        assert!(!result.timed_out, "timed out waiting for an event");
+        Ok(result.events.pop().expect("expected one event"))
+    }
+
+    #[actix::test]
+    async fn node_dkg_fold_compute_error_emits_none_aggregation_result() -> Result<()> {
+        let (bus, _rng, _seed, _params, _crp, _errors, history) = get_common_setup(None)?;
+        let mut aggregator = NodeProofAggregator::new(&bus);
+        let e3_id = E3id::new("42", 1);
+        let correlation_id = CorrelationId::new();
+
+        aggregator.states.insert(
+            e3_id.clone(),
+            DkgProofCollectionState {
+                meta: NodeDkgFoldMeta {
+                    party_id: 7,
+                    total_expected: 0,
+                    sk_enc_count: 0,
+                    e_sm_enc_count: 0,
+                    sk_share_encryption_requests: Vec::new(),
+                    e_sm_share_encryption_requests: Vec::new(),
+                    committee_n: 0,
+                    n_moduli: 0,
+                    params_preset: e3_fhe_params::BfvPreset::InsecureThreshold512,
+                },
+                buffer: BTreeMap::new(),
+                fold_correlation: Some(correlation_id),
+                last_ec: test_ctx(DKGRecursiveAggregationComplete {
+                    e3_id: e3_id.clone(),
+                    party_id: 7,
+                    aggregated_proof: None,
+                }),
+            },
+        );
+        aggregator
+            .fold_correlation
+            .insert(correlation_id, e3_id.clone());
+
+        let request = ComputeRequest::zk(
+            ZkRequest::NodeDkgFold(NodeDkgFoldRequest {
+                c0_proof: dummy_proof(1),
+                c1_proof: dummy_proof(2),
+                c2a_proof: dummy_proof(3),
+                c2b_proof: dummy_proof(4),
+                c3a_inner_proofs: Vec::new(),
+                c3b_inner_proofs: Vec::new(),
+                c4a_proof: dummy_proof(5),
+                c4b_proof: dummy_proof(6),
+                c3_slot_indices_a: Vec::new(),
+                c3_slot_indices_b: Vec::new(),
+                c3_total_slots: 0,
+                party_id: 7,
+                params_preset: e3_fhe_params::BfvPreset::InsecureThreshold512,
+            }),
+            correlation_id,
+            e3_id.clone(),
+        );
+
+        aggregator.handle_compute_request_error(TypedEvent::new(
+            ComputeRequestError::new(
+                ComputeRequestErrorKind::Zk(ZkError::ProofGenerationFailed("boom".to_string())),
+                request,
+            ),
+            test_ctx(DKGRecursiveAggregationComplete {
+                e3_id: e3_id.clone(),
+                party_id: 7,
+                aggregated_proof: None,
+            }),
+        ));
+
+        let event = next_event(&history).await?;
+        assert!(matches!(
+            event.into_data(),
+            EnclaveEventData::DKGRecursiveAggregationComplete(data)
+                if data.e3_id == e3_id
+                    && data.party_id == 7
+                    && data.aggregated_proof.is_none()
+        ));
+        assert!(!aggregator.states.contains_key(&e3_id));
+        assert!(aggregator.fold_correlation.is_empty());
+
+        Ok(())
+    }
+
+    #[actix::test]
+    async fn early_inner_proof_is_prebuffered_until_collection_starts() -> Result<()> {
+        let (bus, _rng, _seed, _params, _crp, _errors, history) = get_common_setup(None)?;
+        let mut aggregator = NodeProofAggregator::new(&bus);
+        let e3_id = E3id::new("43", 1);
+        let early_proof = dummy_proof(10);
+
+        aggregator.handle_inner_proof_ready(TypedEvent::new(
+            DKGInnerProofReady {
+                e3_id: e3_id.clone(),
+                party_id: 7,
+                proof: early_proof.clone(),
+                seq: 0,
+            },
+            test_ctx(DKGInnerProofReady {
+                e3_id: e3_id.clone(),
+                party_id: 7,
+                proof: early_proof.clone(),
+                seq: 0,
+            }),
+        ));
+
+        assert_eq!(
+            aggregator
+                .pending_inner_proofs
+                .get(&e3_id)
+                .map(BTreeMap::len),
+            Some(1)
+        );
+
+        aggregator.initialize_collection_state(
+            e3_id.clone(),
+            NodeDkgFoldMeta {
+                party_id: 7,
+                total_expected: 6,
+                sk_enc_count: 0,
+                e_sm_enc_count: 0,
+                sk_share_encryption_requests: Vec::new(),
+                e_sm_share_encryption_requests: Vec::new(),
+                committee_n: 0,
+                n_moduli: 0,
+                params_preset: e3_fhe_params::BfvPreset::InsecureThreshold512,
+            },
+            test_ctx(DKGRecursiveAggregationComplete {
+                e3_id: e3_id.clone(),
+                party_id: 7,
+                aggregated_proof: None,
+            }),
+        );
+
+        assert!(!aggregator.pending_inner_proofs.contains_key(&e3_id));
+        assert_eq!(
+            aggregator
+                .states
+                .get(&e3_id)
+                .map(|state| state.buffer.len()),
+            Some(1)
+        );
+
+        for seq in 1..6 {
+            let proof = dummy_proof((10 + seq) as u8);
+            aggregator.handle_inner_proof_ready(TypedEvent::new(
+                DKGInnerProofReady {
+                    e3_id: e3_id.clone(),
+                    party_id: 7,
+                    proof: proof.clone(),
+                    seq,
+                },
+                test_ctx(DKGInnerProofReady {
+                    e3_id: e3_id.clone(),
+                    party_id: 7,
+                    proof,
+                    seq,
+                }),
+            ));
+        }
+
+        let event = next_event(&history).await?;
+        match event.into_data() {
+            EnclaveEventData::ComputeRequest(request) => {
+                assert_eq!(request.e3_id, e3_id);
+                match request.request {
+                    ComputeRequestKind::Zk(ZkRequest::NodeDkgFold(fold_request)) => {
+                        assert_eq!(fold_request.c0_proof, early_proof);
+                    }
+                    other => panic!("expected NodeDkgFold request, got {other:?}"),
+                }
+            }
+            other => panic!("expected ComputeRequest event, got {other:?}"),
+        }
+
+        assert!(aggregator
+            .states
+            .get(&e3_id)
+            .and_then(|state| state.fold_correlation)
+            .is_some());
+
+        Ok(())
+    }
+}