Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions hyperactor/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1092,9 +1092,9 @@ impl<T: Message> Buffer<T> {
fn send(
&self,
item: (T, PortHandle<Undeliverable<T>>),
) -> Result<(), mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>> {
) -> Result<(), Box<mpsc::error::SendError<(T, PortHandle<Undeliverable<T>>)>>> {
self.seq.fetch_add(1, Ordering::SeqCst);
self.queue.send(item)?;
self.queue.send(item).map_err(Box::new)?;
Ok(())
}
}
Expand Down Expand Up @@ -1224,9 +1224,8 @@ impl MailboxSender for MailboxClient {
return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
) {
tracing::event!(target:"messages", tracing::Level::TRACE, "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.actor_id(), "port"= envelope.dest.index(), "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message");
if let Err(mpsc::error::SendError((envelope, return_handle))) =
self.buffer.send((envelope, return_handle))
{
if let Err(err) = self.buffer.send((envelope, return_handle)) {
let mpsc::error::SendError((envelope, return_handle)) = *err;
let err = DeliveryError::BrokenLink(
"failed to enqueue in MailboxClient; buffer's queue is closed".to_string(),
);
Expand Down
2 changes: 1 addition & 1 deletion hyperactor_mesh/src/host_mesh/host_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ impl HostAgent {

let addr = host.addr().to_string();
let mut children: Vec<hyperactor::introspect::IntrospectRef> = Vec::new();
let system_children: Vec<crate::introspect::NodeRef> = Vec::new();
let system_children: Vec<crate::introspect::NodeRef> = Vec::new(); // LC-2

// Procs are not system — only actors are. Both service and
// local appear as regular children; 's' in the TUI toggles
Expand Down
66 changes: 60 additions & 6 deletions hyperactor_mesh/src/mesh_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,50 @@
//! must correspond to the reference used to resolve it. The
//! display form of `identity` round-trips through `NodeRef::from_str`.
//!
//! - **NI-2 (parent coherence):** A node's `parent: Option<NodeRef>`
//! must equal the `identity` of the node it appears under. If node
//! `P` lists `R` in its `children`, then `R.parent == Some(P.identity)`.
//! - **NI-2 (parent = containment parent):** A node's
//! `parent: Option<NodeRef>` records its canonical containment
//! parent, not the inverse of every navigation edge. Specifically:
//! root → `None`, host → `Root`, proc → `Host(…)`,
//! actor → `Proc(…)`. An actor's parent is always its owning proc,
//! even when the actor also appears as a child of another actor via
//! supervision.
//!
//! - **NI-3 (children = navigation graph):** A node's `children`
//! is the admin navigation graph. Actor-to-actor supervision links
//! coexist with proc→actor membership links without changing
//! `parent`. The same actor may therefore appear in `children` of
//! both its proc and its supervising actor.
//!
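//! Together these ensure that the TUI can correlate responses to tree
//! nodes, that upward navigation via `parent` always follows
//! containment, and that downward navigation via `children` may also
//! follow supervision links without contradicting `parent`.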
//!
//! ## Link-classification invariants (LC-*)
//!
//! These describe which nodes emit `system_children` and
//! `stopped_children` classification sets, and what those sets
//! contain.
//!
//! - **LC-1 (root system_children empty):** Root payloads always
//! emit `system_children: vec![]`. Root children are host nodes,
//! which are not classified as system.
//!
//! - **LC-2 (host system_children empty):** Host payloads always
//! emit `system_children: vec![]`. Host children are procs, which
//! are not classified as system — only actors carry the system
//! classification.
//!
//! - **LC-3 (proc system_children subset):** Proc payloads emit
//! `system_children ⊆ children`, containing only `NodeRef::Actor`
//! refs where `cell.is_system()` is true.
//!
//! - **LC-4 (proc stopped_children subset):** Proc payloads emit
//! `stopped_children ⊆ children`, containing only
//! `NodeRef::Actor` refs for terminated actors retained for
//! post-mortem inspection.
//!
//! - **LC-5 (actor/error no classification sets):** Actor and Error
//! payloads do not carry `system_children` or `stopped_children`.
//!
//! ## Proc-resolution invariants (SP-*)
//!
//! When a proc reference is resolved, the returned `NodePayload`
Expand Down Expand Up @@ -1117,8 +1154,7 @@ impl MeshAdminAgent {
///
/// The root is not a real actor/proc; it's a convenience node
/// that anchors navigation. Its children are `NodeRef::Host`
/// entries for each configured `HostAgent` plus any standalone
/// procs (root client proc, admin proc).
/// entries for each configured `HostAgent`.
fn build_root_payload(&self) -> NodePayload {
use crate::introspect::NodeRef;

Expand All @@ -1127,7 +1163,7 @@ impl MeshAdminAgent {
.values()
.map(|agent| NodeRef::Host(agent.actor_id().clone()))
.collect();
let system_children: Vec<NodeRef> = Vec::new();
let system_children: Vec<NodeRef> = Vec::new(); // LC-1
let mut attrs = hyperactor_config::Attrs::new();
attrs.set(crate::introspect::NODE_TYPE, "root".to_string());
attrs.set(crate::introspect::NUM_HOSTS, self.hosts.len());
Expand Down Expand Up @@ -2792,10 +2828,16 @@ mod tests {
NodeProperties::Root {
num_hosts,
started_by,
system_children,
..
} => {
assert_eq!(*num_hosts, 2);
assert!(!started_by.is_empty());
// LC-1: root system_children is always empty.
assert!(
system_children.is_empty(),
"LC-1: root system_children must be empty"
);
}
other => panic!("expected Root, got {:?}", other),
}
Expand Down Expand Up @@ -2901,6 +2943,18 @@ mod tests {
!host_node.children.is_empty(),
"host should have at least one proc child"
);
// LC-2: host system_children is always empty.
match &host_node.properties {
NodeProperties::Host {
system_children, ..
} => {
assert!(
system_children.is_empty(),
"LC-2: host system_children must be empty"
);
}
other => panic!("expected Host, got {:?}", other),
}

// -- 6. Resolve a system proc child --
let proc_ref = &host_node.children[0];
Expand Down
7 changes: 7 additions & 0 deletions hyperactor_mesh/test/mesh_admin_integration/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ pub(crate) fn pyspy_workload_binary() -> PathBuf {
.to_path_buf()
}

/// Locate the Rust sieve binary through Buck resources.
///
/// # Panics
///
/// Panics if the `monarch/hyperactor_mesh/sieve_rs` resource lookup
/// fails.
pub(crate) fn sieve_rust_binary() -> PathBuf {
    let lookup = buck_resources::get("monarch/hyperactor_mesh/sieve_rs");
    // A missing resource means the test target is misconfigured, so
    // abort loudly instead of returning an error.
    let resolved = lookup.expect("sieve_rust resource not found");
    resolved.to_path_buf()
}

// Workload launch

/// Start a workload binary and wait for the mesh admin server to
Expand Down
19 changes: 19 additions & 0 deletions hyperactor_mesh/test/mesh_admin_integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,16 @@
//! - **MIT-70 (query-malformed-body):** `POST /v1/query` with a
//! malformed JSON body (missing required `sql` field) returns a
//! non-success status.
//!
//! ### Supervision topology (sieve)
//!
//! - **MIT-71 (actor-child-parent-is-proc):** When actor A exposes
//! actor B in `children` (supervision child), resolving B yields
//! `parent = Some(NodeRef::Proc(…))` — the containment proc, not
//! the supervising actor (NI-2).
//! - **MIT-72 (proc-and-actor-children-coexist):** Actor B appears
//! in both the proc's `children` (membership edge) and actor A's
//! `children` (supervision edge) simultaneously (NI-3).

mod admin;
mod auth;
Expand All @@ -290,6 +300,7 @@ mod openapi;
mod pyspy;
mod ref_check;
mod ref_edge;
mod supervision;
mod telemetry;
mod tree;

Expand Down Expand Up @@ -425,3 +436,11 @@ async fn test_no_dashboard_returns_404() {
async fn test_query_malformed_body() {
telemetry::run_query_malformed_body().await;
}

// --- supervision family ---

/// MIT-71, MIT-72: runs the NI-2/NI-3 supervision proof against the
/// Rust sieve binary.
#[tokio::test]
async fn test_supervision_proof_rust() {
    // All assertions live in the `supervision` module; this wrapper only
    // drives the future to completion.
    let proof = supervision::run_supervision_proof_rust();
    proof.await;
}
Loading
Loading