Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/invoker-api/src/status_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use codederror::Code;
use restate_types::errors::InvocationError;
use restate_types::identifiers::{DeploymentId, InvocationId, PartitionKey};
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionLeaderEpoch};
use restate_types::invocation::InvocationTarget;
use restate_types::journal::{EntryIndex, EntryType};
use restate_types::service_protocol::ServiceProtocolVersion;
use std::future::Future;
Expand All @@ -22,6 +23,7 @@ use std::time::SystemTime;

#[derive(Debug, Clone)]
pub struct InvocationStatusReportInner {
pub invocation_target: Option<InvocationTarget>,
pub in_flight: bool,
pub start_count: usize,
pub last_start_at: SystemTime,
Expand All @@ -35,6 +37,7 @@ pub struct InvocationStatusReportInner {
impl Default for InvocationStatusReportInner {
fn default() -> Self {
Self {
invocation_target: None,
in_flight: false,
start_count: 0,
last_start_at: SystemTime::now(),
Expand Down Expand Up @@ -75,6 +78,10 @@ impl InvocationStatusReport {
self.1.1
}

pub fn invocation_target(&self) -> Option<&InvocationTarget> {
self.2.invocation_target.as_ref()
}

pub fn in_flight(&self) -> bool {
self.2.in_flight
}
Expand Down
3 changes: 2 additions & 1 deletion crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1582,7 +1582,8 @@ where
);

// Transition the state machine, and store it
self.status_store.on_start(partition, invocation_id);
self.status_store
.on_start(partition, invocation_id, ism.invocation_target.clone());
ism.start(abort_handle, completions_tx);
trace!(
restate.invocation.target = %ism.invocation_target,
Expand Down
2 changes: 2 additions & 0 deletions crates/invoker-impl/src/status_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ impl InvocationStatusStore {
&mut self,
partition: PartitionLeaderEpoch,
invocation_id: InvocationId,
invocation_target: InvocationTarget,
) {
let report = self
.0
.entry(partition)
.or_default()
.entry(invocation_id)
.or_default();
report.invocation_target = Some(invocation_target);
report.start_count += 1;
report.last_start_at = SystemTime::now();
report.next_retry_at = None;
Expand Down
32 changes: 32 additions & 0 deletions crates/storage-query-datafusion/src/invocation_state/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

use restate_invoker_api::InvocationStatusReport;
use restate_types::identifiers::WithPartitionKey;
use restate_types::invocation::ServiceType;
use restate_types::service_protocol::ServiceProtocolVersion;
use restate_types::time::MillisSinceEpoch;

Expand All @@ -28,6 +29,37 @@
if row.is_id_defined() {
row.fmt_id(invocation_id);
}

if (row.is_target_service_name_defined()
|| row.is_target_service_key_defined()
|| row.is_target_handler_name_defined()
|| row.is_target_defined()
|| row.is_target_service_ty_defined())
&& let Some(invocation_target) = status_row.invocation_target()
{
if row.is_target_service_name_defined() {
row.target_service_name(invocation_target.service_name().as_ref());

Check failure on line 41 in crates/storage-query-datafusion/src/invocation_state/row.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

type annotations needed
}
if row.is_target_service_key_defined() {
if let Some(key) = invocation_target.key() {
row.target_service_key(key.as_ref());

Check failure on line 45 in crates/storage-query-datafusion/src/invocation_state/row.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

type annotations needed
}
}
if row.is_target_handler_name_defined() {
row.target_handler_name(invocation_target.handler_name().as_ref());

Check failure on line 49 in crates/storage-query-datafusion/src/invocation_state/row.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

type annotations needed
}
if row.is_target_defined() {
row.fmt_target(invocation_target);
}
if row.is_target_service_ty_defined() {
row.target_service_ty(match invocation_target.service_ty() {
ServiceType::Service => "service",
ServiceType::VirtualObject => "virtual_object",
ServiceType::Workflow => "workflow",
});
}
}

row.in_flight(status_row.in_flight());
row.retry_count(status_row.retry_count() as u64);
row.last_start_at(MillisSinceEpoch::as_u64(&status_row.last_start_at().into()) as i64);
Expand Down
17 changes: 17 additions & 0 deletions crates/storage-query-datafusion/src/invocation_state/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,23 @@ define_table!(sys_invocation_state(
/// [Invocation ID](/operate/invocation#invocation-identifier).
id: DataType::LargeUtf8,

/// Invocation Target. Format for plain services: `ServiceName/HandlerName`, e.g.
/// `Greeter/greet`. Format for virtual objects/workflows: `VirtualObjectName/Key/HandlerName`,
/// e.g. `Greeter/Francesco/greet`.
target: DataType::LargeUtf8,

/// The name of the invoked service.
target_service_name: DataType::LargeUtf8,

/// The key of the virtual object or the workflow ID. Null for regular services.
target_service_key: DataType::LargeUtf8,

/// The invoked handler.
target_handler_name: DataType::LargeUtf8,

/// The service type. Either `service` or `virtual_object` or `workflow`.
target_service_ty: DataType::LargeUtf8,

/// If true, the invocation is currently in-flight
in_flight: DataType::Boolean,

Expand Down
Loading