diff --git a/hyperactor/src/mailbox.rs b/hyperactor/src/mailbox.rs index 74b7769c9..4fce9636a 100644 --- a/hyperactor/src/mailbox.rs +++ b/hyperactor/src/mailbox.rs @@ -1092,9 +1092,9 @@ impl Buffer { fn send( &self, item: (T, PortHandle>), - ) -> Result<(), mpsc::error::SendError<(T, PortHandle>)>> { + ) -> Result<(), Box>)>>> { self.seq.fetch_add(1, Ordering::SeqCst); - self.queue.send(item)?; + self.queue.send(item).map_err(Box::new)?; Ok(()) } } @@ -1224,9 +1224,8 @@ impl MailboxSender for MailboxClient { return_handle: PortHandle>, ) { tracing::event!(target:"messages", tracing::Level::TRACE, "size"=envelope.data.len(), "sender"= %envelope.sender, "dest" = %envelope.dest.actor_id(), "port"= envelope.dest.index(), "message_type" = envelope.data.typename().unwrap_or("unknown"), "send_message"); - if let Err(mpsc::error::SendError((envelope, return_handle))) = - self.buffer.send((envelope, return_handle)) - { + if let Err(err) = self.buffer.send((envelope, return_handle)) { + let mpsc::error::SendError((envelope, return_handle)) = *err; let err = DeliveryError::BrokenLink( "failed to enqueue in MailboxClient; buffer's queue is closed".to_string(), ); diff --git a/hyperactor_mesh/src/host_mesh/host_agent.rs b/hyperactor_mesh/src/host_mesh/host_agent.rs index ae8695c5d..6a6bc1131 100644 --- a/hyperactor_mesh/src/host_mesh/host_agent.rs +++ b/hyperactor_mesh/src/host_mesh/host_agent.rs @@ -388,7 +388,7 @@ impl HostAgent { let addr = host.addr().to_string(); let mut children: Vec = Vec::new(); - let system_children: Vec = Vec::new(); + let system_children: Vec = Vec::new(); // LC-2 // Procs are not system — only actors are. Both service and // local appear as regular children; 's' in the TUI toggles diff --git a/hyperactor_mesh/src/mesh_admin.rs b/hyperactor_mesh/src/mesh_admin.rs index c80b85dbd..b79e98984 100644 --- a/hyperactor_mesh/src/mesh_admin.rs +++ b/hyperactor_mesh/src/mesh_admin.rs @@ -153,13 +153,50 @@ //! must correspond to the reference used to resolve it. The //! display form of `identity` round-trips through `NodeRef::from_str`. //! -//! - **NI-2 (parent coherence):** A node's `parent: Option` -//! must equal the `identity` of the node it appears under. If node -//! `P` lists `R` in its `children`, then `R.parent == Some(P.identity)`. +//! - **NI-2 (parent = containment parent):** A node's +//! `parent: Option` records its canonical containment +//! parent, not the inverse of every navigation edge. Specifically: +//! root → `None`, host → `Root`, proc → `Host(…)`, +//! actor → `Proc(…)`. An actor's parent is always its owning proc, +//! even when the actor also appears as a child of another actor via +//! supervision. +//! +//! - **NI-3 (children = navigation graph):** A node's `children` +//! is the admin navigation graph. Actor-to-actor supervision links +//! coexist with proc→actor membership links without changing +//! `parent`. The same actor may therefore appear in `children` of +//! both its proc and its supervising actor. //! //! Together these ensure that the TUI can correlate responses to tree //! nodes, and that upward/downward navigation is consistent. //! +//! ## Link-classification invariants (LC-*) +//! +//! These describe which nodes emit `system_children` and +//! `stopped_children` classification sets, and what those sets +//! contain. +//! +//! - **LC-1 (root system_children empty):** Root payloads always +//! emit `system_children: vec![]`. Root children are host nodes, +//! which are not classified as system. +//! +//! - **LC-2 (host system_children empty):** Host payloads always +//! emit `system_children: vec![]`. Host children are procs, which +//! are not classified as system — only actors carry the system +//! classification. +//! +//! - **LC-3 (proc system_children subset):** Proc payloads emit +//! `system_children ⊆ children`, containing only `NodeRef::Actor` +//! refs where `cell.is_system()` is true. +//! +//! - **LC-4 (proc stopped_children subset):** Proc payloads emit +//! `stopped_children ⊆ children`, containing only +//! `NodeRef::Actor` refs for terminated actors retained for +//! post-mortem inspection. +//! +//! - **LC-5 (actor/error no classification sets):** Actor and Error +//! payloads do not carry `system_children` or `stopped_children`. +//! //! ## Proc-resolution invariants (SP-*) //! //! When a proc reference is resolved, the returned `NodePayload` @@ -1117,8 +1154,7 @@ impl MeshAdminAgent { /// /// The root is not a real actor/proc; it's a convenience node /// that anchors navigation. Its children are `NodeRef::Host` - /// entries for each configured `HostAgent` plus any standalone - /// procs (root client proc, admin proc). + /// entries for each configured `HostAgent`. fn build_root_payload(&self) -> NodePayload { use crate::introspect::NodeRef; @@ -1127,7 +1163,7 @@ impl MeshAdminAgent { .values() .map(|agent| NodeRef::Host(agent.actor_id().clone())) .collect(); - let system_children: Vec = Vec::new(); + let system_children: Vec = Vec::new(); // LC-1 let mut attrs = hyperactor_config::Attrs::new(); attrs.set(crate::introspect::NODE_TYPE, "root".to_string()); attrs.set(crate::introspect::NUM_HOSTS, self.hosts.len()); @@ -2792,10 +2828,16 @@ mod tests { NodeProperties::Root { num_hosts, started_by, + system_children, .. } => { assert_eq!(*num_hosts, 2); assert!(!started_by.is_empty()); + // LC-1: root system_children is always empty. + assert!( + system_children.is_empty(), + "LC-1: root system_children must be empty" + ); } other => panic!("expected Root, got {:?}", other), } @@ -2901,6 +2943,18 @@ mod tests { !host_node.children.is_empty(), "host should have at least one proc child" ); + // LC-2: host system_children is always empty. + match &host_node.properties { + NodeProperties::Host { + system_children, .. + } => { + assert!( + system_children.is_empty(), + "LC-2: host system_children must be empty" + ); + } + other => panic!("expected Host, got {:?}", other), + } // -- 6. Resolve a system proc child -- let proc_ref = &host_node.children[0]; diff --git a/hyperactor_mesh/test/mesh_admin_integration/harness.rs b/hyperactor_mesh/test/mesh_admin_integration/harness.rs index 0f04ad740..6730d0aac 100644 --- a/hyperactor_mesh/test/mesh_admin_integration/harness.rs +++ b/hyperactor_mesh/test/mesh_admin_integration/harness.rs @@ -302,6 +302,13 @@ pub(crate) fn pyspy_workload_binary() -> PathBuf { .to_path_buf() } +/// Resolve the Rust sieve binary via Buck resources. +pub(crate) fn sieve_rust_binary() -> PathBuf { + buck_resources::get("monarch/hyperactor_mesh/sieve_rs") + .expect("sieve_rust resource not found") + .to_path_buf() +} + // Workload launch /// Start a workload binary and wait for the mesh admin server to diff --git a/hyperactor_mesh/test/mesh_admin_integration/main.rs b/hyperactor_mesh/test/mesh_admin_integration/main.rs index db891e679..ae597b530 100644 --- a/hyperactor_mesh/test/mesh_admin_integration/main.rs +++ b/hyperactor_mesh/test/mesh_admin_integration/main.rs @@ -280,6 +280,16 @@ //! - **MIT-70 (query-malformed-body):** `POST /v1/query` with a //! malformed JSON body (missing required `sql` field) returns a //! non-success status. +//! +//! ### Supervision topology (sieve) +//! +//! - **MIT-71 (actor-child-parent-is-proc):** When actor A exposes +//! actor B in `children` (supervision child), resolving B yields +//! `parent = Some(NodeRef::Proc(…))` — the containment proc, not +//! the supervising actor (NI-2). +//! - **MIT-72 (proc-and-actor-children-coexist):** Actor B appears +//! in both the proc's `children` (membership edge) and actor A's +//! `children` (supervision edge) simultaneously (NI-3). mod admin; mod auth; @@ -290,6 +300,7 @@ mod openapi; mod pyspy; mod ref_check; mod ref_edge; +mod supervision; mod telemetry; mod tree; @@ -425,3 +436,11 @@ async fn test_no_dashboard_returns_404() { async fn test_query_malformed_body() { telemetry::run_query_malformed_body().await; } + +// --- supervision family --- + +/// MIT-71, MIT-72: NI-2/NI-3 supervision proof — Rust sieve binary. +#[tokio::test] +async fn test_supervision_proof_rust() { + supervision::run_supervision_proof_rust().await; +} diff --git a/hyperactor_mesh/test/mesh_admin_integration/supervision.rs b/hyperactor_mesh/test/mesh_admin_integration/supervision.rs new file mode 100644 index 000000000..f806adee9 --- /dev/null +++ b/hyperactor_mesh/test/mesh_admin_integration/supervision.rs @@ -0,0 +1,230 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! NI-2/NI-3 integration proof over a live sieve topology. +//! +//! The sieve example creates a chain of actors where each actor +//! spawns the next as a supervision child: +//! +//! proc → sieve[0] → sieve[1] → sieve[2] → … +//! +//! All sieve actors also appear as direct children of the proc +//! (via `all_instance_keys()`). This test proves that: +//! +//! - NI-2: an actor's `parent` is its containment proc, not +//! the supervising actor that spawned it. +//! - NI-3: actor→actor edges in `children` coexist with +//! proc→actor membership edges. + +use std::future::Future; +use std::pin::Pin; +use std::time::Duration; + +use hyperactor_mesh::introspect::NodePayload; +use hyperactor_mesh::introspect::NodeProperties; +use hyperactor_mesh::introspect::NodeRef; + +use crate::harness; +use crate::harness::WorkloadFixture; + +/// How long to wait for the admin URL sentinel after starting the +/// sieve binary. +const SIEVE_START_TIMEOUT: Duration = Duration::from_secs(60); + +/// Settle time after admin-URL readiness for the sieve actor chain +/// to build (the sieve starts 5s after the sentinel, then needs +/// time to discover primes and spawn child actors). +const SIEVE_CHAIN_READY_SLEEP: Duration = Duration::from_secs(10); + +/// Maximum number of discovery attempts before failing. +const DISCOVERY_ATTEMPTS: usize = 30; + +/// Backoff between discovery attempts. +const DISCOVERY_BACKOFF: Duration = Duration::from_secs(2); + +/// A live sieve workload fixture. +struct SieveScenario { + fixture: WorkloadFixture, +} + +impl SieveScenario { + async fn start() -> Self { + let bin = harness::sieve_rust_binary(); + // The sieve prints "Starts in 5 seconds." then spawns actors. + // Give it enough time to find some primes and build the chain. + // --num-primes 10: enough to build a chain of ~10 actors, + // small enough to converge quickly. + let fixture = harness::start_workload(&bin, &["--num-primes", "10"], SIEVE_START_TIMEOUT) + .await + .expect("failed to start sieve workload"); + // Wait for sieve actors to spawn. + tokio::time::sleep(SIEVE_CHAIN_READY_SLEEP).await; + SieveScenario { fixture } + } + + /// Run a test closure with a fresh scenario. + /// + /// **Structural cleanup guarantee (MIT-2):** The fixture is shut + /// down before this function returns, even if the closure panics. + /// `WorkloadFixture::Drop` calls `start_kill()` on the panic path. + async fn run(f: F) + where + F: for<'a> FnOnce(&'a SieveScenario) -> Pin + 'a>>, + { + let scenario = Self::start().await; + let guard = ShutdownGuard(&scenario.fixture); + f(&scenario).await; + guard.disarm(); + scenario.fixture.shutdown().await; + } +} + +/// Drop guard matching the `DiningScenario` cleanup pattern. +struct ShutdownGuard<'a>(#[allow(dead_code)] &'a WorkloadFixture); + +impl ShutdownGuard<'_> { + fn disarm(self) { + std::mem::forget(self); + } +} + +impl Drop for ShutdownGuard<'_> { + fn drop(&mut self) { + // Panic path: WorkloadFixture::Drop handles synchronous kill. + } +} + +fn enc(r: &NodeRef) -> String { + urlencoding::encode(&r.to_string()).into_owned() +} + +/// Find the first actor under a proc that itself has actor children +/// (i.e., the first sieve actor in the chain that has spawned a +/// successor). +async fn find_actor_with_child( + fixture: &WorkloadFixture, +) -> Option<(NodeRef, NodePayload, NodePayload)> { + // Walk: root → host → proc → actor. Transient fetch failures + // skip the node and continue rather than aborting the search. + let root = match fixture.get_node_payload("/v1/root").await { + Ok(r) => r, + Err(_) => return None, + }; + + for host_ref in &root.children { + let host = match fixture + .get_node_payload(&format!("/v1/{}", enc(host_ref))) + .await + { + Ok(h) => h, + Err(_) => continue, + }; + + for proc_ref in &host.children { + let proc_node = match fixture + .get_node_payload(&format!("/v1/{}", enc(proc_ref))) + .await + { + Ok(p) => p, + Err(_) => continue, + }; + + if !matches!(proc_node.properties, NodeProperties::Proc { .. }) { + continue; + } + + for actor_ref in &proc_node.children { + let actor = match fixture + .get_node_payload(&format!("/v1/{}", enc(actor_ref))) + .await + { + Ok(a) => a, + Err(_) => continue, + }; + + // Must be a sieve actor (not an infrastructure actor + // that happens to have children) with at least one + // supervision child. + let is_sieve = matches!( + &actor.properties, + NodeProperties::Actor { actor_type, .. } + if actor_type.contains("SieveActor") + ); + if is_sieve && !actor.children.is_empty() { + return Some((proc_ref.clone(), proc_node, actor)); + } + } + } + } + None +} + +/// MIT-71, MIT-72: NI-2/NI-3 supervision proof — Rust sieve binary. +pub async fn run_supervision_proof_rust() { + SieveScenario::run(|s| { + Box::pin(async move { + check_supervision(&s.fixture).await; + }) + }) + .await; +} + +/// NI-2, NI-3: actor supervision children coexist with proc +/// membership, and parent always points to the proc. +async fn check_supervision(fixture: &WorkloadFixture) { + // Retry discovery — actors may still be spawning. + let (proc_ref, proc_node, actor_a) = { + let mut result = None; + for _attempt in 1..=DISCOVERY_ATTEMPTS { + if let Some(found) = find_actor_with_child(fixture).await { + result = Some(found); + break; + } + tokio::time::sleep(DISCOVERY_BACKOFF).await; + } + result.expect("failed to find an actor with supervision children in the sieve topology") + }; + + // actor_a is the supervisor (e.g. sieve[0]). + // Select the first Actor child explicitly — not just children[0]. + let actor_b_ref = actor_a + .children + .iter() + .find(|r| matches!(r, NodeRef::Actor(_))) + .expect("NI-3: supervising actor A must have at least one Actor child"); + + // (2) NI-3: A.children contains B — actor→actor navigation edge. + assert!( + actor_a.children.contains(actor_b_ref), + "NI-3: supervising actor A must list child actor B in children" + ); + + // (3) NI-3: the containing proc's children also contains B — + // proc→actor membership edge coexists with actor→actor edge. + assert!( + proc_node.children.contains(actor_b_ref), + "NI-3: proc must also list actor B in children; \ + proc children: {:?}, looking for: {:?}", + proc_node.children, + actor_b_ref + ); + + // (4) NI-2: resolve B and assert parent = proc, not actor A. + let actor_b = fixture + .get_node_payload(&format!("/v1/{}", enc(actor_b_ref))) + .await + .expect("failed to resolve actor B"); + + assert_eq!( + actor_b.parent, + Some(proc_ref), + "NI-2: actor B's parent must be the containing proc, not the supervising actor; \ + got: {:?}", + actor_b.parent + ); +} diff --git a/hyperactor_mesh_admin_tui/bin/admin_tui.rs b/hyperactor_mesh_admin_tui/bin/admin_tui.rs index 50a48c72b..eed5a1471 100644 --- a/hyperactor_mesh_admin_tui/bin/admin_tui.rs +++ b/hyperactor_mesh_admin_tui/bin/admin_tui.rs @@ -17,6 +17,10 @@ use clap::Parser; use hyperactor_mesh_admin_tui_lib::LangName; use hyperactor_mesh_admin_tui_lib::ThemeName; use hyperactor_mesh_admin_tui_lib::TuiConfig; +// tokio is the async runtime on the OSS path (#[tokio::main]); +// fbcode uses fbinit::main. Explicit use suppresses the unused-deps +// linter while keeping tokio in BUCK deps for autocargo. +use tokio as _; /// Command-line arguments for the admin TUI. #[derive(Debug, Parser)] diff --git a/hyperactor_mesh_admin_tui/src/lib.rs b/hyperactor_mesh_admin_tui/src/lib.rs index 53e33faf2..7d9772731 100644 --- a/hyperactor_mesh_admin_tui/src/lib.rs +++ b/hyperactor_mesh_admin_tui/src/lib.rs @@ -159,6 +159,12 @@ //! buck2 run fbcode//monarch/hyperactor_mesh_admin_tui:hyperactor_mesh_admin_tui -- --addr 127.0.0.1:XXXXX //! ``` +// tokio is used extensively (tokio::spawn, tokio::time, tokio::sync) +// but the unused-deps linter does not see through fbinit's runtime +// provider. This suppresses the false positive while keeping tokio +// in BUCK deps for autocargo. +use tokio as _; + mod actions; mod app; pub(crate) mod client; diff --git a/monarch_introspection_snapshot/Cargo.toml b/monarch_introspection_snapshot/Cargo.toml index bba2b30ee..bfb7026f8 100644 --- a/monarch_introspection_snapshot/Cargo.toml +++ b/monarch_introspection_snapshot/Cargo.toml @@ -8,5 +8,10 @@ edition = "2024" license = "BSD-3-Clause" [dependencies] +anyhow = "1.0.102" datafusion = "52.1" +hyperactor_mesh = { version = "0.0.0", path = "../hyperactor_mesh" } monarch_record_batch = { version = "0.0.0", path = "../monarch_record_batch" } + +[dev-dependencies] +hyperactor = { version = "0.0.0", path = "../hyperactor" } diff --git a/monarch_introspection_snapshot/src/capture.rs b/monarch_introspection_snapshot/src/capture.rs new file mode 100644 index 000000000..906f24203 --- /dev/null +++ b/monarch_introspection_snapshot/src/capture.rs @@ -0,0 +1,893 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! BFS capture of a mesh topology into [`SnapshotData`]. +//! +//! The entry point is [`capture_snapshot`], which walks the mesh from +//! root, resolving each node exactly once, and folds the results into +//! a relational row set ready for ingestion. +//! +//! # Capture invariants (CS-*) +//! +//! - **CS-1 (snapshot-row-once):** Each successful `capture_snapshot` +//! emits exactly one [`SnapshotRow`]. +//! - **CS-2 (resolve-once-per-node):** Each distinct `NodeRef` is +//! resolved at most once per capture. +//! - **CS-3 (edge-per-parent-payload):** Every [`ChildRow`] emitted +//! comes from a successfully resolved parent payload's `children`, +//! in parent enumeration order. +//! - **CS-4 (duplicate-node-single-row):** If the same `NodeRef` is +//! reachable from multiple parents, [`SnapshotData`] contains one +//! node projection but one [`ChildRow`] per observed parent→child +//! edge. +//! - **CS-5 (fold-homomorphism):** Folding a +//! [`ConvertedNode`](crate::convert::ConvertedNode) via +//! [`SnapshotData::push_converted`] appends exactly one +//! [`NodeRow`], exactly one subtype-table row matching `kind_row`, +//! optionally one [`ActorFailureRow`], and all of its +//! [`ChildRow`]s. +//! - **CS-6 (resolution-error-boundary):** Resolver transport/query +//! failure aborts capture with `Err`. Only successfully resolved +//! payloads with `NodeProperties::Error` populate +//! `resolution_errors`. +//! - **CS-7 (typed-visited-key):** Traversal dedup uses typed +//! `NodeRef` identity, not stringified IDs. +//! - **CS-8 (snapshot-ts-capture-start):** `snapshot.snapshot_ts` is +//! captured once at traversal start and is independent of per-node +//! `as_of`. + +use std::collections::HashSet; +use std::collections::VecDeque; +use std::future::Future; +use std::time::SystemTime; + +use anyhow::Context; +use hyperactor_mesh::introspect::NodePayload; +use hyperactor_mesh::introspect::NodeRef; + +use crate::convert::ConvertedNode; +use crate::convert::NodeKindRow; +use crate::convert::convert_node; +use crate::convert::to_micros; +use crate::schema::ActorFailureRow; +use crate::schema::ActorNodeRow; +use crate::schema::ChildRow; +use crate::schema::HostNodeRow; +use crate::schema::NodeRow; +use crate::schema::ProcNodeRow; +use crate::schema::ResolutionErrorRow; +use crate::schema::RootNodeRow; +use crate::schema::SnapshotRow; + +/// All row vectors produced by a single snapshot capture. +#[derive(Debug)] +pub struct SnapshotData { + /// Capture metadata (CS-1: exactly one per successful capture). + pub snapshot: SnapshotRow, + /// One row per resolved node (CS-2: each `NodeRef` resolved + /// once). + pub nodes: Vec, + /// One row per parent→child edge (CS-3, CS-4: edges emitted + /// per-parent, so a multiply-reachable node has multiple edges). + pub children: Vec, + /// Singleton on success — CS-2 applied to the single root entry + /// point means root is resolved exactly once. + pub root_nodes: Vec, + /// One row per resolved host node. + pub host_nodes: Vec, + /// One row per resolved proc node. + pub proc_nodes: Vec, + /// One row per resolved actor node. + pub actor_nodes: Vec, + /// One row per actor with `failure_info: Some(…)` (CV-3). + pub actor_failures: Vec, + /// One row per successfully resolved `NodeProperties::Error` + /// (CS-6: distinct from resolver transport failures). + pub resolution_errors: Vec, +} + +impl SnapshotData { + /// Fold a single converted node into the accumulator (CS-5). + /// + /// This is the only place that branches on [`NodeKindRow`]. BFS + /// calls this and nothing else. + pub fn push_converted(&mut self, converted: ConvertedNode) { + self.nodes.push(converted.node); + self.children.extend(converted.children); + if let Some(f) = converted.actor_failure { + self.actor_failures.push(f); + } + match converted.kind_row { + NodeKindRow::Root(r) => self.root_nodes.push(r), + NodeKindRow::Host(h) => self.host_nodes.push(h), + NodeKindRow::Proc(p) => self.proc_nodes.push(p), + NodeKindRow::Actor(a) => self.actor_nodes.push(a), + NodeKindRow::ResolutionError(e) => self.resolution_errors.push(e), + } + } +} + +/// Capture a full mesh snapshot by BFS from root. +/// +/// The `resolve` closure is called once per distinct `NodeRef` +/// (CS-2). Resolver transport failures abort capture immediately +/// (CS-6). Successfully resolved `NodeProperties::Error` payloads are +/// valid rows, not capture failures. +pub async fn capture_snapshot(snapshot_id: &str, resolve: F) -> anyhow::Result +where + F: Fn(&NodeRef) -> Fut, + Fut: Future>, +{ + // CS-1, CS-8: one snapshot row, timestamp at capture start. + let snapshot = SnapshotRow { + snapshot_id: snapshot_id.to_owned(), + snapshot_ts: to_micros(SystemTime::now()) + .context("failed to compute snapshot timestamp")?, + }; + + let mut data = SnapshotData { + snapshot, + nodes: Vec::new(), + children: Vec::new(), + root_nodes: Vec::new(), + host_nodes: Vec::new(), + proc_nodes: Vec::new(), + actor_nodes: Vec::new(), + actor_failures: Vec::new(), + resolution_errors: Vec::new(), + }; + + // CS-7: typed visited key. + let mut visited: HashSet = HashSet::new(); + let mut queue: VecDeque = VecDeque::new(); + queue.push_back(NodeRef::Root); + + while let Some(node_ref) = queue.pop_front() { + // Dedup on dequeue — queue may contain already-visited refs. + if !visited.insert(node_ref.clone()) { + continue; + } + + // CS-6: resolver failure is an immediate capture error. + let payload = resolve(&node_ref) + .await + .with_context(|| format!("failed to resolve {}", node_ref))?; + + // Enqueue all children unconditionally; dedup happens on + // dequeue. + for child_ref in &payload.children { + queue.push_back(child_ref.clone()); + } + + // Project and fold. + let converted = convert_node(snapshot_id, &payload) + .with_context(|| format!("failed to convert {}", node_ref))?; + data.push_converted(converted); + } + + Ok(data) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + use std::sync::Mutex; + use std::time::Duration; + use std::time::UNIX_EPOCH; + + use hyperactor::channel::ChannelAddr; + use hyperactor::reference::ProcId; + use hyperactor_mesh::introspect::NodeProperties; + + use super::*; + + // Test fixtures + + fn test_proc_id() -> ProcId { + ProcId::with_name(ChannelAddr::Local(0), "worker") + } + + fn test_actor_id(name: &str, idx: usize) -> hyperactor::reference::ActorId { + test_proc_id().actor_id(name, idx) + } + + fn test_host_actor_id() -> hyperactor::reference::ActorId { + test_proc_id().actor_id("host_agent", 0) + } + + fn test_time() -> SystemTime { + UNIX_EPOCH + Duration::from_micros(1_700_000_000_000_000) + } + + fn make_payload( + identity: NodeRef, + properties: NodeProperties, + children: Vec, + ) -> NodePayload { + NodePayload { + identity, + properties, + children, + parent: None, + as_of: test_time(), + } + } + + /// Stub resolver backed by a map. Returns the payload for known + /// refs, error for unknown. + fn stub_resolver( + map: HashMap, + ) -> impl Fn(&NodeRef) -> std::future::Ready> { + move |node_ref: &NodeRef| { + let result = map + .get(node_ref) + .cloned() + .ok_or_else(|| anyhow::anyhow!("unknown ref: {}", node_ref)); + std::future::ready(result) + } + } + + /// Stub resolver that also records every NodeRef requested. + fn recording_stub_resolver( + map: HashMap, + log: Arc>>, + ) -> impl Fn(&NodeRef) -> std::future::Ready> { + move |node_ref: &NodeRef| { + log.lock().unwrap().push(node_ref.clone()); + let result = map + .get(node_ref) + .cloned() + .ok_or_else(|| anyhow::anyhow!("unknown ref: {}", node_ref)); + std::future::ready(result) + } + } + + // CS-1, CS-8: one snapshot row, timestamp at capture start. + #[tokio::test] + async fn test_capture_emits_one_snapshot_row() { + let map: HashMap = [( + NodeRef::Root, + make_payload( + NodeRef::Root, + NodeProperties::Root { + num_hosts: 0, + started_at: test_time(), + started_by: "test".to_owned(), + system_children: vec![], + }, + vec![], + ), + )] + .into(); + + let data = capture_snapshot("snap-1", stub_resolver(map)) + .await + .unwrap(); + + assert_eq!(data.snapshot.snapshot_id, "snap-1"); + // CS-8: snapshot_ts should be recent (within last 5 seconds), + // and distinct from node as_of. + let now_micros = to_micros(SystemTime::now()).unwrap(); + assert!( + data.snapshot.snapshot_ts > now_micros - 5_000_000, + "CS-8: snapshot_ts should be recent" + ); + assert_ne!( + data.snapshot.snapshot_ts, + to_micros(test_time()).unwrap(), + "CS-8: snapshot_ts should differ from node as_of" + ); + // CS-1: exactly one node (root only in this topology). + assert_eq!(data.nodes.len(), 1); + // Root is always singleton on success. + assert_eq!(data.root_nodes.len(), 1); + } + + // CS-2, CS-7: each NodeRef resolved at most once. + #[tokio::test] + async fn test_capture_resolves_each_node_once() { + let actor_b = NodeRef::Actor(test_actor_id("b", 0)); + let actor_a = NodeRef::Actor(test_actor_id("a", 0)); + let proc_ref = NodeRef::Proc(test_proc_id()); + let host_ref = NodeRef::Host(test_host_actor_id()); + + // actor_b reachable from both proc and actor_a. + let map: HashMap = [ + ( + NodeRef::Root, + make_payload( + NodeRef::Root, + NodeProperties::Root { + num_hosts: 1, + started_at: test_time(), + started_by: "test".to_owned(), + system_children: vec![], + }, + vec![host_ref.clone()], + ), + ), + ( + host_ref.clone(), + make_payload( + host_ref.clone(), + NodeProperties::Host { + addr: "addr".to_owned(), + num_procs: 1, + system_children: vec![], + }, + vec![proc_ref.clone()], + ), + ), + ( + proc_ref.clone(), + make_payload( + proc_ref.clone(), + NodeProperties::Proc { + proc_name: "w".to_owned(), + num_actors: 2, + system_children: vec![], + stopped_children: vec![], + stopped_retention_cap: 0, + is_poisoned: false, + failed_actor_count: 0, + }, + vec![actor_a.clone(), actor_b.clone()], + ), + ), + ( + actor_a.clone(), + make_payload( + actor_a.clone(), + NodeProperties::Actor { + actor_status: "running".to_owned(), + actor_type: "A".to_owned(), + messages_processed: 0, + created_at: None, + last_message_handler: None, + total_processing_time_us: 0, + flight_recorder: None, + is_system: false, + failure_info: None, + }, + vec![actor_b.clone()], + ), + ), + ( + actor_b.clone(), + make_payload( + actor_b.clone(), + NodeProperties::Actor { + actor_status: "running".to_owned(), + actor_type: "B".to_owned(), + messages_processed: 0, + created_at: None, + last_message_handler: None, + total_processing_time_us: 0, + flight_recorder: None, + is_system: false, + failure_info: None, + }, + vec![], + ), + ), + ] + .into(); + + let log = Arc::new(Mutex::new(Vec::new())); + let data = capture_snapshot("s", recording_stub_resolver(map, log.clone())) + .await + .unwrap(); + + // CS-2: actor_b resolved exactly once even though reachable + // from both proc and actor_a. + let resolved = log.lock().unwrap(); + let b_count = resolved.iter().filter(|r| **r == actor_b).count(); + assert_eq!(b_count, 1, "CS-2: actor_b should be resolved once"); + + // CS-7: visited key is typed NodeRef (structural — if it + // used strings, the dedup would fail for refs with different + // typed representations but same Display form). + assert_eq!(data.nodes.len(), 5); + } + + // CS-3: child edges emitted in parent enumeration order. + #[tokio::test] + async fn test_capture_emits_edges_from_each_parent() { + let a0 = NodeRef::Actor(test_actor_id("a", 0)); + let a1 = NodeRef::Actor(test_actor_id("b", 0)); + let a2 = NodeRef::Actor(test_actor_id("c", 0)); + let proc_ref = NodeRef::Proc(test_proc_id()); + let host_ref = NodeRef::Host(test_host_actor_id()); + + let map: HashMap = [ + ( + NodeRef::Root, + make_payload( + NodeRef::Root, + NodeProperties::Root { + num_hosts: 1, + started_at: test_time(), + started_by: "t".to_owned(), + system_children: vec![], + }, + vec![host_ref.clone()], + ), + ), + ( + host_ref.clone(), + make_payload( + host_ref.clone(), + NodeProperties::Host { + addr: "a".to_owned(), + num_procs: 1, + system_children: vec![], + }, + vec![proc_ref.clone()], + ), + ), + ( + proc_ref.clone(), + make_payload( + proc_ref.clone(), + NodeProperties::Proc { + proc_name: "w".to_owned(), + num_actors: 3, + system_children: vec![], + stopped_children: vec![], + stopped_retention_cap: 0, + is_poisoned: false, + failed_actor_count: 0, + }, + vec![a0.clone(), a1.clone(), a2.clone()], + ), + ), + ( + a0.clone(), + make_payload( + a0.clone(), + NodeProperties::Actor { + actor_status: "r".to_owned(), + actor_type: "A".to_owned(), + messages_processed: 0, + created_at: None, + last_message_handler: None, + total_processing_time_us: 0, + flight_recorder: None, + is_system: false, + failure_info: None, + }, + vec![], + ), + ), + ( + a1.clone(), + make_payload( + a1.clone(), + NodeProperties::Actor { + actor_status: "r".to_owned(), + actor_type: "A".to_owned(), + messages_processed: 0, + created_at: None, + last_message_handler: None, + total_processing_time_us: 0, + flight_recorder: None, + is_system: false, + failure_info: None, + }, + vec![], + ), + ), + ( + a2.clone(), + make_payload( + a2.clone(), + NodeProperties::Actor { + actor_status: "r".to_owned(), + actor_type: "A".to_owned(), + messages_processed: 0, + created_at: None, + last_message_handler: None, + total_processing_time_us: 0, + flight_recorder: None, + is_system: false, + failure_info: None, + }, + vec![], + ), + ), + ] + .into(); + + let data = capture_snapshot("s", stub_resolver(map)).await.unwrap(); + + // CS-3: proc's children should appear with correct sort keys. + let proc_id_str = proc_ref.to_string(); + let proc_children: Vec<_> = data + .children + .iter() + .filter(|c| c.parent_id == proc_id_str) + .collect(); + assert_eq!(proc_children.len(), 3); + assert_eq!(proc_children[0].child_sort_key, 0); + assert_eq!(proc_children[0].child_id, a0.to_string()); + assert_eq!(proc_children[1].child_sort_key, 1); + assert_eq!(proc_children[1].child_id, a1.to_string()); + assert_eq!(proc_children[2].child_sort_key, 2); + assert_eq!(proc_children[2].child_id, a2.to_string()); + } + + // CS-2, CS-4: one node row for a multiply-reachable node, but + // one ChildRow per parent→child edge. + #[tokio::test] + async fn test_capture_dedupes_nodes_not_edges() { + let actor_b = NodeRef::Actor(test_actor_id("b", 0)); + let actor_a = NodeRef::Actor(test_actor_id("a", 0)); + let proc_ref = NodeRef::Proc(test_proc_id()); + let host_ref = NodeRef::Host(test_host_actor_id()); + + // actor_b reachable from both proc and actor_a. + let map: HashMap = [ + ( + NodeRef::Root, + make_payload( + NodeRef::Root, + NodeProperties::Root { + num_hosts: 1, + started_at: test_time(), + started_by: "t".to_owned(), + system_children: vec![], + }, + vec![host_ref.clone()], + ), + ), + ( + host_ref.clone(), + make_payload( + host_ref.clone(), + NodeProperties::Host { + addr: "a".to_owned(), + num_procs: 1, + system_children: vec![], + }, + vec![proc_ref.clone()], + ), + ), + ( + proc_ref.clone(), + make_payload( + proc_ref.clone(), + NodeProperties::Proc { + proc_name: "w".to_owned(), + num_actors: 2, + system_children: vec![], + stopped_children: vec![], + stopped_retention_cap: 0, + is_poisoned: false, + failed_actor_count: 0, + }, + vec![actor_a.clone(), actor_b.clone()], + ), + ), + ( + actor_a.clone(), + make_payload( + actor_a.clone(), + NodeProperties::Actor { + actor_status: "r".to_owned(), + actor_type: "A".to_owned(), + messages_processed: 0, + created_at: None, + last_message_handler: None, + total_processing_time_us: 0, + flight_recorder: None, + is_system: false, + failure_info: None, + }, + vec![actor_b.clone()], + ), + ), + ( + actor_b.clone(), + make_payload( + actor_b.clone(), + NodeProperties::Actor { + actor_status: "r".to_owned(), + actor_type: "B".to_owned(), + messages_processed: 0, + created_at: None, + last_message_handler: None, + total_processing_time_us: 0, + flight_recorder: None, + is_system: false, + failure_info: None, + }, + vec![], + ), + ), + ] + .into(); + + let data = capture_snapshot("s", stub_resolver(map)).await.unwrap(); + + // CS-4: one NodeRow for actor_b. + let b_id = actor_b.to_string(); + let b_nodes: Vec<_> = data.nodes.iter().filter(|n| n.node_id == b_id).collect(); + assert_eq!(b_nodes.len(), 1, "CS-4: one NodeRow for actor_b"); + + // CS-4: two ChildRows pointing to actor_b from different parents. + let b_edges: Vec<_> = data + .children + .iter() + .filter(|c| c.child_id == b_id) + .collect(); + assert_eq!(b_edges.len(), 2, "CS-4: two ChildRows for actor_b"); + + let parents: HashSet<&str> = b_edges.iter().map(|c| c.parent_id.as_str()).collect(); + assert!( + parents.contains(proc_ref.to_string().as_str()), + "CS-4: proc should be a parent of actor_b" + ); + assert!( + parents.contains(actor_a.to_string().as_str()), + "CS-4: actor_a should be a parent of actor_b" + ); + } + + // CS-5: push_converted routes each NodeKindRow variant to the + // correct subtype vec. Table-driven across all five variants. + #[test] + fn test_push_converted_matches_kind_row() { + use crate::schema::*; + + fn node(id: &str, kind: &str) -> NodeRow { + NodeRow { + snapshot_id: "s".to_owned(), + node_id: id.to_owned(), + node_kind: kind.to_owned(), + as_of: 0, + } + } + + let cases: Vec<(ConvertedNode, &str)> = vec![ + ( + ConvertedNode { + node: node("root", "root"), + kind_row: NodeKindRow::Root(RootNodeRow { + snapshot_id: "s".to_owned(), + node_id: "root".to_owned(), + num_hosts: 0, + started_at: 0, + started_by: "t".to_owned(), + }), + actor_failure: None, + children: vec![ChildRow { + snapshot_id: "s".to_owned(), + parent_id: "root".to_owned(), + child_id: "h".to_owned(), + child_sort_key: 0, + is_system: false, + is_stopped: false, + }], + }, + "Root", + ), + ( + ConvertedNode { + node: node("h", "host"), + kind_row: NodeKindRow::Host(HostNodeRow { + snapshot_id: "s".to_owned(), + node_id: "h".to_owned(), + addr: "a".to_owned(), + host_num_procs: 0, + }), + actor_failure: None, + children: vec![], + }, + "Host", + ), + ( + ConvertedNode { + node: node("p", "proc"), + kind_row: NodeKindRow::Proc(ProcNodeRow { + snapshot_id: "s".to_owned(), + node_id: "p".to_owned(), + proc_name: "w".to_owned(), + num_actors: 0, + stopped_retention_cap: 0, + is_poisoned: false, + failed_actor_count: 0, + }), + actor_failure: None, + children: vec![], + }, + "Proc", + ), + ( + ConvertedNode { + node: node("a", "actor"), + kind_row: NodeKindRow::Actor(ActorNodeRow { + snapshot_id: "s".to_owned(), + node_id: "a".to_owned(), + actor_status: "failed".to_owned(), + actor_type: "A".to_owned(), + messages_processed: 0, + created_at: None, + last_message_handler: None, + total_processing_time_us: 0, + is_system: false, + }), + actor_failure: Some(ActorFailureRow { + snapshot_id: "s".to_owned(), + node_id: "a".to_owned(), + failure_error_message: "boom".to_owned(), + failure_root_cause_actor: "a".to_owned(), + failure_root_cause_name: None, + failure_occurred_at: 0, + failure_is_propagated: false, + }), + children: vec![], + }, + "Actor", + ), + ( + ConvertedNode { + node: node("e", "error"), + kind_row: NodeKindRow::ResolutionError(ResolutionErrorRow { + snapshot_id: "s".to_owned(), + node_id: "e".to_owned(), + error_code: "not_found".to_owned(), + error_message: "gone".to_owned(), + }), + actor_failure: None, + children: vec![], + }, + "ResolutionError", + ), + ]; + + let mut data = SnapshotData { + snapshot: SnapshotRow { + snapshot_id: "s".to_owned(), + snapshot_ts: 0, + }, + nodes: Vec::new(), + children: Vec::new(), + root_nodes: Vec::new(), + host_nodes: Vec::new(), + proc_nodes: Vec::new(), + actor_nodes: Vec::new(), + actor_failures: Vec::new(), + resolution_errors: Vec::new(), + }; + + for (converted, label) in cases { + data.push_converted(converted); + let subtype_count = match label { + "Root" => data.root_nodes.len(), + "Host" => data.host_nodes.len(), + "Proc" => data.proc_nodes.len(), + "Actor" => data.actor_nodes.len(), + "ResolutionError" => data.resolution_errors.len(), + _ => unreachable!(), + }; + assert_eq!( + subtype_count, 1, + "CS-5: {label} variant should route to its subtype vec" + ); + } + + assert_eq!(data.nodes.len(), 5); + assert_eq!(data.children.len(), 1); // from root + assert_eq!(data.actor_failures.len(), 1); // from actor + } + + // CS-6: resolver failure aborts capture. + #[tokio::test] + async fn test_capture_aborts_on_resolver_error() { + let host_ref = NodeRef::Host(test_host_actor_id()); + + // Root has a child, but the child resolver fails. + let map: HashMap = [( + NodeRef::Root, + make_payload( + NodeRef::Root, + NodeProperties::Root { + num_hosts: 1, + started_at: test_time(), + started_by: "t".to_owned(), + system_children: vec![], + }, + vec![host_ref], + ), + )] + .into(); + + let result = capture_snapshot("s", stub_resolver(map)).await; + assert!( + result.is_err(), + "CS-6: capture should fail when resolver returns Err" + ); + } + + // CS-6: successfully resolved NodeProperties::Error is a valid + // row. + #[tokio::test] + async fn test_capture_keeps_domain_error_payloads() { + let error_ref = NodeRef::Actor(test_actor_id("err", 0)); + let host_ref = NodeRef::Host(test_host_actor_id()); + let proc_ref = NodeRef::Proc(test_proc_id()); + + let map: HashMap = [ + ( + NodeRef::Root, + make_payload( + NodeRef::Root, + NodeProperties::Root { + num_hosts: 1, + started_at: test_time(), + started_by: "t".to_owned(), + system_children: vec![], + }, + vec![host_ref.clone()], + ), + ), + ( + host_ref.clone(), + make_payload( + host_ref.clone(), + NodeProperties::Host { + addr: "a".to_owned(), + num_procs: 1, + system_children: vec![], + }, + vec![proc_ref.clone()], + ), + ), + ( + proc_ref.clone(), + make_payload( + proc_ref.clone(), + NodeProperties::Proc { + proc_name: "w".to_owned(), + num_actors: 1, + system_children: vec![], + stopped_children: vec![], + stopped_retention_cap: 0, + is_poisoned: false, + failed_actor_count: 0, + }, + vec![error_ref.clone()], + ), + ), + ( + error_ref.clone(), + make_payload( + error_ref.clone(), + NodeProperties::Error { + code: "not_found".to_owned(), + message: "child not found".to_owned(), + }, + vec![], + ), + ), + ] + .into(); + + let data = capture_snapshot("s", stub_resolver(map)).await.unwrap(); + + // CS-6: capture succeeds despite an Error node. + assert_eq!(data.nodes.len(), 4); + assert_eq!( + data.resolution_errors.len(), + 1, + "CS-6: domain error should produce ResolutionErrorRow" + ); + assert_eq!(data.resolution_errors[0].error_code, "not_found"); + } +} diff --git a/monarch_introspection_snapshot/src/convert.rs b/monarch_introspection_snapshot/src/convert.rs new file mode 100644 index 000000000..eda9492b6 --- /dev/null +++ b/monarch_introspection_snapshot/src/convert.rs @@ -0,0 +1,818 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +//! Conversion from typed mesh-admin domain types to relational row +//! definitions. +//! +//! The entry point is [`convert_node`], which takes a single +//! [`NodePayload`] and produces a [`ConvertedNode`] — a typed +//! per-node projection that the BFS capture layer can fold into a +//! full snapshot. +//! +//! # Conversion invariants (CV-*) +//! +//! - **CV-1 (exactly one node row):** Each `NodePayload` converts to +//! exactly one [`NodeRow`]. +//! - **CV-2 (exactly one subtype row):** Each `NodePayload` converts +//! to exactly one [`NodeKindRow`] variant matching +//! `NodeRow.node_kind`. Enforced by the type system. +//! - **CV-3 (failure iff actor with failure_info):** `actor_failure` +//! is `Some` iff `NodeProperties::Actor { failure_info: Some(_) }`. +//! - **CV-4 (child sort key = enumeration order):** `ChildRow` at +//! position `i` in `payload.children` has `child_sort_key = i`. +//! - **CV-5 (child classification from parent sets):** `is_system` +//! and `is_stopped` are derived solely from the parent's typed +//! `system_children` / `stopped_children` sets via `HashSet` +//! lookup. +//! - **CV-6 (canonical boundary crossing):** Typed refs cross the SQL +//! boundary only via `.to_string()`. Times cross only via +//! [`to_micros`]. No ad-hoc formatting. +//! - **CV-7 (parent not materialized):** `convert_node` does not read +//! or persist `NodePayload.parent`. Parenthood in the snapshot +//! schema is represented only through [`ChildRow`] edges. + +use std::collections::HashSet; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; + +use anyhow::Context; +use hyperactor_mesh::introspect::FailureInfo; +use hyperactor_mesh::introspect::NodePayload; +use hyperactor_mesh::introspect::NodeProperties; +use hyperactor_mesh::introspect::NodeRef; + +use crate::schema::ActorFailureRow; +use crate::schema::ActorNodeRow; +use crate::schema::ChildRow; +use crate::schema::HostNodeRow; +use crate::schema::NodeRow; +use crate::schema::ProcNodeRow; +use crate::schema::ResolutionErrorRow; +use crate::schema::RootNodeRow; + +// Conversion algebra + +/// Typed per-node projection. Encodes the exact-one-subtype invariant +/// (CV-2) in the type system: `kind_row` is an enum with one variant +/// per node kind, and `node.node_kind` is derived from the same +/// match. +#[derive(Debug, Clone, PartialEq)] +pub struct ConvertedNode { + /// Base row present for every node (CV-1). + pub node: NodeRow, + /// Exactly one kind-specific row, matching `node.node_kind` (CV-2). + pub kind_row: NodeKindRow, + /// Failure detail, present only for actor nodes with + /// `failure_info: Some(…)` (CV-3). + pub actor_failure: Option, + /// One [`ChildRow`] per entry in `payload.children`, in + /// enumeration order (CV-4). + pub children: Vec, +} + +/// Exactly one subtype row per converted node. +#[derive(Debug, Clone, PartialEq)] +pub enum NodeKindRow { + /// Root node properties. + Root(RootNodeRow), + /// Host node properties. + Host(HostNodeRow), + /// Proc node properties. + Proc(ProcNodeRow), + /// Actor node properties. + Actor(ActorNodeRow), + /// Resolution error properties. + ResolutionError(ResolutionErrorRow), +} + +impl NodeKindRow { + /// The `node_kind` discriminator string stored in [`NodeRow`]. + /// Derived from the same match that produces the variant, so the + /// string and the variant can never disagree. + pub fn kind_str(&self) -> &'static str { + match self { + Self::Root(_) => "root", + Self::Host(_) => "host", + Self::Proc(_) => "proc", + Self::Actor(_) => "actor", + Self::ResolutionError(_) => "error", + } + } +} + +// Boundary helpers (CV-6) + +/// Convert a `SystemTime` to microseconds since epoch. +/// +/// Fallible: pre-epoch times and post-2554 overflow produce errors +/// rather than silent truncation. +pub(crate) fn to_micros(t: SystemTime) -> anyhow::Result { + let micros = t + .duration_since(UNIX_EPOCH) + .context("SystemTime before UNIX epoch")? + .as_micros(); + i64::try_from(micros).context("SystemTime microseconds overflow i64") +} + +fn to_opt_micros(t: Option) -> anyhow::Result> { + t.map(to_micros).transpose() +} + +// Child classification (CV-5) + +/// Link-level classification sets extracted from a parent's +/// `NodeProperties`. Built once per payload, then queried per child. +struct ChildClasses<'a> { + system: HashSet<&'a NodeRef>, + stopped: HashSet<&'a NodeRef>, +} + +impl<'a> ChildClasses<'a> { + fn from_properties(properties: &'a NodeProperties) -> Self { + match properties { + NodeProperties::Root { + system_children, .. + } + | NodeProperties::Host { + system_children, .. + } => Self { + system: system_children.iter().collect(), + stopped: HashSet::new(), + }, + NodeProperties::Proc { + system_children, + stopped_children, + .. + } => Self { + system: system_children.iter().collect(), + stopped: stopped_children.iter().collect(), + }, + NodeProperties::Actor { .. } | NodeProperties::Error { .. } => Self { + system: HashSet::new(), + stopped: HashSet::new(), + }, + } + } + + fn classify(&self, child: &NodeRef) -> (bool, bool) { + (self.system.contains(child), self.stopped.contains(child)) + } +} + +fn build_child_rows( + snapshot_id: &str, + parent_id: &str, + children: &[NodeRef], + classes: &ChildClasses<'_>, +) -> anyhow::Result> { + children + .iter() + .enumerate() + .map(|(i, child)| { + let (is_system, is_stopped) = classes.classify(child); + Ok(ChildRow { + snapshot_id: snapshot_id.to_owned(), + parent_id: parent_id.to_owned(), + child_id: child.to_string(), + child_sort_key: i64::try_from(i).context("child index overflow i64")?, + is_system, + is_stopped, + }) + }) + .collect() +} + +// Entry point (CV-1, CV-2, CV-3, CV-4, CV-5, CV-6) + +/// Convert a single [`NodePayload`] into its relational row +/// projection. +/// +/// `payload.parent` is intentionally not stored — the snapshot schema +/// represents parenthood through [`ChildRow`] edges, not a column on +/// the node. Parent reconstruction is query-side and contextual +/// (join through `ChildRow`) rather than a stored fact. +pub fn convert_node(snapshot_id: &str, payload: &NodePayload) -> anyhow::Result { + let node_id = payload.identity.to_string(); + + // Subtype row + optional failure. Done first because NodeRow.node_kind + // is derived from the same match via kind_row.kind_str(). + let (kind_row, actor_failure) = match &payload.properties { + NodeProperties::Root { + num_hosts, + started_at, + started_by, + .. + } => { + let row = RootNodeRow { + snapshot_id: snapshot_id.to_owned(), + node_id: node_id.clone(), + num_hosts: i64::try_from(*num_hosts).context("num_hosts overflow i64")?, + started_at: to_micros(*started_at)?, + started_by: started_by.clone(), + }; + (NodeKindRow::Root(row), None) + } + NodeProperties::Host { + addr, num_procs, .. + } => { + let row = HostNodeRow { + snapshot_id: snapshot_id.to_owned(), + node_id: node_id.clone(), + addr: addr.clone(), + host_num_procs: i64::try_from(*num_procs).context("num_procs overflow i64")?, + }; + (NodeKindRow::Host(row), None) + } + NodeProperties::Proc { + proc_name, + num_actors, + stopped_retention_cap, + is_poisoned, + failed_actor_count, + .. + } => { + let row = ProcNodeRow { + snapshot_id: snapshot_id.to_owned(), + node_id: node_id.clone(), + proc_name: proc_name.clone(), + num_actors: i64::try_from(*num_actors).context("num_actors overflow i64")?, + stopped_retention_cap: i64::try_from(*stopped_retention_cap) + .context("stopped_retention_cap overflow i64")?, + is_poisoned: *is_poisoned, + failed_actor_count: i64::try_from(*failed_actor_count) + .context("failed_actor_count overflow i64")?, + }; + (NodeKindRow::Proc(row), None) + } + NodeProperties::Actor { + actor_status, + actor_type, + messages_processed, + created_at, + last_message_handler, + total_processing_time_us, + is_system, + failure_info, + .. + } => { + let actor_row = ActorNodeRow { + snapshot_id: snapshot_id.to_owned(), + node_id: node_id.clone(), + actor_status: actor_status.clone(), + actor_type: actor_type.clone(), + messages_processed: i64::try_from(*messages_processed) + .context("messages_processed overflow i64")?, + created_at: to_opt_micros(*created_at)?, + last_message_handler: last_message_handler.clone(), + total_processing_time_us: i64::try_from(*total_processing_time_us) + .context("total_processing_time_us overflow i64")?, + is_system: *is_system, + }; + let failure = failure_info + .as_ref() + .map(|fi| convert_failure(snapshot_id, &node_id, fi)) + .transpose()?; + (NodeKindRow::Actor(actor_row), failure) + } + NodeProperties::Error { code, message } => { + let row = ResolutionErrorRow { + snapshot_id: snapshot_id.to_owned(), + node_id: node_id.clone(), + error_code: code.clone(), + error_message: message.clone(), + }; + (NodeKindRow::ResolutionError(row), None) + } + }; + + let node = NodeRow { + snapshot_id: snapshot_id.to_owned(), + node_id: node_id.clone(), + node_kind: kind_row.kind_str().to_owned(), + as_of: to_micros(payload.as_of)?, + }; + + // Child edges with classification from parent's typed sets. + let classes = ChildClasses::from_properties(&payload.properties); + let children = build_child_rows(snapshot_id, &node_id, &payload.children, &classes)?; + + Ok(ConvertedNode { + node, + kind_row, + actor_failure, + children, + }) +} + +fn convert_failure( + snapshot_id: &str, + node_id: &str, + fi: &FailureInfo, +) -> anyhow::Result { + Ok(ActorFailureRow { + snapshot_id: snapshot_id.to_owned(), + node_id: node_id.to_owned(), + failure_error_message: fi.error_message.clone(), + failure_root_cause_actor: fi.root_cause_actor.to_string(), + failure_root_cause_name: fi.root_cause_name.clone(), + failure_occurred_at: to_micros(fi.occurred_at)?, + failure_is_propagated: fi.is_propagated, + }) +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use hyperactor::channel::ChannelAddr; + use hyperactor::reference::ProcId; + + use super::*; + + // Test fixtures + + fn test_proc_id() -> ProcId { + ProcId::with_name(ChannelAddr::Local(0), "worker") + } + + fn test_actor_id() -> hyperactor::reference::ActorId { + test_proc_id().actor_id("actor", 0) + } + + fn test_host_actor_id() -> hyperactor::reference::ActorId { + test_proc_id().actor_id("host_agent", 0) + } + + fn test_time() -> SystemTime { + UNIX_EPOCH + Duration::from_micros(1_700_000_000_000_000) + } + + fn test_time_2() -> SystemTime { + UNIX_EPOCH + Duration::from_micros(1_700_000_001_000_000) + } + + // CV-1, CV-2, CV-6: Root variant produces correct NodeRow and + // RootNodeRow. + #[test] + fn test_convert_root() { + let payload = NodePayload { + identity: NodeRef::Root, + properties: NodeProperties::Root { + num_hosts: 2, + started_at: test_time(), + started_by: "test_user".to_owned(), + system_children: vec![], + }, + children: vec![NodeRef::Host(test_host_actor_id())], + parent: None, + as_of: test_time(), + }; + + let result = convert_node("snap-1", &payload).unwrap(); + + assert_eq!(result.node.node_id, "root"); + assert_eq!(result.node.node_kind, "root"); + assert_eq!(result.node.snapshot_id, "snap-1"); + assert_eq!(result.node.as_of, 1_700_000_000_000_000); + + let NodeKindRow::Root(root) = &result.kind_row else { + panic!("expected Root variant"); + }; + assert_eq!(root.num_hosts, 2); + assert_eq!(root.started_at, 1_700_000_000_000_000); + assert_eq!(root.started_by, "test_user"); + + assert!(result.actor_failure.is_none()); + assert_eq!(result.children.len(), 1); + assert_eq!( + result.children[0].child_id, + format!("host:{}", test_host_actor_id()) + ); + } + + // CV-1, CV-2, CV-6: Host variant. + #[test] + fn test_convert_host() { + let payload = NodePayload { + identity: NodeRef::Host(test_host_actor_id()), + properties: NodeProperties::Host { + addr: "10.0.0.1:8080".to_owned(), + num_procs: 3, + system_children: vec![], + }, + children: vec![NodeRef::Proc(test_proc_id())], + parent: Some(NodeRef::Root), + as_of: test_time(), + }; + + let result = convert_node("snap-1", &payload).unwrap(); + + assert_eq!(result.node.snapshot_id, "snap-1"); + assert_eq!( + result.node.node_id, + format!("host:{}", test_host_actor_id()) + ); + assert_eq!(result.node.node_kind, "host"); + let NodeKindRow::Host(host) = &result.kind_row else { + panic!("expected Host variant"); + }; + assert_eq!(host.snapshot_id, "snap-1"); + assert_eq!(host.node_id, result.node.node_id); + assert_eq!(host.addr, "10.0.0.1:8080"); + assert_eq!(host.host_num_procs, 3); + assert!(result.actor_failure.is_none()); + assert_eq!(result.children.len(), 1); + } + + // CV-1, CV-2, CV-6: Proc variant. + #[test] + fn test_convert_proc() { + let payload = NodePayload { + identity: NodeRef::Proc(test_proc_id()), + properties: NodeProperties::Proc { + proc_name: "worker".to_owned(), + num_actors: 5, + system_children: vec![], + stopped_children: vec![], + stopped_retention_cap: 100, + is_poisoned: false, + failed_actor_count: 1, + }, + children: vec![NodeRef::Actor(test_actor_id())], + parent: Some(NodeRef::Host(test_host_actor_id())), + as_of: test_time(), + }; + + let result = convert_node("snap-1", &payload).unwrap(); + + assert_eq!(result.node.snapshot_id, "snap-1"); + assert_eq!(result.node.node_id, test_proc_id().to_string()); + assert_eq!(result.node.node_kind, "proc"); + let NodeKindRow::Proc(proc_row) = &result.kind_row else { + panic!("expected Proc variant"); + }; + assert_eq!(proc_row.snapshot_id, "snap-1"); + assert_eq!(proc_row.node_id, result.node.node_id); + assert_eq!(proc_row.proc_name, "worker"); + assert_eq!(proc_row.num_actors, 5); + assert_eq!(proc_row.stopped_retention_cap, 100); + assert!(!proc_row.is_poisoned); + assert_eq!(proc_row.failed_actor_count, 1); + assert!(result.actor_failure.is_none()); + } + + // CV-1, CV-2, CV-3 (None case), CV-6: Actor without failure. + #[test] + fn test_convert_actor_no_failure() { + let payload = NodePayload { + identity: NodeRef::Actor(test_actor_id()), + properties: NodeProperties::Actor { + actor_status: "running".to_owned(), + actor_type: "MyActor".to_owned(), + messages_processed: 42, + created_at: Some(test_time()), + last_message_handler: Some("handle_msg".to_owned()), + total_processing_time_us: 1500, + flight_recorder: None, + is_system: false, + failure_info: None, + }, + children: vec![], + parent: Some(NodeRef::Proc(test_proc_id())), + as_of: test_time(), + }; + + let result = convert_node("snap-1", &payload).unwrap(); + + assert_eq!(result.node.node_kind, "actor"); + let NodeKindRow::Actor(actor) = &result.kind_row else { + panic!("expected Actor variant"); + }; + assert_eq!(actor.actor_status, "running"); + assert_eq!(actor.messages_processed, 42); + assert_eq!(actor.created_at, Some(1_700_000_000_000_000)); + assert_eq!(actor.last_message_handler.as_deref(), Some("handle_msg")); + assert_eq!(actor.total_processing_time_us, 1500); + assert!(!actor.is_system); + assert!(result.actor_failure.is_none()); + } + + // CV-1, CV-2, CV-3 (Some case), CV-6: Actor with failure. + #[test] + fn test_convert_actor_with_failure() { + let payload = NodePayload { + identity: NodeRef::Actor(test_actor_id()), + properties: NodeProperties::Actor { + actor_status: "failed".to_owned(), + actor_type: "MyActor".to_owned(), + messages_processed: 10, + created_at: None, + last_message_handler: None, + total_processing_time_us: 500, + flight_recorder: Some("trace-abc".to_owned()), + is_system: true, + failure_info: Some(FailureInfo { + error_message: "boom".to_owned(), + root_cause_actor: test_actor_id(), + root_cause_name: Some("root_actor".to_owned()), + occurred_at: test_time_2(), + is_propagated: true, + }), + }, + children: vec![], + parent: Some(NodeRef::Proc(test_proc_id())), + as_of: test_time(), + }; + + let result = convert_node("snap-1", &payload).unwrap(); + + assert_eq!(result.node.node_kind, "actor"); + let failure = result.actor_failure.as_ref().expect("expected failure row"); + assert_eq!(failure.failure_error_message, "boom"); + assert_eq!( + failure.failure_root_cause_actor, + test_actor_id().to_string() + ); + assert_eq!( + failure.failure_root_cause_name.as_deref(), + Some("root_actor") + ); + assert_eq!(failure.failure_occurred_at, 1_700_000_001_000_000); + assert!(failure.failure_is_propagated); + } + + // CV-1, CV-2: Error variant. + #[test] + fn test_convert_error() { + let payload = NodePayload { + identity: NodeRef::Actor(test_actor_id()), + properties: NodeProperties::Error { + code: "not_found".to_owned(), + message: "actor not found".to_owned(), + }, + children: vec![], + parent: None, + as_of: test_time(), + }; + + let result = convert_node("snap-1", &payload).unwrap(); + + assert_eq!(result.node.node_kind, "error"); + let NodeKindRow::ResolutionError(err) = &result.kind_row else { + panic!("expected ResolutionError variant"); + }; + assert_eq!(err.error_code, "not_found"); + assert_eq!(err.error_message, "actor not found"); + assert!(result.actor_failure.is_none()); + } + + // CV-5: Proc with mixed system/stopped/regular children. + #[test] + fn test_child_classification_mixed() { + let regular = test_proc_id().actor_id("regular", 0); + let sys_only = test_proc_id().actor_id("sys_actor", 0); + let stopped_only = test_proc_id().actor_id("stopped_actor", 0); + let sys_and_stopped = test_proc_id().actor_id("both", 0); + + let children = vec![ + NodeRef::Actor(regular.clone()), + NodeRef::Actor(sys_only.clone()), + NodeRef::Actor(stopped_only.clone()), + NodeRef::Actor(sys_and_stopped.clone()), + ]; + + let payload = NodePayload { + identity: NodeRef::Proc(test_proc_id()), + properties: NodeProperties::Proc { + proc_name: "worker".to_owned(), + num_actors: 4, + system_children: vec![ + NodeRef::Actor(sys_only), + NodeRef::Actor(sys_and_stopped.clone()), + ], + stopped_children: vec![ + NodeRef::Actor(stopped_only), + NodeRef::Actor(sys_and_stopped), + ], + stopped_retention_cap: 10, + is_poisoned: false, + failed_actor_count: 0, + }, + children, + parent: Some(NodeRef::Host(test_host_actor_id())), + as_of: test_time(), + }; + + let result = convert_node("snap-1", &payload).unwrap(); + assert_eq!(result.children.len(), 4); + + // regular: neither system nor stopped + assert!(!result.children[0].is_system); + assert!(!result.children[0].is_stopped); + + // sys_only: system but not stopped + assert!(result.children[1].is_system); + assert!(!result.children[1].is_stopped); + + // stopped_only: stopped but not system + assert!(!result.children[2].is_system); + assert!(result.children[2].is_stopped); + + // sys_and_stopped: both + assert!(result.children[3].is_system); + assert!(result.children[3].is_stopped); + } + + // CV-4: child_sort_key is enumeration order. + #[test] + fn test_child_sort_key_is_enumeration_order() { + let a0 = test_proc_id().actor_id("a", 0); + let a1 = test_proc_id().actor_id("b", 0); + let a2 = test_proc_id().actor_id("c", 0); + + let payload = NodePayload { + identity: NodeRef::Proc(test_proc_id()), + properties: NodeProperties::Proc { + proc_name: "w".to_owned(), + num_actors: 3, + system_children: vec![], + stopped_children: vec![], + stopped_retention_cap: 0, + is_poisoned: false, + failed_actor_count: 0, + }, + children: vec![NodeRef::Actor(a0), NodeRef::Actor(a1), NodeRef::Actor(a2)], + parent: None, + as_of: test_time(), + }; + + let result = convert_node("snap-1", &payload).unwrap(); + assert_eq!(result.children[0].child_sort_key, 0); + assert_eq!(result.children[1].child_sort_key, 1); + assert_eq!(result.children[2].child_sort_key, 2); + } + + // CV-6: to_micros produces correct microseconds. + #[test] + fn test_to_micros_known_epoch() { + let t = UNIX_EPOCH + Duration::from_micros(1_234_567_890); + assert_eq!(to_micros(t).unwrap(), 1_234_567_890); + } + + // CV-6: to_micros rejects pre-epoch SystemTime. + #[test] + fn test_to_micros_pre_epoch_errors() { + let t = UNIX_EPOCH - Duration::from_secs(1); + let err = to_micros(t).unwrap_err(); + assert!( + format!("{err:#}").contains("before UNIX epoch"), + "expected pre-epoch error, got: {err:#}" + ); + } + + // CV-2: node_kind string matches kind_row variant for every type. + #[test] + fn test_node_kind_derived_from_match() { + let payloads = [ + ( + "root", + NodeProperties::Root { + num_hosts: 0, + started_at: test_time(), + started_by: String::new(), + system_children: vec![], + }, + ), + ( + "host", + NodeProperties::Host { + addr: String::new(), + num_procs: 0, + system_children: vec![], + }, + ), + ( + "proc", + NodeProperties::Proc { + proc_name: String::new(), + num_actors: 0, + system_children: vec![], + stopped_children: vec![], + stopped_retention_cap: 0, + is_poisoned: false, + failed_actor_count: 0, + }, + ), + ( + "actor", + NodeProperties::Actor { + actor_status: String::new(), + actor_type: String::new(), + messages_processed: 0, + created_at: None, + last_message_handler: None, + total_processing_time_us: 0, + flight_recorder: None, + is_system: false, + failure_info: None, + }, + ), + ( + "error", + NodeProperties::Error { + code: String::new(), + message: String::new(), + }, + ), + ]; + + for (expected_kind, props) in payloads { + let payload = NodePayload { + identity: NodeRef::Root, + properties: props, + children: vec![], + parent: None, + as_of: test_time(), + }; + let result = convert_node("s", &payload).unwrap(); + assert_eq!( + result.node.node_kind, + result.kind_row.kind_str(), + "node_kind and kind_row disagree for {expected_kind}" + ); + assert_eq!(result.node.node_kind, expected_kind); + } + } + + // CV-7: parenthood is represented only through ChildRow edges. + #[test] + fn test_parent_field_ignored() { + let make = |parent: Option| NodePayload { + identity: NodeRef::Actor(test_actor_id()), + properties: NodeProperties::Actor { + actor_status: "running".to_owned(), + actor_type: "A".to_owned(), + messages_processed: 0, + created_at: None, + last_message_handler: None, + total_processing_time_us: 0, + flight_recorder: None, + is_system: false, + failure_info: None, + }, + children: vec![], + parent, + as_of: test_time(), + }; + + let a = convert_node("s", &make(None)).unwrap(); + let b = convert_node("s", &make(Some(NodeRef::Proc(test_proc_id())))).unwrap(); + + assert_eq!(a, b); + } + + // CV-5: empty classification sets produce false link flags. + #[test] + fn test_empty_classification_sets_produce_false_flags() { + let child = NodeRef::Host(test_host_actor_id()); + + // Root with empty system_children. + let root_payload = NodePayload { + identity: NodeRef::Root, + properties: NodeProperties::Root { + num_hosts: 1, + started_at: test_time(), + started_by: "u".to_owned(), + system_children: vec![], + }, + children: vec![child.clone()], + parent: None, + as_of: test_time(), + }; + let root_result = convert_node("s", &root_payload).unwrap(); + assert!(!root_result.children[0].is_system); + assert!(!root_result.children[0].is_stopped); + + // Host with empty system_children. + let proc_child = NodeRef::Proc(test_proc_id()); + let host_payload = NodePayload { + identity: NodeRef::Host(test_host_actor_id()), + properties: NodeProperties::Host { + addr: "addr".to_owned(), + num_procs: 1, + system_children: vec![], + }, + children: vec![proc_child], + parent: Some(NodeRef::Root), + as_of: test_time(), + }; + let host_result = convert_node("s", &host_payload).unwrap(); + assert!(!host_result.children[0].is_system); + assert!(!host_result.children[0].is_stopped); + } +} diff --git a/monarch_introspection_snapshot/src/lib.rs b/monarch_introspection_snapshot/src/lib.rs index a4a4036ac..91d0c6065 100644 --- a/monarch_introspection_snapshot/src/lib.rs +++ b/monarch_introspection_snapshot/src/lib.rs @@ -8,11 +8,16 @@ //! Bridge crate for snapshot-based mesh introspection. //! -//! Will sit above `hyperactor_mesh` (which owns live mesh topology) +//! Sits above `hyperactor_mesh` (which owns live mesh topology) //! and `monarch_distributed_telemetry` (which owns table storage). //! -//! Currently provides the relational row schema ([`schema`] module) -//! that defines the Arrow table shapes for mesh snapshots. Capture -//! and ingestion are planned but not yet implemented. +//! Currently provides: +//! - [`schema`] — relational row definitions (Arrow table shapes) +//! - [`convert`] — `NodePayload` → row projection (`ConvertedNode`) +//! - [`capture`] — BFS capture of a mesh topology into `SnapshotData` +//! +//! Ingestion orchestration is planned but not yet implemented. +pub mod capture; +pub mod convert; pub mod schema; diff --git a/monarch_introspection_snapshot/src/schema.rs b/monarch_introspection_snapshot/src/schema.rs index aa1f5c1d0..7630ebdc6 100644 --- a/monarch_introspection_snapshot/src/schema.rs +++ b/monarch_introspection_snapshot/src/schema.rs @@ -43,10 +43,29 @@ //! though the in-memory model is typed. Conversion into these string IDs //! happens at the boundary when rows are produced. //! -//! Another important choice is that [`ChildRow`] models a rooted DAG, not -//! a strict tree: a node may legitimately appear under more than one -//! parent. Queries should therefore treat `children` as an edge table, -//! not assume a unique parent pointer for every non-root node. +//! Another important choice is that [`ChildRow`] models the admin +//! navigation graph, not a single ownership relation. The same node +//! may legitimately appear under more than one parent when different +//! navigation relations are projected into the same edge table. +//! +//! Example from the TUI: +//! +//! ```text +//! │ ▼ unix:@EEgtjsbaWCOCPl3zsDeX4KNH (2 procs) +//! │ ├─ ▼ local (447 actors: 5 system, 442 user) +//! │ ├─ ▼ sieve-13dKVEf934hK[0] +//! │ └─ ▼ sieve-13dKVEf934hK[1] +//! │▸ └─ ▶ sieve-13dKVEf934hK[2] +//! │ ├─ ▶ sieve-13dKVEf934hK[1] +//! │ ├─ ▶ sieve-13dKVEf934hK[2] +//! ``` +//! +//! Here `sieve-...[2]` appears both as a direct child of proc `local` +//! and as a child of actor `sieve-...[1]`. Within each relation the +//! parent is unique, but the snapshot schema currently stores both +//! relations in the same [`ChildRow`] table. Queries should therefore +//! treat [`ChildRow`] as an edge table of parent→child links, not +//! assume one globally unique parent for every non-root node. //! //! # Cardinality invariants //! @@ -86,7 +105,7 @@ use monarch_record_batch::RecordBatchRow; // buffer (zero rows remain after drain). /// One row per snapshot capture. -#[derive(RecordBatchRow)] +#[derive(Debug, Clone, PartialEq, RecordBatchRow)] pub struct SnapshotRow { /// PK. Opaque UUID generated at capture time. pub snapshot_id: String, @@ -98,7 +117,7 @@ pub struct SnapshotRow { /// /// Tree structure is expressed solely through [`ChildRow`] — there /// is no parent column here. -#[derive(RecordBatchRow)] +#[derive(Debug, Clone, PartialEq, RecordBatchRow)] pub struct NodeRow { /// PK component. FK → [`SnapshotRow::snapshot_id`]. pub snapshot_id: String, @@ -121,7 +140,7 @@ pub struct NodeRow { /// than because of a cycle. The TUI already accounts for this by /// disambiguating appearances by `(reference, depth)` and rejecting /// only true ancestor cycles. -#[derive(RecordBatchRow)] +#[derive(Debug, Clone, PartialEq, RecordBatchRow)] pub struct ChildRow { /// PK component. FK → [`SnapshotRow::snapshot_id`]. pub snapshot_id: String, @@ -140,7 +159,7 @@ pub struct ChildRow { } /// Subtype table for root nodes. PK: `(snapshot_id, node_id)`. -#[derive(RecordBatchRow)] +#[derive(Debug, Clone, PartialEq, RecordBatchRow)] pub struct RootNodeRow { /// PK component. FK → `Node(snapshot_id, node_id)`. pub snapshot_id: String, @@ -155,7 +174,7 @@ pub struct RootNodeRow { } /// Subtype table for host nodes. PK: `(snapshot_id, node_id)`. -#[derive(RecordBatchRow)] +#[derive(Debug, Clone, PartialEq, RecordBatchRow)] pub struct HostNodeRow { /// PK component. FK → `Node(snapshot_id, node_id)`. pub snapshot_id: String, @@ -168,7 +187,7 @@ pub struct HostNodeRow { } /// Subtype table for proc nodes. PK: `(snapshot_id, node_id)`. -#[derive(RecordBatchRow)] +#[derive(Debug, Clone, PartialEq, RecordBatchRow)] pub struct ProcNodeRow { /// PK component. FK → `Node(snapshot_id, node_id)`. pub snapshot_id: String, @@ -192,7 +211,7 @@ pub struct ProcNodeRow { /// `NodeProperties::Actor::is_system` — whether the actor was spawned /// as a system actor. Distinct from [`ChildRow::is_system`], which /// records whether the parent classifies the child link as system. -#[derive(RecordBatchRow)] +#[derive(Debug, Clone, PartialEq, RecordBatchRow)] pub struct ActorNodeRow { /// PK component. FK → `Node(snapshot_id, node_id)`. pub snapshot_id: String, @@ -222,7 +241,7 @@ pub struct ActorNodeRow { /// /// Captures the failure state as observed at snapshot time only — not /// intended to model failure history. -#[derive(RecordBatchRow)] +#[derive(Debug, Clone, PartialEq, RecordBatchRow)] pub struct ActorFailureRow { /// PK component. FK → `ActorNode(snapshot_id, node_id)`. pub snapshot_id: String, @@ -244,7 +263,7 @@ pub struct ActorFailureRow { /// Subtype table for resolution error nodes. PK: `(snapshot_id, /// node_id)`. -#[derive(RecordBatchRow)] +#[derive(Debug, Clone, PartialEq, RecordBatchRow)] pub struct ResolutionErrorRow { /// PK component. FK → `Node(snapshot_id, node_id)`. pub snapshot_id: String,