Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions cli/src/clients/datafusion_helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,3 +435,87 @@ pub struct HandlerStateStats {
pub oldest_at: chrono::DateTime<Local>,
pub oldest_invocation: String,
}

#[cfg(test)]
mod tests {
use super::*;

// Mirrors the `JsonResponse` wrapper returned by the server's /query endpoint.
#[derive(serde::Deserialize)]
struct Rows<T> {
rows: Vec<T>,
}

#[test]
fn invocation_state_parse_and_display_roundtrip() {
let cases = [
("pending", InvocationState::Pending),
("scheduled", InvocationState::Scheduled),
("ready", InvocationState::Ready),
("running", InvocationState::Running),
("suspended", InvocationState::Suspended),
("backing-off", InvocationState::BackingOff),
("completed", InvocationState::Completed),
("paused", InvocationState::Paused),
];
for (s, expected) in cases {
let parsed: InvocationState = s.parse().unwrap();
assert_eq!(parsed, expected, "parse('{s}')");
assert_eq!(parsed.to_string(), s, "display({expected:?})");
}
}

#[test]
fn invocation_state_unknown_fallback() {
let state: InvocationState = "bogus-state".parse().unwrap();
assert_eq!(state, InvocationState::Unknown);
assert_eq!(state.to_string(), "unknown");
}

#[test]
fn invocation_completion_from_sql() {
assert!(matches!(
InvocationCompletion::from_sql(Some("success".into()), None),
Some(InvocationCompletion::Success)
));
// failure with message
assert!(matches!(
InvocationCompletion::from_sql(Some("failure".into()), Some("timeout".into())),
Some(InvocationCompletion::Failure(msg)) if msg == "timeout"
));
// failure without message falls back to "Unknown"
assert!(matches!(
InvocationCompletion::from_sql(Some("failure".into()), None),
Some(InvocationCompletion::Failure(msg)) if msg == "Unknown"
));
assert!(InvocationCompletion::from_sql(None, None).is_none());
assert!(InvocationCompletion::from_sql(None, Some("x".into())).is_none());
}

#[test]
fn simple_invocation_deserializes_from_json() {
let json = r#"{"rows":[
{"id":"inv_1","target":"Greeter/greet"},
{"id":"inv_2","target":"Counter/inc"}
]}"#;
let resp: Rows<SimpleInvocation> = serde_json::from_str(json).unwrap();
assert_eq!(resp.rows.len(), 2);
assert_eq!(resp.rows[0].id, "inv_1");
assert_eq!(resp.rows[0].target, "Greeter/greet");
assert_eq!(resp.rows[1].id, "inv_2");
}

#[test]
fn service_handler_usage_deserializes_from_json() {
let json = r#"{"rows":[
{"service":"Greeter","handler":"greet","inv_count":5},
{"service":"Counter","handler":"inc","inv_count":0}
]}"#;
let resp: Rows<ServiceHandlerUsage> = serde_json::from_str(json).unwrap();
assert_eq!(resp.rows.len(), 2);
assert_eq!(resp.rows[0].service, "Greeter");
assert_eq!(resp.rows[0].handler, "greet");
assert_eq!(resp.rows[0].inv_count, 5);
assert_eq!(resp.rows[1].inv_count, 0);
}
}
149 changes: 149 additions & 0 deletions cli/src/clients/datafusion_helpers/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,155 @@ pub struct StateKeysQueryResult {
value: Vec<u8>,
}

#[cfg(test)]
mod tests {
use super::*;

#[derive(serde::Deserialize)]
struct Rows<T> {
rows: Vec<T>,
}

/// Minimal valid JSON row for an invocation query result.
/// Only required fields are set; all Option fields default to None.
fn inv_row(id: &str, status: &str, last_start_at: &str, completion_result: &str) -> String {
format!(
r#"{{
"id": "{id}",
"target": "MyService/myHandler",
"target_service_ty": "service",
"status": "{status}",
"created_at": "2024-01-15T10:30:00+00:00",
"pinned_deployment_exists": false,
"last_start_at": {last_start_at},
"completion_result": {completion_result},
"completion_failure": null
}}"#
)
}

#[test]
fn invocation_query_result_deserializes() {
let json = format!(
r#"{{"rows":[{}]}}"#,
inv_row("inv_abc", "running", "null", "null")
);
let resp: Rows<InvocationQueryResult> = serde_json::from_str(&json).unwrap();
assert_eq!(resp.rows.len(), 1);
assert_eq!(resp.rows[0].invocation.id, "inv_abc");
assert_eq!(resp.rows[0].invocation.status, InvocationState::Running);
assert_eq!(resp.rows[0].invocation.target, "MyService/myHandler");
}

