1 change: 1 addition & 0 deletions agent/flow-trace/00_INDEX.md
@@ -191,3 +191,4 @@ _Found during source-code cross-referencing of these trace documents._
| 4 | **`activate` CLI command is misleading** | Low | Named "activate" but actually calls "register" — will fail for already-registered operators. There's no standalone way to trigger re-evaluation of active status; instead, `_updateOperatorStatus()` runs automatically inside `addTicketBalance()`, `bondLicense()`, etc. |
| 5 | **Active-job load balancing bug fixed** | Info | The Rust `NodeStateStore.available_tickets()` subtracts `active_jobs` from total tickets, reducing the chance of busy nodes being selected for new E3s. Previously, the `Sortition` actor's `Handler<EnclaveEvent>` was missing match arms for `E3Failed` and `E3StageChanged`, causing these events to fall to the default `_ => ()` — the typed handlers for decrementing jobs were dead code. This has been fixed: E3Failed and E3StageChanged are now routed to their handlers, and `finalized_committees` is cleaned up in `decrement_jobs_for_e3` to prevent unbounded memory growth. |
| 6 | **Committee member expulsion** | Info | `SlashingManager` can call `expelCommitteeMember()` mid-DKG. The `Sortition` actor enriches the raw `CommitteeMemberExpelled` event with the expelled member's `party_id` (resolved from its stored `Committee` list) and re-publishes it. `ThresholdKeyshare` then uses the enriched `party_id` to update its collectors, potentially completing DKG with fewer parties. `ThresholdKeyshare` itself does not hold committee state. |
| 7 | **NodeProofAggregator stall bridge fixed** | Info | `NodeProofAggregator` no longer drops `DKGInnerProofReady` events that arrive before `ThresholdSharePending`; it prebuffers them until collection state exists. It also converts `NodeDkgFold` `ComputeRequestError` into `DKGRecursiveAggregationComplete { aggregated_proof: None }` instead of silently discarding actor state, preventing DKG proof aggregation stalls when fold workers fail or events arrive slightly out of order. |
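The ticket arithmetic in row 5 can be sketched as follows. `NodeState` here is a hypothetical stand-in for an entry in the Rust `NodeStateStore`, not the actual type; only the subtraction rule is taken from the row above:

```rust
// Hypothetical sketch of row 5's load-balancing rule: a node's sortition
// weight is its ticket balance minus the jobs it is already running, so a
// saturated node stops winning new E3s.
struct NodeState {
    tickets: u64,
    active_jobs: u64,
}

impl NodeState {
    fn available_tickets(&self) -> u64 {
        // saturating_sub pins the weight at zero if active_jobs ever
        // exceeds the ticket balance, rather than underflowing.
        self.tickets.saturating_sub(self.active_jobs)
    }
}

fn main() {
    let idle = NodeState { tickets: 10, active_jobs: 0 };
    let busy = NodeState { tickets: 10, active_jobs: 10 };
    assert_eq!(idle.available_tickets(), 10);
    assert_eq!(busy.available_tickets(), 0);
}
```

With this rule, the row-5 fix matters: if `E3Failed`/`E3StageChanged` never reach the handlers that decrement `active_jobs`, a node's available tickets shrink permanently.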
12 changes: 9 additions & 3 deletions agent/flow-trace/04_DKG_AND_COMPUTATION.md
@@ -277,9 +277,15 @@ The per-circuit `wrapper/` Noir step was removed; aggregator response structs no
implements `ZkRequest::NodeDkgFold` (full per-node pipeline to a `NodeFold` proof),
`ZkRequest::DkgAggregation` (`NodesFold` + C5 + `DkgAggregator`), and
`ZkRequest::DecryptionAggregation` (per-ciphertext `C6Fold` + C7 + `DecryptionAggregator`).
`NodeProofAggregator` buffers all `DKGInnerProofReady` proofs then issues one `NodeDkgFold` request;
`PublicKeyAggregator` and `ThresholdPlaintextAggregator` dispatch the aggregator requests instead of
pairwise folding.
`NodeProofAggregator` prebuffers `DKGInnerProofReady` proofs that arrive before
`ThresholdSharePending`, drains those buffered proofs into collection state once
`ThresholdSharePending` arrives, and issues one `NodeDkgFold` request when the
full ordered proof set is available. If that `NodeDkgFold` compute request fails,
it publishes `DKGRecursiveAggregationComplete { aggregated_proof: None }` so the
downstream DKG/public-key aggregation path can terminate deterministically instead
of stalling on missing node-fold output. `PublicKeyAggregator` and
`ThresholdPlaintextAggregator` dispatch the aggregator requests instead of pairwise
folding.
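The prebuffer-then-drain flow described above can be modeled minimally as follows. This is a hypothetical sketch, not the real actor: `Aggregator`, string E3 ids, and byte-vector proofs stand in for `NodeProofAggregator`, `E3id`, and `Proof`:

```rust
use std::collections::{BTreeMap, HashMap};

// Hypothetical model of the ordering tolerance described above: inner
// proofs that arrive before collection state exists are parked in
// `pending`, then drained into the collection buffer once it is created.
#[derive(Default)]
struct Aggregator {
    pending: HashMap<String, BTreeMap<usize, Vec<u8>>>,
    states: HashMap<String, BTreeMap<usize, Vec<u8>>>,
}

impl Aggregator {
    fn on_inner_proof(&mut self, e3: &str, seq: usize, proof: Vec<u8>) {
        if let Some(buf) = self.states.get_mut(e3) {
            // Normal path: collection state already exists.
            buf.insert(seq, proof);
        } else {
            // Early arrival: prebuffer instead of dropping the proof.
            self.pending.entry(e3.to_string()).or_default().insert(seq, proof);
        }
    }

    fn on_share_pending(&mut self, e3: &str) {
        // Drain any prebuffered proofs into the new collection state.
        let buf = self.pending.remove(e3).unwrap_or_default();
        self.states.insert(e3.to_string(), buf);
    }
}

fn main() {
    let mut agg = Aggregator::default();
    agg.on_inner_proof("e3-1", 0, vec![0xAA]); // arrives early
    agg.on_share_pending("e3-1");              // collection starts
    assert_eq!(agg.states["e3-1"].len(), 1);
    assert!(agg.pending.is_empty());
}
```

The `BTreeMap` keyed by `seq` mirrors the deterministic ordering requirement: draining yields proofs in sequence order regardless of arrival order.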

### Step 6: Collect All Threshold Shares (with C2/C3 Verification)

5 changes: 3 additions & 2 deletions crates/events/src/enclave_event/dkg_inner_proof_ready.rs
@@ -17,8 +17,9 @@ use serde::{Deserialize, Serialize};
/// Emitted for every inner circuit (C0–C4) when available. `seq` gives the deterministic ordering.
///
/// The total count of expected proofs is communicated separately via
/// [`ThresholdSharePending`], which is always published before the first
/// `DKGInnerProofReady` for a given E3.
/// [`ThresholdSharePending`]. The normal flow publishes that event first, but
/// [`NodeProofAggregator`] also tolerates earlier `DKGInnerProofReady` arrivals
/// by prebuffering them until collection state exists.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DKGInnerProofReady {
pub e3_id: E3id,
283 changes: 268 additions & 15 deletions crates/zk-prover/src/actors/node_proof_aggregator.rs
@@ -46,6 +46,7 @@ pub struct NodeProofAggregator {
bus: BusHandle,
states: HashMap<E3id, DkgProofCollectionState>,
fold_correlation: HashMap<CorrelationId, E3id>,
pending_inner_proofs: HashMap<E3id, BTreeMap<usize, Proof>>,
}

impl NodeProofAggregator {
@@ -54,6 +55,7 @@ impl NodeProofAggregator {
bus: bus.clone(),
states: HashMap::new(),
fold_correlation: HashMap::new(),
pending_inner_proofs: HashMap::new(),
}
}

@@ -71,6 +73,7 @@ let e3_id = msg.e3_id.clone();
let e3_id = msg.e3_id.clone();

if !msg.proof_aggregation_enabled {
self.pending_inner_proofs.remove(&e3_id);
info!(
"NodeProofAggregator: proof aggregation disabled for E3 {} — skipping",
e3_id
@@ -101,6 +104,7 @@ (n, threshold_params.moduli().len())
(n, threshold_params.moduli().len())
}
Err(e) => {
self.pending_inner_proofs.remove(&e3_id);
error!(
"NodeProofAggregator: build_pair_for_preset failed for E3 {}: {e}",
e3_id
@@ -126,25 +130,21 @@ e3_id, meta.party_id, total_expected,
e3_id, meta.party_id, total_expected,
);

self.states.insert(
e3_id.clone(),
DkgProofCollectionState {
meta,
buffer: BTreeMap::new(),
fold_correlation: None,
last_ec: ec,
},
);
self.initialize_collection_state(e3_id, meta, ec);
}

fn handle_inner_proof_ready(&mut self, msg: TypedEvent<DKGInnerProofReady>) {
let (msg, ec) = msg.into_components();
let e3_id = msg.e3_id.clone();

let Some(state) = self.states.get_mut(&e3_id) else {
error!(
"NodeProofAggregator: received DKGInnerProofReady for E3 {} before ThresholdSharePending — proof dropped",
e3_id
let pending = self.pending_inner_proofs.entry(e3_id.clone()).or_default();
pending.insert(msg.seq, msg.proof);
warn!(
"NodeProofAggregator: received DKGInnerProofReady for E3 {} before ThresholdSharePending — prebuffered seq={} (have {})",
e3_id,
msg.seq,
pending.len()
);
return;
};
@@ -171,6 +171,34 @@ self.try_dispatch_node_dkg_fold(&e3_id);
self.try_dispatch_node_dkg_fold(&e3_id);
}

fn initialize_collection_state(
&mut self,
e3_id: E3id,
meta: NodeDkgFoldMeta,
ec: EventContext<Sequenced>,
) {
let mut buffer = self.pending_inner_proofs.remove(&e3_id).unwrap_or_default();
if !buffer.is_empty() {
info!(
"NodeProofAggregator: recovered {} prebuffered inner proofs for E3 {}",
buffer.len(),
e3_id
);
}

self.states.insert(
e3_id.clone(),
DkgProofCollectionState {
meta,
buffer: std::mem::take(&mut buffer),
fold_correlation: None,
last_ec: ec,
},
);

self.try_dispatch_node_dkg_fold(&e3_id);
}

fn try_dispatch_node_dkg_fold(&mut self, e3_id: &E3id) {
let state = match self.states.get_mut(e3_id) {
Some(s) => s,
@@ -381,18 +409,243 @@ }
}

fn handle_compute_request_error(&mut self, msg: TypedEvent<ComputeRequestError>) {
let (msg, _ec) = msg.into_components();
let (msg, ec) = msg.into_components();
if let Some(e3_id) = self.fold_correlation.remove(msg.correlation_id()) {
error!(
"NodeProofAggregator: NodeDkgFold failed for E3 {}: {:?} — aggregation aborted",
e3_id,
msg.get_err()
);
self.states.remove(&e3_id);
let state = self.states.remove(&e3_id);
warn!(
"NodeProofAggregator: E3 {} aggregation state discarded due to error",
"NodeProofAggregator: E3 {} NodeDkgFold failed — publishing DKGRecursiveAggregationComplete(None)",
e3_id
);

if let Some(state) = state {
if let Err(err) = self.bus.publish(
DKGRecursiveAggregationComplete {
e3_id: e3_id.clone(),
party_id: state.meta.party_id,
aggregated_proof: None,
},
ec,
) {
error!(
"NodeProofAggregator: failed to publish DKGRecursiveAggregationComplete(None) for E3 {}: {err}",
e3_id
);
}
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use anyhow::Result;
use e3_events::{
CircuitName, ComputeRequestErrorKind, ComputeRequestKind, Event, HistoryCollector,
TakeEvents, Unsequenced, ZkError,
};
use e3_test_helpers::get_common_setup;

fn test_ctx(data: impl Into<EnclaveEventData>) -> EventContext<Sequenced> {
EventContext::<Unsequenced>::from(data.into()).sequence(0)
}

fn dummy_proof(seed: u8) -> Proof {
Proof::new(
CircuitName::PkAggregation,
e3_utils::ArcBytes::from_bytes(&[seed]),
e3_utils::ArcBytes::from_bytes(&[seed.wrapping_add(1)]),
)
}

async fn next_event(history: &Addr<HistoryCollector<EnclaveEvent>>) -> Result<EnclaveEvent> {
let mut result = history.send(TakeEvents::<EnclaveEvent>::new(1)).await?;
assert!(!result.timed_out, "timed out waiting for an event");
Ok(result.events.pop().expect("expected one event"))
}

#[actix::test]
async fn node_dkg_fold_compute_error_emits_none_aggregation_result() -> Result<()> {
let (bus, _rng, _seed, _params, _crp, _errors, history) = get_common_setup(None)?;
let mut aggregator = NodeProofAggregator::new(&bus);
let e3_id = E3id::new("42", 1);
let correlation_id = CorrelationId::new();

aggregator.states.insert(
e3_id.clone(),
DkgProofCollectionState {
meta: NodeDkgFoldMeta {
party_id: 7,
total_expected: 0,
sk_enc_count: 0,
e_sm_enc_count: 0,
sk_share_encryption_requests: Vec::new(),
e_sm_share_encryption_requests: Vec::new(),
committee_n: 0,
n_moduli: 0,
params_preset: e3_fhe_params::BfvPreset::InsecureThreshold512,
},
buffer: BTreeMap::new(),
fold_correlation: Some(correlation_id),
last_ec: test_ctx(DKGRecursiveAggregationComplete {
e3_id: e3_id.clone(),
party_id: 7,
aggregated_proof: None,
}),
},
);
aggregator
.fold_correlation
.insert(correlation_id, e3_id.clone());

let request = ComputeRequest::zk(
ZkRequest::NodeDkgFold(NodeDkgFoldRequest {
c0_proof: dummy_proof(1),
c1_proof: dummy_proof(2),
c2a_proof: dummy_proof(3),
c2b_proof: dummy_proof(4),
c3a_inner_proofs: Vec::new(),
c3b_inner_proofs: Vec::new(),
c4a_proof: dummy_proof(5),
c4b_proof: dummy_proof(6),
c3_slot_indices_a: Vec::new(),
c3_slot_indices_b: Vec::new(),
c3_total_slots: 0,
party_id: 7,
params_preset: e3_fhe_params::BfvPreset::InsecureThreshold512,
}),
correlation_id,
e3_id.clone(),
);

aggregator.handle_compute_request_error(TypedEvent::new(
ComputeRequestError::new(
ComputeRequestErrorKind::Zk(ZkError::ProofGenerationFailed("boom".to_string())),
request,
),
test_ctx(DKGRecursiveAggregationComplete {
e3_id: e3_id.clone(),
party_id: 7,
aggregated_proof: None,
}),
));

let event = next_event(&history).await?;
assert!(matches!(
event.into_data(),
EnclaveEventData::DKGRecursiveAggregationComplete(data)
if data.e3_id == e3_id
&& data.party_id == 7
&& data.aggregated_proof.is_none()
));
assert!(!aggregator.states.contains_key(&e3_id));
assert!(aggregator.fold_correlation.is_empty());

Ok(())
}

#[actix::test]
async fn early_inner_proof_is_prebuffered_until_collection_starts() -> Result<()> {
let (bus, _rng, _seed, _params, _crp, _errors, history) = get_common_setup(None)?;
let mut aggregator = NodeProofAggregator::new(&bus);
let e3_id = E3id::new("43", 1);
let early_proof = dummy_proof(10);

aggregator.handle_inner_proof_ready(TypedEvent::new(
DKGInnerProofReady {
e3_id: e3_id.clone(),
party_id: 7,
proof: early_proof.clone(),
seq: 0,
},
test_ctx(DKGInnerProofReady {
e3_id: e3_id.clone(),
party_id: 7,
proof: early_proof.clone(),
seq: 0,
}),
));

assert_eq!(
aggregator
.pending_inner_proofs
.get(&e3_id)
.map(BTreeMap::len),
Some(1)
);

aggregator.initialize_collection_state(
e3_id.clone(),
NodeDkgFoldMeta {
party_id: 7,
total_expected: 6,
sk_enc_count: 0,
e_sm_enc_count: 0,
sk_share_encryption_requests: Vec::new(),
e_sm_share_encryption_requests: Vec::new(),
committee_n: 0,
n_moduli: 0,
params_preset: e3_fhe_params::BfvPreset::InsecureThreshold512,
},
test_ctx(DKGRecursiveAggregationComplete {
e3_id: e3_id.clone(),
party_id: 7,
aggregated_proof: None,
}),
);

assert!(!aggregator.pending_inner_proofs.contains_key(&e3_id));
assert_eq!(
aggregator
.states
.get(&e3_id)
.map(|state| state.buffer.len()),
Some(1)
);

for seq in 1..6 {
let proof = dummy_proof((10 + seq) as u8);
aggregator.handle_inner_proof_ready(TypedEvent::new(
DKGInnerProofReady {
e3_id: e3_id.clone(),
party_id: 7,
proof: proof.clone(),
seq,
},
test_ctx(DKGInnerProofReady {
e3_id: e3_id.clone(),
party_id: 7,
proof,
seq,
}),
));
}

let event = next_event(&history).await?;
match event.into_data() {
EnclaveEventData::ComputeRequest(request) => {
assert_eq!(request.e3_id, e3_id);
match request.request {
ComputeRequestKind::Zk(ZkRequest::NodeDkgFold(fold_request)) => {
assert_eq!(fold_request.c0_proof, early_proof);
}
other => panic!("expected NodeDkgFold request, got {other:?}"),
}
}
other => panic!("expected ComputeRequest event, got {other:?}"),
}

assert!(aggregator
.states
.get(&e3_id)
.and_then(|state| state.fold_correlation)
.is_some());

Ok(())
}
}