Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions crates/storage-query-datafusion/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::sql::TableReference;

use crate::empty_invoker_status_handle::EmptyInvokerStatusHandle;
use crate::remote_query_scanner_manager::RemoteScannerManager;
use codederror::CodedError;
use restate_core::{Metadata, TaskCenter};
use restate_invoker_api::StatusHandle;
Expand All @@ -40,9 +42,7 @@ use restate_types::partitions::state::PartitionReplicaSetStates;
use restate_types::schema::deployment::DeploymentResolver;
use restate_types::schema::service::ServiceMetadataResolver;

use crate::remote_query_scanner_manager::RemoteScannerManager;

const SYS_INVOCATION_VIEW: &str = "CREATE VIEW sys_invocation as SELECT
const SYS_INVOCATION_OLD_VIEW: &str = "CREATE VIEW sys_invocation_old as SELECT
ss.id,
ss.target,
ss.target_service_name,
Expand Down Expand Up @@ -189,6 +189,23 @@ where
self.partition_store_manager.clone(),
&self.remote_scanner_manager,
)?;
if let Some(status) = self.status.clone() {
crate::invocation::register_self(
ctx,
self.partition_selector.clone(),
status,
self.partition_store_manager.clone(),
&self.remote_scanner_manager,
)?;
} else {
crate::invocation::register_self(
ctx,
self.partition_selector.clone(),
EmptyInvokerStatusHandle,
self.partition_store_manager.clone(),
&self.remote_scanner_manager,
)?;
}
crate::keyed_service_status::register_self(
ctx,
self.partition_selector.clone(),
Expand Down Expand Up @@ -232,7 +249,7 @@ where
&self.remote_scanner_manager,
)?;

ctx.datafusion_context.sql(SYS_INVOCATION_VIEW).await?;
ctx.datafusion_context.sql(SYS_INVOCATION_OLD_VIEW).await?;

Ok(())
}
Expand Down
15 changes: 15 additions & 0 deletions crates/storage-query-datafusion/src/invocation/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod row;
pub(crate) mod schema;
mod table;

pub(crate) use table::register_self;
Loading
Loading