diff --git a/crates/storage-query-datafusion/src/context.rs b/crates/storage-query-datafusion/src/context.rs index 6f2500b3c0..5300d0537f 100644 --- a/crates/storage-query-datafusion/src/context.rs +++ b/crates/storage-query-datafusion/src/context.rs @@ -26,6 +26,8 @@ use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion::sql::TableReference; +use crate::empty_invoker_status_handle::EmptyInvokerStatusHandle; +use crate::remote_query_scanner_manager::RemoteScannerManager; use codederror::CodedError; use restate_core::{Metadata, TaskCenter}; use restate_invoker_api::StatusHandle; @@ -40,9 +42,7 @@ use restate_types::partitions::state::PartitionReplicaSetStates; use restate_types::schema::deployment::DeploymentResolver; use restate_types::schema::service::ServiceMetadataResolver; -use crate::remote_query_scanner_manager::RemoteScannerManager; - -const SYS_INVOCATION_VIEW: &str = "CREATE VIEW sys_invocation as SELECT +const SYS_INVOCATION_OLD_VIEW: &str = "CREATE VIEW sys_invocation_old as SELECT ss.id, ss.target, ss.target_service_name, @@ -189,6 +189,23 @@ where self.partition_store_manager.clone(), &self.remote_scanner_manager, )?; + if let Some(status) = self.status.clone() { + crate::invocation::register_self( + ctx, + self.partition_selector.clone(), + status, + self.partition_store_manager.clone(), + &self.remote_scanner_manager, + )?; + } else { + crate::invocation::register_self( + ctx, + self.partition_selector.clone(), + EmptyInvokerStatusHandle, + self.partition_store_manager.clone(), + &self.remote_scanner_manager, + )?; + } crate::keyed_service_status::register_self( ctx, self.partition_selector.clone(), @@ -232,7 +249,7 @@ where &self.remote_scanner_manager, )?; - ctx.datafusion_context.sql(SYS_INVOCATION_VIEW).await?; + ctx.datafusion_context.sql(SYS_INVOCATION_OLD_VIEW).await?; Ok(()) } diff --git a/crates/storage-query-datafusion/src/invocation/mod.rs b/crates/storage-query-datafusion/src/invocation/mod.rs new file mode 100644 index 0000000000..057a963489 --- /dev/null +++ b/crates/storage-query-datafusion/src/invocation/mod.rs @@ -0,0 +1,15 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +mod row; +pub(crate) mod schema; +mod table; + +pub(crate) use table::register_self; diff --git a/crates/storage-query-datafusion/src/invocation/row.rs b/crates/storage-query-datafusion/src/invocation/row.rs new file mode 100644 index 0000000000..e0a7175b56 --- /dev/null +++ b/crates/storage-query-datafusion/src/invocation/row.rs @@ -0,0 +1,469 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_invoker_api::InvocationStatusReport; +use restate_storage_api::protobuf_types::v1::lazy::InvocationStatusV2Lazy; +use restate_storage_api::protobuf_types::v1::source::Source; +use restate_types::errors::ConversionError; +use restate_types::identifiers::{InvocationId, WithPartitionKey}; +use restate_types::invocation::{ServiceType, TraceId}; +use restate_types::service_protocol::ServiceProtocolVersion; +use restate_types::time::MillisSinceEpoch; + +use crate::invocation::schema::{SysInvocationBuilder, SysInvocationRowBuilder}; + +#[inline] +pub(crate) fn append_invocation_row<'a>( + builder: &mut SysInvocationBuilder, + invocation_id: InvocationId, + invocation_status: &'a InvocationStatusV2Lazy<'a>, + status_report: Option<&InvocationStatusReport>, +) -> Result<(), ConversionError> { + let mut row = builder.row(); + + if row.is_partition_key_defined() { + row.partition_key(invocation_id.partition_key()); + } + + if (row.is_target_service_name_defined() + || row.is_target_service_key_defined() + || row.is_target_handler_name_defined() + || row.is_target_defined() + || row.is_target_service_ty_defined()) + && let Some(invocation_target) = invocation_status.invocation_target()? + { + if row.is_target_service_name_defined() { + row.target_service_name(invocation_target.service_name()?); + } + if row.is_target_service_key_defined() + && let Some(key) = invocation_target.key()? + { + row.target_service_key(key); + } + if row.is_target_handler_name_defined() { + row.target_handler_name(invocation_target.handler_name()?); + } + if row.is_target_defined() { + row.fmt_target(invocation_target.target_fmt()?); + } + if row.is_target_service_ty_defined() { + row.target_service_ty(match invocation_target.service_ty()? { + ServiceType::Service => "service", + ServiceType::VirtualObject => "virtual_object", + ServiceType::Workflow => "workflow", + }); + } + } + + // Invocation id + if row.is_id_defined() { + row.fmt_id(invocation_id); + } + + if row.is_idempotency_key_defined() + && let Some(key) = invocation_status.idempotency_key()? + { + row.idempotency_key(key) + } + + if needs_invoked_by(&row) { + fill_invoked_by(&mut row, invocation_status.source()?)?; + } + + fill_timestamps(&mut row, invocation_status); + + // Fill status_report fields + if let Some(report) = status_report { + fill_status_report(&mut row, report); + } + + // Additional invocation metadata + use restate_storage_api::protobuf_types::v1::invocation_status_v2::Status; + match invocation_status.inner.status() { + Status::Scheduled => { + fill_computed_status(&mut row, Status::Scheduled, status_report); + if row.is_created_using_restate_version_defined() { + row.created_using_restate_version( + invocation_status.created_using_restate_version()?, + ); + } + if row.is_scheduled_at_defined() + && let Some(execution_time) = invocation_status.inner.execution_time + { + row.scheduled_start_at(execution_time as i64) + } + if row.is_completion_retention_defined() { + row.completion_retention( + invocation_status + .completion_retention_duration()? + .as_millis() as i64, + ); + } + if row.is_journal_retention_defined() { + row.journal_retention( + invocation_status.journal_retention_duration()?.as_millis() as i64 + ); + } + } + Status::Inboxed => { + fill_computed_status(&mut row, Status::Inboxed, status_report); + if row.is_created_using_restate_version_defined() { + row.created_using_restate_version( + invocation_status.created_using_restate_version()?, + ); + } + if row.is_scheduled_at_defined() + && let Some(execution_time) = invocation_status.inner.execution_time + { + row.scheduled_start_at(execution_time as i64) + } + if row.is_completion_retention_defined() { + row.completion_retention( + invocation_status + .completion_retention_duration()? + .as_millis() as i64, + ); + } + if row.is_journal_retention_defined() { + row.journal_retention( + invocation_status.journal_retention_duration()?.as_millis() as i64 + ); + } + } + Status::Invoked => { + fill_computed_status(&mut row, Status::Invoked, status_report); + fill_journal_metadata(&mut row, invocation_status)?; + fill_in_flight_invocation_metadata(&mut row, invocation_status)?; + } + Status::Paused => { + fill_computed_status(&mut row, Status::Paused, status_report); + fill_journal_metadata(&mut row, invocation_status)?; + fill_in_flight_invocation_metadata(&mut row, invocation_status)?; + } + Status::Suspended => { + fill_computed_status(&mut row, Status::Suspended, status_report); + + if row.is_suspended_waiting_for_completions_defined() { + row.suspended_waiting_for_completions( + invocation_status + .inner + .waiting_for_completions + .iter() + .map(|c| Some(*c)), + ); + } + if row.is_suspended_waiting_for_signals_defined() { + row.suspended_waiting_for_signals( + invocation_status + .inner + .waiting_for_signal_indexes + .iter() + .map(|c| Some(*c)), + ); + } + + fill_journal_metadata(&mut row, invocation_status)?; + fill_in_flight_invocation_metadata(&mut row, invocation_status)?; + } + Status::Completed => { + fill_computed_status(&mut row, Status::Completed, status_report); + fill_journal_metadata(&mut row, invocation_status)?; + + if row.is_pinned_deployment_id_defined() + && let Some(deployment_id) = invocation_status.deployment_id()? + { + row.fmt_pinned_deployment_id(deployment_id); + } + if row.is_pinned_service_protocol_version_defined() + && let Some(service_protocol_version) = + invocation_status.service_protocol_version()? + { + row.pinned_service_protocol_version( + service_protocol_version.as_repr().unsigned_abs(), + ); + } + + if row.is_created_using_restate_version_defined() { + row.created_using_restate_version( + invocation_status.created_using_restate_version()?, + ); + } + if row.is_scheduled_at_defined() + && let Some(execution_time) = invocation_status.inner.execution_time + { + row.scheduled_start_at(execution_time as i64) + } + if row.is_completion_retention_defined() { + row.completion_retention( + invocation_status + .completion_retention_duration()? + .as_millis() as i64, + ); + } + if row.is_journal_retention_defined() { + row.journal_retention( + invocation_status.journal_retention_duration()?.as_millis() as i64 + ); + } + + if row.is_completion_result_defined() || row.is_completion_failure_defined() { + use restate_storage_api::protobuf_types::v1::response_result::ResponseResult; + match invocation_status.response_result()? { + ResponseResult::ResponseSuccess(_) => { + row.completion_result("success"); + } + ResponseResult::ResponseFailure( + restate_storage_api::protobuf_types::v1::response_result::ResponseFailure { + failure_code, + failure_message, + .. + }, + ) => { + row.completion_result("failure"); + if row.is_completion_failure_defined() { + let message = str::from_utf8(failure_message.as_ref()) + .map_err(|_| ConversionError::invalid_data("failure_message"))?; + row.fmt_completion_failure(format_args!( + "[{}] {}", + failure_code, message, + )); + } + } + } + } + } + Status::UnknownStatus => return Err(ConversionError::invalid_data("status")), + }; + + Ok(()) +} + +fn fill_computed_status( + row: &mut SysInvocationRowBuilder, + status: restate_storage_api::protobuf_types::v1::invocation_status_v2::Status, + status_report: Option<&InvocationStatusReport>, +) { + use restate_storage_api::protobuf_types::v1::invocation_status_v2::Status; + + if !row.is_status_defined() { + return; + } + + row.status(match status { + Status::Inboxed => "pending", + Status::Scheduled => "scheduled", + Status::Completed => "completed", + Status::Suspended => "suspended", + Status::Paused => "paused", + Status::Invoked => match status_report { + Some(report) if report.in_flight() => "running", + Some(report) if report.retry_count() > 0 => "backing-off", + _ => "ready", + }, + Status::UnknownStatus => "unknown", + }); +} + +fn fill_status_report(row: &mut SysInvocationRowBuilder, report: &InvocationStatusReport) { + if row.is_retry_count_defined() { + row.retry_count(report.retry_count() as u64); + } + if row.is_last_start_at_defined() { + row.last_start_at(MillisSinceEpoch::as_u64(&report.last_start_at().into()) as i64); + } + if row.is_last_attempt_deployment_id_defined() + && let Some(last_attempt_deployment_id) = report.last_attempt_deployment_id() + { + row.last_attempt_deployment_id(last_attempt_deployment_id.to_string()); + } + if row.is_last_attempt_server_defined() + && let Some(last_attempt_server) = report.last_attempt_server() + { + row.last_attempt_server(last_attempt_server); + } + if row.is_next_retry_at_defined() + && let Some(next_retry_at) = report.next_retry_at() + { + row.next_retry_at(MillisSinceEpoch::as_u64(&next_retry_at.into()) as i64); + } + if let Some(last_retry_attempt_failure) = report.last_retry_attempt_failure() { + if row.is_last_failure_defined() { + row.fmt_last_failure(&last_retry_attempt_failure.err); + } + if row.is_last_failure_error_code_defined() + && let Some(doc_error_code) = last_retry_attempt_failure.doc_error_code + { + row.last_failure_error_code(doc_error_code.code()) + } + + match report.last_attempt_service_protocol_version() { + None => {} + Some(sp) if *sp <= ServiceProtocolVersion::V3 => { + if let Some(name) = &last_retry_attempt_failure.related_entry_name + && !name.is_empty() + { + row.last_failure_related_entry_name(name); + } + if let Some(idx) = last_retry_attempt_failure.related_entry_index { + row.last_failure_related_entry_index(idx as u64); + } + + if row.is_last_failure_related_entry_type_defined() + && let Some(related_entry_type) = &last_retry_attempt_failure.related_entry_type + { + row.fmt_last_failure_related_entry_type(related_entry_type); + } + } + _ => { + if let Some(name) = &last_retry_attempt_failure.related_entry_name + && !name.is_empty() + { + row.last_failure_related_command_name(name); + } + if let Some(idx) = last_retry_attempt_failure.related_entry_index { + row.last_failure_related_command_index(idx as u64); + } + + if row.is_last_failure_related_command_type_defined() + && let Some(related_command_type) = + &last_retry_attempt_failure.related_entry_type + { + row.fmt_last_failure_related_command_type(related_command_type); + } + } + } + } +} + +fn fill_in_flight_invocation_metadata( + row: &mut SysInvocationRowBuilder, + status: &InvocationStatusV2Lazy, +) -> Result<(), ConversionError> { + if row.is_created_using_restate_version_defined() { + row.created_using_restate_version(status.created_using_restate_version()?); + } + if row.is_pinned_deployment_id_defined() + && let Some(deployment_id) = status.deployment_id()? + { + row.fmt_pinned_deployment_id(deployment_id); + } + if row.is_pinned_service_protocol_version_defined() + && let Some(service_protocol_version) = status.service_protocol_version()? + { + row.pinned_service_protocol_version(service_protocol_version.as_repr().unsigned_abs()); + } + if row.is_scheduled_start_at_defined() + && let Some(execution_time) = status.inner.execution_time + { + row.scheduled_start_at(execution_time as i64) + } + if row.is_completion_retention_defined() { + row.completion_retention(status.completion_retention_duration()?.as_millis() as i64); + } + if row.is_journal_retention_defined() { + row.journal_retention(status.journal_retention_duration()?.as_millis() as i64); + } + + Ok(()) +} + +fn needs_invoked_by(row: &SysInvocationRowBuilder) -> bool { + row.is_invoked_by_defined() + || row.is_invoked_by_service_name_defined() + || row.is_invoked_by_id_defined() + || row.is_invoked_by_target_defined() + || row.is_invoked_by_subscription_id_defined() + || row.is_restarted_from_defined() +} + +#[inline] +fn fill_invoked_by( + row: &mut SysInvocationRowBuilder, + source: Source, +) -> Result<(), ConversionError> { + match source { + Source::Service(service) => { + row.invoked_by("service"); + if row.is_invoked_by_service_name_defined() || row.is_invoked_by_target_defined() { + let invocation_target = service.invocation_target()?; + + if row.is_invoked_by_service_name_defined() { + row.invoked_by_service_name(invocation_target.service_name()?); + } + + if row.is_invoked_by_target_defined() { + row.fmt_invoked_by_target(invocation_target.target_fmt()?); + } + } + if row.is_invoked_by_id_defined() { + row.fmt_invoked_by_id(service.invocation_id()?); + } + } + Source::Ingress(_) => { + row.invoked_by("ingress"); + } + Source::Internal(()) => { + row.invoked_by("restate"); + } + Source::Subscription(subscription) => { + row.invoked_by("subscription"); + if row.is_invoked_by_subscription_id_defined() { + row.fmt_invoked_by_subscription_id(subscription.subscription_id()?) + } + } + Source::RestartAsNew(restart_as_new) => { + row.invoked_by("restart_as_new"); + if row.is_restarted_from_defined() { + row.fmt_restarted_from(restart_as_new.invocation_id()?) + } + } + } + + Ok(()) +} + +#[inline] +fn fill_timestamps(row: &mut SysInvocationRowBuilder, status: &InvocationStatusV2Lazy) { + row.created_at(status.inner.creation_time as i64); + row.modified_at(status.inner.modification_time as i64); + if let Some(inboxed_at) = status.inner.inboxed_transition_time { + row.inboxed_at(inboxed_at as i64); + } + if let Some(scheduled_at) = status.inner.scheduled_transition_time { + row.scheduled_at(scheduled_at as i64); + } + if let Some(running_at) = status.inner.running_transition_time { + row.running_at(running_at as i64); + } + if let Some(completed_at) = status.inner.completed_transition_time { + row.completed_at(completed_at as i64); + } +} + +#[inline] +fn fill_journal_metadata( + row: &mut SysInvocationRowBuilder, + status: &InvocationStatusV2Lazy, +) -> Result<(), ConversionError> { + if row.is_trace_id_defined() { + let tid = status.trace_id()?; + if tid != TraceId::INVALID { + row.fmt_trace_id(tid); + } + } + if row.is_journal_size_defined() { + row.journal_size(status.inner.journal_length); + } + if row.is_journal_commands_size_defined() { + row.journal_commands_size(status.inner.commands); + } + + Ok(()) +} diff --git a/crates/storage-query-datafusion/src/invocation/schema.rs b/crates/storage-query-datafusion/src/invocation/schema.rs new file mode 100644 index 0000000000..0bcd4b9402 --- /dev/null +++ b/crates/storage-query-datafusion/src/invocation/schema.rs @@ -0,0 +1,181 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::table_macro::*; + +use datafusion::arrow::datatypes::DataType; + +define_sort_order!(sys_invocation(partition_key, id)); + +define_table!(sys_invocation( + /// Internal column that is used for partitioning the services invocations. Can be ignored. + partition_key: DataType::UInt64, + + /// [Invocation ID](/operate/invocation#invocation-identifier). + id: DataType::LargeUtf8, + + /// Invocation Target. Format for plain services: `ServiceName/HandlerName`, e.g. + /// `Greeter/greet`. Format for virtual objects/workflows: `VirtualObjectName/Key/HandlerName`, + /// e.g. `Greeter/Francesco/greet`. + target: DataType::LargeUtf8, + + /// The name of the invoked service. + target_service_name: DataType::LargeUtf8, + + /// The key of the virtual object or the workflow ID. Null for regular services. + target_service_key: DataType::LargeUtf8, + + /// The invoked handler. + target_handler_name: DataType::LargeUtf8, + + /// The service type. Either `service` or `virtual_object` or `workflow`. + target_service_ty: DataType::LargeUtf8, + + /// Idempotency key, if any. + idempotency_key: DataType::LargeUtf8, + + /// Either: + /// * `ingress` if the invocation was created externally. + /// * `service` if the invocation was created by another Restate service. + /// * `subscription` if the invocation was created by a subscription (e.g. Kafka). + /// * `restart_as_new` if the invocation was created by restarting an old invocation as new. + invoked_by: DataType::LargeUtf8, + + /// The name of caller service if `invoked_by = 'service'`. + invoked_by_service_name: DataType::LargeUtf8, + + /// The caller [Invocation ID](/operate/invocation#invocation-identifier) if `invoked_by = 'service'`. + invoked_by_id: DataType::LargeUtf8, + + /// The subscription id if `invoked_by = 'subscription'`. + invoked_by_subscription_id: DataType::LargeUtf8, + + /// The caller invocation target if `invoked_by = 'service'`. + invoked_by_target: DataType::LargeUtf8, + + /// The original invocation id if `invoked_by = 'restart_as_new'`. + restarted_from: DataType::LargeUtf8, + + /// The ID of the service deployment that started processing this invocation, and will continue + /// to do so (e.g. for retries). This gets set after the first journal entry has been stored for + /// this invocation. + pinned_deployment_id: DataType::LargeUtf8, + + /// The negotiated protocol version used for this invocation. + /// This gets set after the first journal entry has been stored for this invocation. + pinned_service_protocol_version: DataType::UInt32, + + /// The ID of the trace that is assigned to this invocation. Only relevant when tracing is + /// enabled. + trace_id: DataType::LargeUtf8, + + /// The number of journal entries durably logged for this invocation. + journal_size: DataType::UInt32, + + /// The number of commands generated by this invocation, stored in the journal. + /// Only relevant when pinned_service_protocol_version >= 4. + journal_commands_size: DataType::UInt32, + + /// Timestamp indicating the start of this invocation. + created_at: TimestampMillisecond, + + /// restate-server version in use when this invocation was created. + created_using_restate_version: DataType::LargeUtf8, + + /// Timestamp indicating the last invocation status transition. For example, last time the + /// status changed from `invoked` to `suspended`. + modified_at: TimestampMillisecond, + + /// Timestamp indicating when the invocation was inboxed, if ever. + inboxed_at: TimestampMillisecond, + + /// Timestamp indicating when the invocation was scheduled, if ever. + scheduled_at: TimestampMillisecond, + + /// If the invocation was scheduled, indicates the timestamp when the invocation should start. + scheduled_start_at: TimestampMillisecond, + + /// Timestamp indicating when the invocation first transitioned to running, if ever. + running_at: TimestampMillisecond, + + /// Timestamp indicating when the invocation was completed, if ever. + completed_at: TimestampMillisecond, + + /// For how long the metadata of this invocation, including its result, is retained after completion. + completion_retention: DataType::Duration, + + /// For how long the journal is retained after completion. + journal_retention: DataType::Duration, + + /// List of completion ids the invocation is awaiting on, if `status = suspended`. + suspended_waiting_for_completions: UInt32List, + + /// List of signals the invocation is awaiting on, if `status = suspended`. + suspended_waiting_for_signals: UInt32List, + + /// The number of invocation attempts since the current leader started executing it. Increments + /// on start, so a value greater than 1 means a failure occurred. Note: the value is not a + /// global attempt counter across invocation suspensions and leadership changes. + retry_count: DataType::UInt64, + + /// Timestamp indicating the start of the most recent attempt of this invocation. + last_start_at: TimestampMillisecond, + + /// Timestamp indicating the start of the next attempt of this invocation. + next_retry_at: TimestampMillisecond, + + /// The ID of the service deployment that executed the most recent attempt of this invocation; + /// this is set before a journal entry is stored, but can change later. + last_attempt_deployment_id: DataType::LargeUtf8, + + /// Server/SDK version, e.g. `restate-sdk-java/1.0.1` + last_attempt_server: DataType::LargeUtf8, + + /// An error message describing the most recent failed attempt of this invocation, if any. + last_failure: DataType::LargeUtf8, + + /// The error code of the most recent failed attempt of this invocation, if any. + last_failure_error_code: DataType::LargeUtf8, + + /// The index of the journal entry that caused the failure, if any. It may be out-of-bound + /// of the currently stored entries in `sys_journal`. + /// DEPRECATED: you should not use this field anymore, but last_failure_related_command_index instead. + last_failure_related_entry_index: DataType::UInt64, + + /// The name of the journal entry that caused the failure, if any. + /// DEPRECATED: you should not use this field anymore, but last_failure_related_command_name instead. + last_failure_related_entry_name: DataType::LargeUtf8, + + /// The type of the journal entry that caused the failure, if any. You can check all the + /// available entry types in [`entries.rs`](https://github.com/restatedev/restate/blob/main/crates/types/src/journal/entries.rs). + /// DEPRECATED: you should not use this field anymore, but last_failure_related_command_type instead. + last_failure_related_entry_type: DataType::LargeUtf8, + + /// The index of the command in the journal that caused the failure, if any. It may be out-of-bound + /// of the currently stored commands in `sys_journal`. + last_failure_related_command_index: DataType::UInt64, + + /// The name of the command that caused the failure, if any. + last_failure_related_command_name: DataType::LargeUtf8, + + /// The type of the command that caused the failure, if any. You can check all the + /// available command types in [`entries.rs`](https://github.com/restatedev/restate/blob/main/crates/types/src/journal_v2/command.rs). + last_failure_related_command_type: DataType::LargeUtf8, + + /// The status of the invocation: `pending`, `scheduled`, `ready`, `running`, + /// `backing-off`, `suspended`, `paused`, or `completed`. + status: DataType::LargeUtf8, + + /// If `status = 'completed'`, this contains either `success` or `failure` + completion_result: DataType::LargeUtf8, + + /// If `status = 'completed' AND completion_result = 'failure'`, this contains the error cause + completion_failure: DataType::LargeUtf8, +)); diff --git a/crates/storage-query-datafusion/src/invocation/table.rs b/crates/storage-query-datafusion/src/invocation/table.rs new file mode 100644 index 0000000000..35644dcf6b --- /dev/null +++ b/crates/storage-query-datafusion/src/invocation/table.rs @@ -0,0 +1,179 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use anyhow::anyhow; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::common::DataFusionError; +use datafusion::execution::SendableRecordBatchStream; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_expr_common::metrics::Time; +use datafusion::physical_plan::stream::RecordBatchReceiverStream; +use restate_invoker_api::{InvocationStatusReport, StatusHandle}; +use restate_partition_store::PartitionStoreManager; +use restate_storage_api::invocation_status_table::ScanInvocationStatusTable; +use restate_types::identifiers::{InvocationId, PartitionId, PartitionKey}; +use std::collections::HashMap; +use std::fmt::Debug; +use std::ops::{ControlFlow, RangeInclusive}; +use std::sync::Arc; + +use crate::context::{QueryContext, SelectPartitions}; +use crate::filter::FirstMatchingPartitionKeyExtractor; +use crate::filter::InvocationIdFilter; +use crate::invocation::row::append_invocation_row; +use crate::invocation::schema::{SysInvocationBuilder, sys_invocation_sort_order}; +use crate::partition_store_scanner::ScanLocalPartitionFilter; +use crate::remote_query_scanner_manager::RemoteScannerManager; +use crate::statistics::{ + DEPLOYMENT_ROW_ESTIMATE, RowEstimate, SERVICE_ROW_ESTIMATE, TableStatisticsBuilder, +}; +use crate::table_providers::{PartitionedTableProvider, ScanPartition}; +use crate::table_util::BatchSender; + +const NAME: &str = "sys_invocation"; + +pub(crate) fn register_self( + ctx: &QueryContext, + partition_selector: impl SelectPartitions, + status_handle: impl StatusHandle + Send + Sync + Debug + Clone + 'static, + partition_store_manager: Arc, + remote_scanner_manager: &RemoteScannerManager, +) -> datafusion::common::Result<()> { + let local_scanner = Arc::new(LocalInvocationStatusPartitionScanner { + partition_store_manager, + status_handle, + }) as Arc; + + let schema = SysInvocationBuilder::schema(); + let statistics = TableStatisticsBuilder::new(schema.clone()) + .with_num_rows_estimate(RowEstimate::Large) + .with_partition_key() + .with_primary_key("id") + .with_foreign_key("pinned_deployment_id", DEPLOYMENT_ROW_ESTIMATE) + .with_foreign_key("target_service_name", SERVICE_ROW_ESTIMATE); + + let table = PartitionedTableProvider::new( + partition_selector, + schema, + sys_invocation_sort_order(), + remote_scanner_manager.create_distributed_scanner(NAME, local_scanner), + FirstMatchingPartitionKeyExtractor::default() + .with_service_key("target_service_key") + .with_invocation_id("id"), + ) + .with_statistics(statistics.build()); + ctx.register_partitioned_table(NAME, Arc::new(table)) +} + +#[derive(Clone, derive_more::Debug)] +struct LocalInvocationStatusPartitionScanner { + #[debug(skip)] + partition_store_manager: Arc, + #[debug(skip)] + status_handle: S, +} + +impl LocalInvocationStatusPartitionScanner +where + S: StatusHandle + Send + Sync + Debug + Clone + 'static, +{ + #[allow(clippy::too_many_arguments)] + fn scan_partition( + &self, + partition_id: PartitionId, + range: RangeInclusive, + projection: SchemaRef, + predicate: Option>, + batch_size: usize, + limit: Option, + elapsed_compute: Time, + ) -> anyhow::Result { + let partition_store_manager = self.partition_store_manager.clone(); + let mut stream_builder = RecordBatchReceiverStream::builder(projection.clone(), 1); + let tx = stream_builder.tx(); + let status_handle = self.status_handle.clone(); + + let background_task = async move { + let partition_store = partition_store_manager.get_partition_store(partition_id).await.ok_or_else(|| { + // make sure that the consumer of this stream to learn about the fact that this node does not have + // that partition anymore, so that it can decide how to react to this. + // for example, they can retry or fail the query with a useful message. + let err = anyhow!("partition {} doesn't exist on this node, this is benign if the partition is being transferred out of/into this node.", partition_id); + DataFusionError::External(err.into()) + })?; + + // timer starts on first row, stops on scanner drop + let mut elapsed_compute = + crate::partition_store_scanner::ElapsedCompute::new(elapsed_compute); + + let mut batch_sender = + BatchSender::new(projection, tx, predicate.clone(), batch_size, limit); + + // Collect invocation status report + let invocation_status_reports: HashMap = + status_handle + .read_status(range.clone()) + .await + .map(|isr| (*isr.invocation_id(), isr)) + .collect(); + + partition_store + .for_each_invocation_status_lazy( + InvocationIdFilter::new(range.clone(), predicate).into(), + move |(invocation_id, invocation_status)| { + elapsed_compute.start(); + match append_invocation_row( + batch_sender.builder_mut(), + invocation_id, + invocation_status, + invocation_status_reports.get(&invocation_id), + ) { + Ok(()) => {} + err => return ControlFlow::Break(err), + } + batch_sender.send_if_needed().map_break(Ok) + }, + ) + .map_err(|err| DataFusionError::External(err.into()))? + .await + .map_err(|err| DataFusionError::External(err.into()))?; + + Ok(()) + }; + stream_builder.spawn(background_task); + Ok(stream_builder.build()) + } +} + +impl ScanPartition for LocalInvocationStatusPartitionScanner +where + S: StatusHandle + Send + Sync + Debug + Clone + 'static, +{ + fn scan_partition( + &self, + partition_id: PartitionId, + range: RangeInclusive, + projection: SchemaRef, + predicate: Option>, + batch_size: usize, + limit: Option, + elapsed_compute: Time, + ) -> anyhow::Result { + self.scan_partition( + partition_id, + range, + projection, + predicate, + batch_size, + limit, + elapsed_compute, + ) + } +} diff --git a/crates/storage-query-datafusion/src/lib.rs b/crates/storage-query-datafusion/src/lib.rs index e3d29cad73..84b5a95918 100644 --- a/crates/storage-query-datafusion/src/lib.rs +++ b/crates/storage-query-datafusion/src/lib.rs @@ -16,6 +16,7 @@ pub mod bifrost_read_stream; mod deployment; mod idempotency; mod inbox; +mod invocation; mod invocation_state; mod invocation_status; mod journal; diff --git a/crates/storage-query-datafusion/src/partition_store_scanner.rs b/crates/storage-query-datafusion/src/partition_store_scanner.rs index a2cbb6394c..ff2e9368b3 100644 --- a/crates/storage-query-datafusion/src/partition_store_scanner.rs +++ b/crates/storage-query-datafusion/src/partition_store_scanner.rs @@ -165,17 +165,17 @@ where } } -struct ElapsedCompute { +pub(crate) struct ElapsedCompute { time: Time, start: Option, } impl ElapsedCompute { - fn new(time: Time) -> Self { + pub(crate) fn new(time: Time) -> Self { Self { time, start: None } } - fn start(&mut self) { + pub(crate) fn start(&mut self) { self.start.get_or_insert_with(Instant::now); } } diff --git a/crates/storage-query-datafusion/src/tests.rs b/crates/storage-query-datafusion/src/tests.rs index ab2fe04f89..4976dcfe1a 100644 --- a/crates/storage-query-datafusion/src/tests.rs +++ b/crates/storage-query-datafusion/src/tests.rs @@ -241,7 +241,7 @@ async fn query_sys_invocation_with_protocol_v4() { } #[restate_core::test(flavor = "multi_thread", worker_threads = 2)] -async fn query_sys_invocation_status_completed() { +async fn query_sys_invocation_completed() { let invocation_target = InvocationTarget::mock_service(); // To have deterministic ordering @@ -296,7 +296,7 @@ async fn query_sys_invocation_status_completed() { id, completed_at + completion_retention AS completion_expiration, completed_at + journal_retention AS journal_expiration - FROM sys_invocation_status + FROM sys_invocation WHERE journal_retention >= INTERVAL 5 SECOND ORDER BY id ASC", )