#[test]
fn invocation_from_running_query_result_sets_duration() {
// Running + last_start_at set → current_attempt_duration is computed, last_attempt_started_at is None
let json = format!(
r#"{{"rows":[{}]}}"#,
inv_row("inv_1", "running", r#""2024-01-15T10:30:00+00:00""#, "null")
);
let resp: Rows<InvocationQueryResult> = serde_json::from_str(&json).unwrap();
let inv = Invocation::from(resp.rows.into_iter().next().unwrap());
assert!(
inv.current_attempt_duration.is_some(),
"running inv should have attempt duration"
);
assert!(inv.last_attempt_started_at.is_none());
assert!(inv.completion.is_none());
}

#[test]
fn invocation_from_backing_off_query_result_sets_last_attempt_started_at() {
// BackingOff + last_start_at → last_attempt_started_at is set, no attempt duration
let json = format!(
r#"{{"rows":[{}]}}"#,
inv_row(
"inv_2",
"backing-off",
r#""2024-01-15T10:30:00+00:00""#,
"null"
)
);
let resp: Rows<InvocationQueryResult> = serde_json::from_str(&json).unwrap();
let inv = Invocation::from(resp.rows.into_iter().next().unwrap());
assert!(inv.current_attempt_duration.is_none());
assert!(
inv.last_attempt_started_at.is_some(),
"backing-off should record last attempt time"
);
}

#[test]
fn invocation_from_pending_query_result_has_no_duration() {
let json = format!(
r#"{{"rows":[{}]}}"#,
inv_row("inv_3", "pending", "null", "null")
);
let resp: Rows<InvocationQueryResult> = serde_json::from_str(&json).unwrap();
let inv = Invocation::from(resp.rows.into_iter().next().unwrap());
assert!(inv.current_attempt_duration.is_none());
assert!(inv.last_attempt_started_at.is_none());
}

#[test]
fn invocation_from_completed_query_result_propagates_completion() {
let json = format!(
r#"{{"rows":[{}]}}"#,
inv_row("inv_4", "completed", "null", r#""success""#)
);
let resp: Rows<InvocationQueryResult> = serde_json::from_str(&json).unwrap();
let inv = Invocation::from(resp.rows.into_iter().next().unwrap());
assert!(matches!(
inv.completion,
Some(InvocationCompletion::Success)
));
}

#[test]
fn service_status_query_result_deserializes() {
// Prevents regressions like #3041 where SQL column aliases in get_service_status
// diverged from the struct's expected field names (e.g. aliasing
// target_service_name to "service" would silently break deserialization).
let json = r#"{"rows":[{
"target_service_name": "Greeter",
"target_handler_name": "greet",
"status": "pending",
"num_invocations": 3,
"oldest_at": "2024-01-15T10:30:00+00:00",
"oldest_invocation": "inv_oldest"
}]}"#;
let resp: Rows<ServiceStatusQueryResult> = serde_json::from_str(json).unwrap();
assert_eq!(resp.rows.len(), 1);
assert_eq!(resp.rows[0].target_service_name, "Greeter");
assert_eq!(resp.rows[0].target_handler_name, "greet");
assert_eq!(resp.rows[0].status, InvocationState::Pending);
assert_eq!(resp.rows[0].stats.num_invocations, 3);
assert_eq!(resp.rows[0].stats.oldest_invocation, "inv_oldest");
}

#[test]
fn count_estimate_from_rows_and_display() {
// received fewer than limit → Exact
assert!(matches!(
CountEstimate::from_rows(true, 5, 1000),
CountEstimate::Exact(5)
));
assert_eq!(CountEstimate::Exact(42).to_string(), "42");

// at limit but rows > minimum_count → LowerBound(rows)
assert!(matches!(
CountEstimate::from_rows(false, 50_000, 100),
CountEstimate::LowerBound(50_000)
));
// at limit, rows <= minimum_count → LowerBound(minimum_count)
assert!(matches!(
CountEstimate::from_rows(false, 50_000, 50_001),
CountEstimate::LowerBound(50_001)
));
assert_eq!(CountEstimate::LowerBound(100).to_string(), "100+");
}
}

pub(crate) async fn get_state_keys(
client: &DataFusionHttpClient,
service: &str,
Expand Down
Loading