diff --git a/crates/invoker-api/src/status_handle.rs b/crates/invoker-api/src/status_handle.rs index 794d011ed6..904831cf93 100644 --- a/crates/invoker-api/src/status_handle.rs +++ b/crates/invoker-api/src/status_handle.rs @@ -12,6 +12,7 @@ use codederror::Code; use restate_types::errors::InvocationError; use restate_types::identifiers::{DeploymentId, InvocationId, PartitionKey}; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionLeaderEpoch}; +use restate_types::invocation::InvocationTarget; use restate_types::journal::{EntryIndex, EntryType}; use restate_types::service_protocol::ServiceProtocolVersion; use std::future::Future; @@ -22,6 +23,7 @@ use std::time::SystemTime; #[derive(Debug, Clone)] pub struct InvocationStatusReportInner { + pub invocation_target: Option, pub in_flight: bool, pub start_count: usize, pub last_start_at: SystemTime, @@ -35,6 +37,7 @@ pub struct InvocationStatusReportInner { impl Default for InvocationStatusReportInner { fn default() -> Self { Self { + invocation_target: None, in_flight: false, start_count: 0, last_start_at: SystemTime::now(), @@ -75,6 +78,10 @@ impl InvocationStatusReport { self.1.1 } + pub fn invocation_target(&self) -> Option<&InvocationTarget> { + self.2.invocation_target.as_ref() + } + pub fn in_flight(&self) -> bool { self.2.in_flight } diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index d074c4e1bc..70751a6173 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -1582,7 +1582,8 @@ where ); // Transition the state machine, and store it - self.status_store.on_start(partition, invocation_id); + self.status_store + .on_start(partition, invocation_id, ism.invocation_target.clone()); ism.start(abort_handle, completions_tx); trace!( restate.invocation.target = %ism.invocation_target, diff --git a/crates/invoker-impl/src/status_store.rs b/crates/invoker-impl/src/status_store.rs index 9fdd51b87c..7b250429ca 100644 --- a/crates/invoker-impl/src/status_store.rs +++ b/crates/invoker-impl/src/status_store.rs @@ -45,6 +45,7 @@ impl InvocationStatusStore { &mut self, partition: PartitionLeaderEpoch, invocation_id: InvocationId, + invocation_target: InvocationTarget, ) { let report = self .0 @@ -52,6 +53,7 @@ impl InvocationStatusStore { .or_default() .entry(invocation_id) .or_default(); + report.invocation_target = Some(invocation_target); report.start_count += 1; report.last_start_at = SystemTime::now(); report.next_retry_at = None; diff --git a/crates/storage-query-datafusion/src/invocation_state/row.rs b/crates/storage-query-datafusion/src/invocation_state/row.rs index b164f36cc6..61562d5011 100644 --- a/crates/storage-query-datafusion/src/invocation_state/row.rs +++ b/crates/storage-query-datafusion/src/invocation_state/row.rs @@ -10,6 +10,7 @@ use restate_invoker_api::InvocationStatusReport; use restate_types::identifiers::WithPartitionKey; +use restate_types::invocation::ServiceType; use restate_types::service_protocol::ServiceProtocolVersion; use restate_types::time::MillisSinceEpoch; @@ -28,6 +29,37 @@ pub(crate) fn append_invocation_state_row( if row.is_id_defined() { row.fmt_id(invocation_id); } + + if (row.is_target_service_name_defined() + || row.is_target_service_key_defined() + || row.is_target_handler_name_defined() + || row.is_target_defined() + || row.is_target_service_ty_defined()) + && let Some(invocation_target) = status_row.invocation_target() + { + if row.is_target_service_name_defined() { + row.target_service_name(invocation_target.service_name().as_ref()); + } + if row.is_target_service_key_defined() { + if let Some(key) = invocation_target.key() { + row.target_service_key(key.as_ref()); + } + } + if row.is_target_handler_name_defined() { + row.target_handler_name(invocation_target.handler_name().as_ref()); + } + if row.is_target_defined() { + row.fmt_target(invocation_target); + } + if row.is_target_service_ty_defined() { + row.target_service_ty(match invocation_target.service_ty() { + ServiceType::Service => "service", + ServiceType::VirtualObject => "virtual_object", + ServiceType::Workflow => "workflow", + }); + } + } + row.in_flight(status_row.in_flight()); row.retry_count(status_row.retry_count() as u64); row.last_start_at(MillisSinceEpoch::as_u64(&status_row.last_start_at().into()) as i64); diff --git a/crates/storage-query-datafusion/src/invocation_state/schema.rs b/crates/storage-query-datafusion/src/invocation_state/schema.rs index 9816f37406..22489bb3dd 100644 --- a/crates/storage-query-datafusion/src/invocation_state/schema.rs +++ b/crates/storage-query-datafusion/src/invocation_state/schema.rs @@ -21,6 +21,23 @@ define_table!(sys_invocation_state( /// [Invocation ID](/operate/invocation#invocation-identifier). id: DataType::LargeUtf8, + /// Invocation Target. Format for plain services: `ServiceName/HandlerName`, e.g. + /// `Greeter/greet`. Format for virtual objects/workflows: `VirtualObjectName/Key/HandlerName`, + /// e.g. `Greeter/Francesco/greet`. + target: DataType::LargeUtf8, + + /// The name of the invoked service. + target_service_name: DataType::LargeUtf8, + + /// The key of the virtual object or the workflow ID. Null for regular services. + target_service_key: DataType::LargeUtf8, + + /// The invoked handler. + target_handler_name: DataType::LargeUtf8, + + /// The service type. Either `service` or `virtual_object` or `workflow`. + target_service_ty: DataType::LargeUtf8, + /// If true, the invocation is currently in-flight in_flight: DataType::Boolean,