1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -338,3 +338,6 @@ strip = true
[profile.dev.package.tests-fuzz]
debug = false
strip = true

[patch."https://github.com/GreptimeTeam/greptime-proto.git"]
greptime-proto = { path = "../greptime-proto" }
3 changes: 3 additions & 0 deletions src/common/function/src/admin.rs
@@ -15,6 +15,7 @@
mod add_region_follower;
mod flush_compact_region;
mod flush_compact_table;
mod build_index_table;
mod migrate_region;
mod reconcile_catalog;
mod reconcile_database;
@@ -30,6 +31,7 @@ use reconcile_database::ReconcileDatabaseFunction;
use reconcile_table::ReconcileTableFunction;
use remove_region_follower::RemoveRegionFollowerFunction;

use crate::admin::build_index_table::BuildIndexFunction;
use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry;

@@ -46,6 +48,7 @@ impl AdminFunction {
        registry.register(CompactRegionFunction::factory());
        registry.register(FlushTableFunction::factory());
        registry.register(CompactTableFunction::factory());
        registry.register(BuildIndexFunction::factory());
        registry.register(FlushFlowFunction::factory());
        registry.register(ReconcileCatalogFunction::factory());
        registry.register(ReconcileDatabaseFunction::factory());
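For orientation, the registration above is a plain name-to-factory lookup: each admin function contributes a factory, and the registry resolves calls by name at query time. Below is a minimal self-contained sketch of that shape; `Registry`, `AdminFn`, and the stub `build_index` are illustrative stand-ins, not the real `FunctionRegistry` API.

```rust
use std::collections::HashMap;

// Illustrative stand-in: an admin "function" is just a callable that
// receives string arguments and returns affected rows.
type AdminFn = fn(&[&str]) -> Result<u64, String>;

#[derive(Default)]
struct Registry {
    functions: HashMap<&'static str, AdminFn>,
}

impl Registry {
    fn register(&mut self, name: &'static str, f: AdminFn) {
        self.functions.insert(name, f);
    }

    fn invoke(&self, name: &str, args: &[&str]) -> Result<u64, String> {
        let f = self
            .functions
            .get(name)
            .ok_or_else(|| format!("unknown function {name}"))?;
        f(args)
    }
}

fn build_index(args: &[&str]) -> Result<u64, String> {
    match args {
        [table] => {
            println!("building indexes for {table}");
            Ok(0)
        }
        _ => Err(format!("expected 1 argument, got {}", args.len())),
    }
}

fn main() {
    let mut registry = Registry::default();
    registry.register("build_index", build_index);
    assert_eq!(registry.invoke("build_index", &["monitor"]), Ok(0));
}
```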
80 changes: 80 additions & 0 deletions src/common/function/src/admin/build_index_table.rs
@@ -0,0 +1,80 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow::datatypes::DataType as ArrowDataType;
use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::{
    InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
    UnsupportedInputDataTypeSnafu,
};
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::*;
use session::context::QueryContextRef;
use session::table_name::table_name_to_full_name;
use snafu::{ResultExt, ensure};
use table::requests::BuildIndexTableRequest;

use crate::handlers::TableMutationHandlerRef;

#[admin_fn(
    name = BuildIndexFunction,
    display_name = build_index,
    sig_fn = build_index_signature,
    ret = uint64
)]
pub(crate) async fn build_index(
    table_mutation_handler: &TableMutationHandlerRef,
    query_ctx: &QueryContextRef,
    params: &[ValueRef<'_>],
) -> Result<Value> {
    ensure!(
        params.len() == 1,
        InvalidFuncArgsSnafu {
            err_msg: format!(
                "The length of the args is not correct, expect 1, have: {}",
                params.len()
            ),
        }
    );

    let ValueRef::String(table_name) = params[0] else {
        return UnsupportedInputDataTypeSnafu {
            function: "build_index",
            datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
        }
        .fail();
    };

    let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
        .map_err(BoxedError::new)
        .context(TableMutationSnafu)?;

    let affected_rows = table_mutation_handler
        .build_index(
            BuildIndexTableRequest {
                catalog_name,
                schema_name,
                table_name,
            },
            query_ctx.clone(),
        )
        .await?;

    Ok(Value::from(affected_rows as u64))
}

fn build_index_signature() -> Signature {
    Signature::uniform(1, vec![ArrowDataType::Utf8], Volatility::Immutable)
}
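Judging from the existing flush_table and compact_table admin functions, a function registered this way would be invoked from SQL as `ADMIN build_index('<table>')`, though this diff does not show that surface. For reference, here is a dependency-free sketch of the argument-validation pattern the function uses (length check, then `let-else` on the string variant); `Value`, `FnError`, and `check_args` are simplified stand-ins, not the real `datatypes`/`common_query` types.

```rust
// Simplified stand-ins for the real ValueRef / error types (assumption:
// the actual types are richer than this).
#[derive(Clone, Copy, Debug)]
enum Value<'a> {
    String(&'a str),
    UInt64(u64),
}

#[derive(Debug, PartialEq)]
enum FnError {
    InvalidArgCount { expected: usize, actual: usize },
    UnsupportedType,
}

// Mirrors the shape of `build_index` above: exactly one argument,
// which must be a string table name.
fn check_args<'a>(params: &[Value<'a>]) -> Result<&'a str, FnError> {
    if params.len() != 1 {
        return Err(FnError::InvalidArgCount {
            expected: 1,
            actual: params.len(),
        });
    }
    // `let-else`: bind the string payload or bail out early.
    let Value::String(table_name) = params[0] else {
        return Err(FnError::UnsupportedType);
    };
    Ok(table_name)
}

fn main() {
    assert_eq!(check_args(&[Value::String("monitor")]), Ok("monitor"));
    assert!(check_args(&[Value::UInt64(1)]).is_err());
    assert!(check_args(&[]).is_err());
}
```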
6 changes: 5 additions & 1 deletion src/common/function/src/handlers.rs
@@ -26,7 +26,7 @@ use common_query::Output
use common_query::error::Result;
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};
use table::requests::{
    BuildIndexTableRequest, CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
};

/// A trait for handling table mutations in `QueryEngine`.
#[async_trait]
@@ -48,6 +48,10 @@ pub trait TableMutationHandler: Send + Sync {
        ctx: QueryContextRef,
    ) -> Result<AffectedRows>;

    /// Trigger an index build task for a table.
    async fn build_index(&self, request: BuildIndexTableRequest, ctx: QueryContextRef)
    -> Result<AffectedRows>;

    /// Trigger a flush task for a table region.
    async fn flush_region(&self, region_id: RegionId, ctx: QueryContextRef)
    -> Result<AffectedRows>;
10 changes: 9 additions & 1 deletion src/common/function/src/state.rs
@@ -45,7 +45,7 @@ impl FunctionState {
        use session::context::QueryContextRef;
        use store_api::storage::RegionId;
        use table::requests::{
            CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
            BuildIndexTableRequest, CompactTableRequest, DeleteRequest, FlushTableRequest,
            InsertRequest,
        };

        use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
@@ -125,6 +125,14 @@ impl FunctionState {
                Ok(ROWS)
            }

            async fn build_index(
                &self,
                _request: BuildIndexTableRequest,
                _ctx: QueryContextRef,
            ) -> Result<AffectedRows> {
                Ok(ROWS)
            }

            async fn flush_region(
                &self,
                _region_id: RegionId,
3 changes: 2 additions & 1 deletion src/datanode/src/region_server.rs
@@ -1044,7 +1044,8 @@ impl RegionServerInner {
            RegionRequest::Alter(_)
            | RegionRequest::Flush(_)
            | RegionRequest::Compact(_)
            | RegionRequest::Truncate(_) => RegionChange::None,
            | RegionRequest::Truncate(_)
            | RegionRequest::BuildIndex(_) => RegionChange::None,
            RegionRequest::Catchup(_) => RegionChange::Catchup,
        };

12 changes: 12 additions & 0 deletions src/metric-engine/src/engine.rs
@@ -222,6 +222,18 @@ impl RegionEngine for MetricEngine {
                }
            }
            RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
            RegionRequest::BuildIndex(_) => {
                if self.inner.is_physical_region(region_id) {
                    self.inner
                        .mito
                        .handle_request(region_id, request)
                        .await
                        .context(error::MitoFlushOperationSnafu)
                        .map(|response| response.affected_rows)
                } else {
                    UnsupportedRegionRequestSnafu { request }.fail()
                }
            }
            RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
            RegionRequest::Delete(_) => {
                if self.inner.is_physical_region(region_id) {
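The new BuildIndex arm only accepts physical regions and forwards the work to the inner mito engine; logical metric regions share their physical region's storage, so there is nothing separate to build for them. A self-contained sketch of that routing decision follows; `Engine`, `Outcome`, and `route_build_index` are illustrative stand-ins, not the metric-engine API.

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
enum Outcome {
    ForwardToMito,
    Unsupported,
}

// Illustrative stand-in for the engine's region bookkeeping.
struct Engine {
    physical_regions: HashSet<u64>,
}

impl Engine {
    fn is_physical_region(&self, region_id: u64) -> bool {
        self.physical_regions.contains(&region_id)
    }

    // Mirrors the BuildIndex arm: physical regions forward the request
    // to the wrapped mito engine; logical regions reject it.
    fn route_build_index(&self, region_id: u64) -> Outcome {
        if self.is_physical_region(region_id) {
            Outcome::ForwardToMito
        } else {
            Outcome::Unsupported
        }
    }
}

fn main() {
    let engine = Engine {
        physical_regions: HashSet::from([1]),
    };
    assert_eq!(engine.route_build_index(1), Outcome::ForwardToMito);
    assert_eq!(engine.route_build_index(2), Outcome::Unsupported);
}
```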
16 changes: 13 additions & 3 deletions src/mito2/src/access_layer.rs
@@ -31,15 +31,15 @@ use store_api::storage::{RegionId, SequenceNumber};
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::write_cache::SstUploadRequest;
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
use crate::config::{BloomFilterConfig, FulltextIndexConfig, IndexConfig, InvertedIndexConfig};
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
use crate::read::Source;
use crate::region::options::IndexOptions;
use crate::sst::file::{FileHandle, FileId, RegionFileId};
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
use crate::sst::location::{self, region_dir_from_table_dir};
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
@@ -194,6 +194,14 @@ impl AccessLayer {
        &self.intermediate_manager
    }

    /// Builds the puffin manager.
    pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
        let store = self.object_store.clone();
        let path_provider =
            RegionFilePathFactory::new(self.table_dir().to_string(), self.path_type());
        self.puffin_manager_factory.build(store, path_provider)
    }

    /// Deletes a SST file (and its index file if it has one) with given file id.
    pub(crate) async fn delete_sst(&self, region_file_id: &RegionFileId) -> Result<()> {
        let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type);
@@ -263,7 +271,7 @@ impl AccessLayer {
        let store = self.object_store.clone();
        let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
        let indexer_builder = IndexerBuilderImpl {
            op_type: request.op_type,
            build_type: request.op_type.into(),
            metadata: request.metadata.clone(),
            row_group_size: write_opts.row_group_size,
            puffin_manager: self
@@ -282,6 +290,7 @@
        let mut writer = ParquetWriter::new_with_object_store(
            self.object_store.clone(),
            request.metadata,
            request.index_config,
            indexer_builder,
            path_provider,
            Metrics::new(write_type),
@@ -371,6 +380,7 @@ pub struct SstWriteRequest {

    /// Configs for index
    pub index_options: IndexOptions,
    pub index_config: IndexConfig,
    pub inverted_index_config: InvertedIndexConfig,
    pub fulltext_index_config: FulltextIndexConfig,
    pub bloom_filter_index_config: BloomFilterConfig,
30 changes: 23 additions & 7 deletions src/mito2/src/cache/write_cache.rs
@@ -36,7 +36,7 @@ use crate::metrics::{
use crate::sst::file::RegionFileId;
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
use crate::sst::parquet::WriteOptions;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
@@ -101,6 +101,18 @@ impl WriteCache {
        self.file_cache.clone()
    }

    /// Builds the puffin manager.
    pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
        let store = self.file_cache.local_store();
        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
        self.puffin_manager_factory.build(store, path_provider)
    }

    /// Returns the intermediate manager of the write cache.
    pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
        &self.intermediate_manager
    }

    /// Writes SST to the cache and then uploads it to the remote object store.
    pub(crate) async fn write_and_upload_sst(
        &self,
@@ -114,7 +126,7 @@
        let store = self.file_cache.local_store();
        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
        let indexer = IndexerBuilderImpl {
            op_type: write_request.op_type,
            build_type: write_request.op_type.into(),
            metadata: write_request.metadata.clone(),
            row_group_size: write_opts.row_group_size,
            puffin_manager: self
@@ -132,6 +144,7 @@
        let mut writer = ParquetWriter::new_with_object_store(
            store.clone(),
            write_request.metadata,
            write_request.index_config,
            indexer,
            path_provider.clone(),
            Metrics::new(write_type),
@@ -287,7 +300,7 @@ impl WriteCache {
    }

    /// Uploads a Parquet file or a Puffin file to the remote object store.
    async fn upload(
    pub(crate) async fn upload(
        &self,
        index_key: IndexKey,
        upload_path: &str,
@@ -368,7 +381,7 @@ pub struct SstUploadRequest {
}

/// A struct to track files to upload and clean them up if the upload fails.
struct UploadTracker {
pub(crate) struct UploadTracker {
    /// Id of the region to track.
    region_id: RegionId,
/// Paths of files uploaded successfully.
Expand All @@ -377,20 +390,20 @@ struct UploadTracker {

impl UploadTracker {
    /// Creates a new instance of `UploadTracker` for a given region.
    fn new(region_id: RegionId) -> Self {
    pub(crate) fn new(region_id: RegionId) -> Self {
        Self {
            region_id,
            files_uploaded: Vec::new(),
        }
    }

    /// Adds a file path to the list of uploaded files.
    fn push_uploaded_file(&mut self, path: String) {
    pub(crate) fn push_uploaded_file(&mut self, path: String) {
        self.files_uploaded.push(path);
    }

    /// Cleans up uploaded files and files in the file cache on a best-effort basis.
    async fn clean(
    pub(crate) async fn clean(
        &self,
        sst_info: &SstInfoArray,
        file_cache: &FileCacheRef,
@@ -474,6 +487,7 @@ mod tests {
            max_sequence: None,
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
@@ -572,6 +586,7 @@
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
@@ -651,6 +666,7 @@
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
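`UploadTracker` and `upload()` become `pub(crate)` here, presumably so the new index-build path can reuse the same upload-and-rollback flow. The pattern itself: record each uploaded path, and if a later step fails, delete everything recorded so far, best effort. A standalone sketch under that reading follows; the store type and error handling are simplified stand-ins, not the mito2 API.

```rust
use std::collections::HashSet;

/// Illustrative in-memory stand-in for a remote object store.
struct ObjectStore {
    files: HashSet<String>,
}

impl ObjectStore {
    fn delete(&mut self, path: &str) -> Result<(), String> {
        // Deleting a missing file is fine for best-effort cleanup.
        self.files.remove(path);
        Ok(())
    }
}

/// Tracks files uploaded so far so a failed multi-file upload
/// can be rolled back (the same idea as mito2's UploadTracker).
struct UploadTracker {
    files_uploaded: Vec<String>,
}

impl UploadTracker {
    fn new() -> Self {
        Self {
            files_uploaded: Vec::new(),
        }
    }

    fn push_uploaded_file(&mut self, path: String) {
        self.files_uploaded.push(path);
    }

    /// Deletes every recorded file, continuing past individual failures.
    fn clean(&self, store: &mut ObjectStore) {
        for path in &self.files_uploaded {
            if let Err(e) = store.delete(path) {
                eprintln!("failed to clean {path}: {e}");
            }
        }
    }
}

fn main() {
    let mut store = ObjectStore {
        files: HashSet::from(["a.parquet".to_string(), "a.puffin".to_string()]),
    };
    let mut tracker = UploadTracker::new();
    tracker.push_uploaded_file("a.parquet".to_string());
    tracker.push_uploaded_file("a.puffin".to_string());
    // Pretend a later upload failed: roll back what we already uploaded.
    tracker.clean(&mut store);
    assert!(store.files.is_empty());
}
```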
3 changes: 3 additions & 0 deletions src/mito2/src/compaction/compactor.rs
@@ -342,6 +342,7 @@ impl Compactor for DefaultCompactor {
            .clone();
        let append_mode = compaction_region.current_version.options.append_mode;
        let merge_mode = compaction_region.current_version.options.merge_mode();
        let index_config = compaction_region.engine_config.index.clone();
        let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
        let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
        let bloom_filter_index_config =
@@ -381,6 +382,7 @@
                    storage,
                    max_sequence: max_sequence.map(NonZero::get),
                    index_options,
                    index_config,
                    inverted_index_config,
                    fulltext_index_config,
                    bloom_filter_index_config,
@@ -411,6 +413,7 @@
                level: output.output_level,
                file_size: sst_info.file_size,
                available_indexes: sst_info.index_metadata.build_available_indexes(),
                indexes: sst_info.index_metadata.build_indexes(),
                index_file_size: sst_info.index_metadata.file_size,
                num_rows: sst_info.num_rows as u64,
                num_row_groups: sst_info.num_row_groups,
1 change: 1 addition & 0 deletions src/mito2/src/compaction/test_util.rs
@@ -74,6 +74,7 @@ pub fn new_file_handle_with_size_and_sequence(
        level,
        file_size,
        available_indexes: Default::default(),
        indexes: Default::default(),
        index_file_size: 0,
        num_rows: 0,
        num_row_groups: 0,
Expand Down