Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/catalog/src/system_schema/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use datatypes::schema::SchemaRef;
use lazy_static::lazy_static;
use paste::paste;
use process_list::InformationSchemaProcessList;
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::{ScanRequest, TableId};
use table::TableRef;
use table::metadata::TableType;
Expand All @@ -68,7 +68,7 @@ use crate::system_schema::information_schema::region_peers::InformationSchemaReg
use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics;
use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
use crate::system_schema::information_schema::ssts::{
InformationSchemaSstsManifest, InformationSchemaSstsStorage,
InformationSchemaSstsIndexMeta, InformationSchemaSstsManifest, InformationSchemaSstsStorage,
};
use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
use crate::system_schema::information_schema::tables::InformationSchemaTables;
Expand Down Expand Up @@ -263,6 +263,9 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
SSTS_STORAGE => Some(Arc::new(InformationSchemaSstsStorage::new(
self.catalog_manager.clone(),
)) as _),
SSTS_INDEX_META => Some(Arc::new(InformationSchemaSstsIndexMeta::new(
self.catalog_manager.clone(),
)) as _),
_ => None,
}
}
Expand Down Expand Up @@ -342,6 +345,10 @@ impl InformationSchemaProvider {
SSTS_STORAGE.to_string(),
self.build_table(SSTS_STORAGE).unwrap(),
);
tables.insert(
SSTS_INDEX_META.to_string(),
self.build_table(SSTS_INDEX_META).unwrap(),
);
}

tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
Expand Down Expand Up @@ -456,6 +463,8 @@ pub enum DatanodeInspectKind {
SstManifest,
/// List SST entries discovered in storage layer
SstStorage,
/// List index metadata collected from manifest
SstIndexMeta,
}

impl DatanodeInspectRequest {
Expand All @@ -464,6 +473,7 @@ impl DatanodeInspectRequest {
match self.kind {
DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan),
DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan),
DatanodeInspectKind::SstIndexMeta => PuffinIndexMetaEntry::build_plan(self.scan),
}
}
}
Expand Down
63 changes: 60 additions & 3 deletions src/catalog/src/system_schema/information_schema/ssts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@
use std::sync::{Arc, Weak};

use common_catalog::consts::{
INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID, INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID,
INFORMATION_SCHEMA_SSTS_INDEX_META_TABLE_ID, INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID,
INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID,
};
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
use datatypes::schema::SchemaRef;
use snafu::ResultExt;
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::{ScanRequest, TableId};

use crate::CatalogManager;
use crate::error::{ProjectSchemaSnafu, Result};
use crate::information_schema::{
DatanodeInspectKind, DatanodeInspectRequest, InformationTable, SSTS_MANIFEST, SSTS_STORAGE,
DatanodeInspectKind, DatanodeInspectRequest, InformationTable, SSTS_INDEX_META, SSTS_MANIFEST,
SSTS_STORAGE,
};
use crate::system_schema::utils;

Expand Down Expand Up @@ -140,3 +142,58 @@ impl InformationTable for InformationSchemaSstsStorage {
)))
}
}

/// Information schema table for index metadata.
pub struct InformationSchemaSstsIndexMeta {
schema: SchemaRef,
catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaSstsIndexMeta {
pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema: PuffinIndexMetaEntry::schema(),
catalog_manager,
}
}
}

impl InformationTable for InformationSchemaSstsIndexMeta {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_SSTS_INDEX_META_TABLE_ID
}

fn table_name(&self) -> &'static str {
SSTS_INDEX_META
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = if let Some(p) = &request.projection {
Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
} else {
self.schema.clone()
};

let info_ext = utils::information_extension(&self.catalog_manager)?;
let req = DatanodeInspectRequest {
kind: DatanodeInspectKind::SstIndexMeta,
scan: request,
};

let future = async move {
info_ext
.inspect_datanode(req)
.await
.map_err(BoxedError::new)
.context(common_recordbatch::error::ExternalSnafu)
};
Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
schema,
Box::pin(future),
)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ pub const REGION_STATISTICS: &str = "region_statistics";
pub const PROCESS_LIST: &str = "process_list";
pub const SSTS_MANIFEST: &str = "ssts_manifest";
pub const SSTS_STORAGE: &str = "ssts_storage";
pub const SSTS_INDEX_META: &str = "ssts_index_meta";
2 changes: 2 additions & 0 deletions src/common/catalog/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ pub const INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID: u32 = 36;
pub const INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID: u32 = 37;
/// id for information_schema.ssts_storage
pub const INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID: u32 = 38;
/// id for information_schema.ssts_index_meta
pub const INFORMATION_SCHEMA_SSTS_INDEX_META_TABLE_ID: u32 = 39;

// ----- End of information_schema tables -----

Expand Down
27 changes: 26 additions & 1 deletion src/datanode/src/region_server/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,20 @@ use datafusion_expr::{LogicalPlan, TableSource};
use futures::TryStreamExt;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::RegionId;

use crate::error::{DataFusionSnafu, ListStorageSstsSnafu, Result, UnexpectedSnafu};
use crate::region_server::RegionServer;

/// Reserved internal table kinds used.
/// These are recognized by reserved table names and mapped to providers.
#[allow(clippy::enum_variant_names)]
#[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)]
enum InternalTableKind {
InspectSstManifest,
InspectSstStorage,
InspectSstIndexMeta,
}

impl InternalTableKind {
Expand All @@ -50,6 +52,9 @@ impl InternalTableKind {
if name.eq_ignore_ascii_case(StorageSstEntry::reserved_table_name_for_inspection()) {
return Some(Self::InspectSstStorage);
}
if name.eq_ignore_ascii_case(PuffinIndexMetaEntry::reserved_table_name_for_inspection()) {
return Some(Self::InspectSstIndexMeta);
}
None
}

Expand All @@ -58,6 +63,7 @@ impl InternalTableKind {
match self {
Self::InspectSstManifest => server.inspect_sst_manifest_provider().await,
Self::InspectSstStorage => server.inspect_sst_storage_provider().await,
Self::InspectSstIndexMeta => server.inspect_sst_index_meta_provider().await,
}
}
}
Expand Down Expand Up @@ -103,6 +109,25 @@ impl RegionServer {
let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
Ok(Arc::new(table))
}

/// Expose index metadata across the engine as an in-memory table.
pub async fn inspect_sst_index_meta_provider(&self) -> Result<Arc<dyn TableProvider>> {
let mito = {
let guard = self.inner.mito_engine.read().unwrap();
guard.as_ref().cloned().context(UnexpectedSnafu {
violated: "mito engine not available",
})?
};

let entries = mito.all_index_metas().await;
let schema = PuffinIndexMetaEntry::schema().arrow_schema().clone();
let batch = PuffinIndexMetaEntry::to_record_batch(&entries)
.map_err(DataFusionError::from)
.context(DataFusionSnafu)?;

let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
Ok(Arc::new(table))
}
}

/// A catalog list that resolves `TableProvider` by table name:
Expand Down
9 changes: 9 additions & 0 deletions src/store-api/src/sst_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,15 @@ impl PuffinIndexMetaEntry {
pub fn reserved_table_name_for_inspection() -> &'static str {
"__inspect/__mito/__puffin_index_meta"
}

/// Builds a logical plan for scanning puffin index metadata entries.
pub fn build_plan(scan_request: ScanRequest) -> Result<LogicalPlan, DataFusionError> {
build_plan_helper(
scan_request,
Self::reserved_table_name_for_inspection(),
Self::schema(),
)
}
}

fn build_plan_helper(
Expand Down
Loading