diff --git a/cli/src/clients/datafusion_helpers/mod.rs b/cli/src/clients/datafusion_helpers/mod.rs index 26fe84dc07..18c97b41b1 100644 --- a/cli/src/clients/datafusion_helpers/mod.rs +++ b/cli/src/clients/datafusion_helpers/mod.rs @@ -435,3 +435,87 @@ pub struct HandlerStateStats { pub oldest_at: chrono::DateTime, pub oldest_invocation: String, } + +#[cfg(test)] +mod tests { + use super::*; + + // Mirrors the `JsonResponse` wrapper returned by the server's /query endpoint. + #[derive(serde::Deserialize)] + struct Rows { + rows: Vec, + } + + #[test] + fn invocation_state_parse_and_display_roundtrip() { + let cases = [ + ("pending", InvocationState::Pending), + ("scheduled", InvocationState::Scheduled), + ("ready", InvocationState::Ready), + ("running", InvocationState::Running), + ("suspended", InvocationState::Suspended), + ("backing-off", InvocationState::BackingOff), + ("completed", InvocationState::Completed), + ("paused", InvocationState::Paused), + ]; + for (s, expected) in cases { + let parsed: InvocationState = s.parse().unwrap(); + assert_eq!(parsed, expected, "parse('{s}')"); + assert_eq!(parsed.to_string(), s, "display({expected:?})"); + } + } + + #[test] + fn invocation_state_unknown_fallback() { + let state: InvocationState = "bogus-state".parse().unwrap(); + assert_eq!(state, InvocationState::Unknown); + assert_eq!(state.to_string(), "unknown"); + } + + #[test] + fn invocation_completion_from_sql() { + assert!(matches!( + InvocationCompletion::from_sql(Some("success".into()), None), + Some(InvocationCompletion::Success) + )); + // failure with message + assert!(matches!( + InvocationCompletion::from_sql(Some("failure".into()), Some("timeout".into())), + Some(InvocationCompletion::Failure(msg)) if msg == "timeout" + )); + // failure without message falls back to "Unknown" + assert!(matches!( + InvocationCompletion::from_sql(Some("failure".into()), None), + Some(InvocationCompletion::Failure(msg)) if msg == "Unknown" + )); + assert!(InvocationCompletion::from_sql(None, None).is_none()); + assert!(InvocationCompletion::from_sql(None, Some("x".into())).is_none()); + } + + #[test] + fn simple_invocation_deserializes_from_json() { + let json = r#"{"rows":[ + {"id":"inv_1","target":"Greeter/greet"}, + {"id":"inv_2","target":"Counter/inc"} + ]}"#; + let resp: Rows = serde_json::from_str(json).unwrap(); + assert_eq!(resp.rows.len(), 2); + assert_eq!(resp.rows[0].id, "inv_1"); + assert_eq!(resp.rows[0].target, "Greeter/greet"); + assert_eq!(resp.rows[1].id, "inv_2"); + } + + #[test] + fn service_handler_usage_deserializes_from_json() { + let json = r#"{"rows":[ + {"service":"Greeter","handler":"greet","inv_count":5}, + {"service":"Counter","handler":"inc","inv_count":0} + ]}"#; + let resp: Rows = serde_json::from_str(json).unwrap(); + assert_eq!(resp.rows.len(), 2); + assert_eq!(resp.rows[0].service, "Greeter"); + assert_eq!(resp.rows[0].handler, "greet"); + assert_eq!(resp.rows[0].inv_count, 5); + assert_eq!(resp.rows[1].inv_count, 0); + } +} diff --git a/cli/src/clients/datafusion_helpers/v2.rs b/cli/src/clients/datafusion_helpers/v2.rs index 913f619465..9f2fc1e8d6 100644 --- a/cli/src/clients/datafusion_helpers/v2.rs +++ b/cli/src/clients/datafusion_helpers/v2.rs @@ -707,6 +707,155 @@ pub struct StateKeysQueryResult { value: Vec, } +#[cfg(test)] +mod tests { + use super::*; + + #[derive(serde::Deserialize)] + struct Rows { + rows: Vec, + } + + /// Minimal valid JSON row for an invocation query result. + /// Only required fields are set; all Option fields default to None. + fn inv_row(id: &str, status: &str, last_start_at: &str, completion_result: &str) -> String { + format!( + r#"{{ + "id": "{id}", + "target": "MyService/myHandler", + "target_service_ty": "service", + "status": "{status}", + "created_at": "2024-01-15T10:30:00+00:00", + "pinned_deployment_exists": false, + "last_start_at": {last_start_at}, + "completion_result": {completion_result}, + "completion_failure": null + }}"# + ) + } + + #[test] + fn invocation_query_result_deserializes() { + let json = format!( + r#"{{"rows":[{}]}}"#, + inv_row("inv_abc", "running", "null", "null") + ); + let resp: Rows = serde_json::from_str(&json).unwrap(); + assert_eq!(resp.rows.len(), 1); + assert_eq!(resp.rows[0].invocation.id, "inv_abc"); + assert_eq!(resp.rows[0].invocation.status, InvocationState::Running); + assert_eq!(resp.rows[0].invocation.target, "MyService/myHandler"); + } + + #[test] + fn invocation_from_running_query_result_sets_duration() { + // Running + last_start_at set → current_attempt_duration is computed, last_attempt_started_at is None + let json = format!( + r#"{{"rows":[{}]}}"#, + inv_row("inv_1", "running", r#""2024-01-15T10:30:00+00:00""#, "null") + ); + let resp: Rows = serde_json::from_str(&json).unwrap(); + let inv = Invocation::from(resp.rows.into_iter().next().unwrap()); + assert!( + inv.current_attempt_duration.is_some(), + "running inv should have attempt duration" + ); + assert!(inv.last_attempt_started_at.is_none()); + assert!(inv.completion.is_none()); + } + + #[test] + fn invocation_from_backing_off_query_result_sets_last_attempt_started_at() { + // BackingOff + last_start_at → last_attempt_started_at is set, no attempt duration + let json = format!( + r#"{{"rows":[{}]}}"#, + inv_row( + "inv_2", + "backing-off", + r#""2024-01-15T10:30:00+00:00""#, + "null" + ) + ); + let resp: Rows = serde_json::from_str(&json).unwrap(); + let inv = Invocation::from(resp.rows.into_iter().next().unwrap()); + assert!(inv.current_attempt_duration.is_none()); + assert!( + inv.last_attempt_started_at.is_some(), + "backing-off should record last attempt time" + ); + } + + #[test] + fn invocation_from_pending_query_result_has_no_duration() { + let json = format!( + r#"{{"rows":[{}]}}"#, + inv_row("inv_3", "pending", "null", "null") + ); + let resp: Rows = serde_json::from_str(&json).unwrap(); + let inv = Invocation::from(resp.rows.into_iter().next().unwrap()); + assert!(inv.current_attempt_duration.is_none()); + assert!(inv.last_attempt_started_at.is_none()); + } + + #[test] + fn invocation_from_completed_query_result_propagates_completion() { + let json = format!( + r#"{{"rows":[{}]}}"#, + inv_row("inv_4", "completed", "null", r#""success""#) + ); + let resp: Rows = serde_json::from_str(&json).unwrap(); + let inv = Invocation::from(resp.rows.into_iter().next().unwrap()); + assert!(matches!( + inv.completion, + Some(InvocationCompletion::Success) + )); + } + + #[test] + fn service_status_query_result_deserializes() { + // Prevents regressions like #3041 where SQL column aliases in get_service_status + // diverged from the struct's expected field names (e.g. aliasing + // target_service_name to "service" would silently break deserialization). + let json = r#"{"rows":[{ + "target_service_name": "Greeter", + "target_handler_name": "greet", + "status": "pending", + "num_invocations": 3, + "oldest_at": "2024-01-15T10:30:00+00:00", + "oldest_invocation": "inv_oldest" + }]}"#; + let resp: Rows = serde_json::from_str(json).unwrap(); + assert_eq!(resp.rows.len(), 1); + assert_eq!(resp.rows[0].target_service_name, "Greeter"); + assert_eq!(resp.rows[0].target_handler_name, "greet"); + assert_eq!(resp.rows[0].status, InvocationState::Pending); + assert_eq!(resp.rows[0].stats.num_invocations, 3); + assert_eq!(resp.rows[0].stats.oldest_invocation, "inv_oldest"); + } + + #[test] + fn count_estimate_from_rows_and_display() { + // received fewer than limit → Exact + assert!(matches!( + CountEstimate::from_rows(true, 5, 1000), + CountEstimate::Exact(5) + )); + assert_eq!(CountEstimate::Exact(42).to_string(), "42"); + + // at limit but rows > minimum_count → LowerBound(rows) + assert!(matches!( + CountEstimate::from_rows(false, 50_000, 100), + CountEstimate::LowerBound(50_000) + )); + // at limit, rows <= minimum_count → LowerBound(minimum_count) + assert!(matches!( + CountEstimate::from_rows(false, 50_000, 50_001), + CountEstimate::LowerBound(50_001) + )); + assert_eq!(CountEstimate::LowerBound(100).to_string(), "100+"); + } +} + pub(crate) async fn get_state_keys( client: &DataFusionHttpClient, service: &str,